From dfe3eac606eac7e8052d22764fb4f678eeba1cba Mon Sep 17 00:00:00 2001 From: ansuz Date: Mon, 15 May 2017 18:12:10 +0200 Subject: [PATCH] big rpc refactor: * support pinning files * include pinned files in usage calculation * safer session handling * fix bugs in limit update code --- rpc.js | 171 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 97 insertions(+), 74 deletions(-) diff --git a/rpc.js b/rpc.js index df9020e6a..bb9e27194 100644 --- a/rpc.js +++ b/rpc.js @@ -13,7 +13,7 @@ var RPC = module.exports; var Store = require("./storage/file"); -var DEFAULT_LIMIT = 100; +var DEFAULT_LIMIT = 50 * 1024 * 1024; var isValidId = function (chan) { return /^[a-fA-F0-9]/.test(chan) || @@ -74,12 +74,17 @@ var parseCookie = function (cookie) { return c; }; +var escapeKeyCharacters = function (key) { + return key.replace(/\//g, '-'); +}; + var beginSession = function (Sessions, key) { - if (Sessions[key]) { - Sessions[key].atime = +new Date(); - return Sessions[key]; + var safeKey = escapeKeyCharacters(key); + if (Sessions[safeKey]) { + Sessions[safeKey].atime = +new Date(); + return Sessions[safeKey]; } - var user = Sessions[key] = {}; + var user = Sessions[safeKey] = {}; user.atime = +new Date(); user.tokens = [ makeToken() @@ -107,7 +112,7 @@ var expireSessions = function (Sessions) { var addTokenForKey = function (Sessions, publicKey, token) { if (!Sessions[publicKey]) { throw new Error('undefined user'); } - var user = Sessions[publicKey]; + var user = beginSession(Sessions, publicKey); user.tokens.push(token); user.atime = +new Date(); if (user.tokens.length > 2) { user.tokens.shift(); } @@ -129,7 +134,7 @@ var isValidCookie = function (Sessions, publicKey, cookie) { return false; } - var user = Sessions[publicKey]; + var user = beginSession(Sessions, publicKey); if (!user) { return false; } var idx = user.tokens.indexOf(parsed.seq); @@ -266,15 +271,15 @@ var getUploadSize = function (paths, channel, cb) { }); }; -var getFileSize = function (paths, store, channel, cb) { +var getFileSize = function (paths, msgStore, channel, cb) { if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } if (channel.length === 32) { - if (typeof(store.getChannelSize) !== 'function') { + if (typeof(msgStore.getChannelSize) !== 'function') { return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); } - return void store.getChannelSize(channel, function (e, size) { + return void msgStore.getChannelSize(channel, function (e, size) { if (e) { return void cb(e.code); } cb(void 0, size); }); @@ -287,9 +292,9 @@ var getFileSize = function (paths, store, channel, cb) { }); }; -var getMultipleFileSize = function (paths, store, channels, cb) { +var getMultipleFileSize = function (paths, msgStore, channels, cb) { if (!Array.isArray(channels)) { return cb('INVALID_LIST'); } - if (typeof(store.getChannelSize) !== 'function') { + if (typeof(msgStore.getChannelSize) !== 'function') { return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); } @@ -302,7 +307,7 @@ var getMultipleFileSize = function (paths, store, channels, cb) { }; channels.forEach(function (channel) { - getFileSize(paths, store, channel, function (e, size) { + getFileSize(paths, msgStore, channel, function (e, size) { if (e) { console.error(e); counts[channel] = -1; @@ -314,7 +319,7 @@ var getMultipleFileSize = function (paths, store, channels, cb) { }); }; -var getTotalSize = function (pinStore, messageStore, Sessions, publicKey, cb) { +var getTotalSize = function (paths, pinStore, msgStore, Sessions, publicKey, cb) { var bytes = 0; return void getChannelList(pinStore, Sessions, publicKey, function (channels) { @@ -324,7 +329,7 @@ var getTotalSize = function (pinStore, messageStore, Sessions, publicKey, cb) { if (!count) { cb(void 0, 0); } channels.forEach(function (channel) { - return messageStore.getChannelSize(channel, function (e, size) { + getFileSize(paths, msgStore, channel, function (e, size) { count--; if (!e) { bytes += size; } if (count === 0) { return cb(void 0, bytes); } @@ -353,18 +358,14 @@ var getHash = function (store, Sessions, publicKey, cb) { }); }; -/* var storeMessage = function (store, publicKey, msg, cb) { - store.message(publicKey, JSON.stringify(msg), cb); -}; */ - // TODO check if new pinned size exceeds user quota -var pinChannel = function (store, Sessions, publicKey, channels, cb) { +var pinChannel = function (pinStore, Sessions, publicKey, channels, cb) { if (!channels && channels.filter) { // expected array return void cb('[TYPE_ERROR] pin expects channel list argument'); } - getChannelList(store, Sessions, publicKey, function (pinned) { + getChannelList(pinStore, Sessions, publicKey, function (pinned) { var session = beginSession(Sessions, publicKey); // only pin channels which are not already pinned @@ -373,27 +374,27 @@ var pinChannel = function (store, Sessions, publicKey, channels, cb) { }); if (toStore.length === 0) { - return void getHash(store, Sessions, publicKey, cb); + return void getHash(pinStore, Sessions, publicKey, cb); } - store.message(publicKey, JSON.stringify(['PIN', toStore]), + pinStore.message(publicKey, JSON.stringify(['PIN', toStore]), function (e) { if (e) { return void cb(e); } toStore.forEach(function (channel) { session.channels[channel] = true; }); - getHash(store, Sessions, publicKey, cb); + getHash(pinStore, Sessions, publicKey, cb); }); }); }; -var unpinChannel = function (store, Sessions, publicKey, channels, cb) { +var unpinChannel = function (pinStore, Sessions, publicKey, channels, cb) { if (!channels && channels.filter) { // expected array return void cb('[TYPE_ERROR] unpin expects channel list argument'); } - getChannelList(store, Sessions, publicKey, function (pinned) { + getChannelList(pinStore, Sessions, publicKey, function (pinned) { var session = beginSession(Sessions, publicKey); // only unpin channels which are pinned @@ -402,35 +403,35 @@ var unpinChannel = function (store, Sessions, publicKey, channels, cb) { }); if (toStore.length === 0) { - return void getHash(store, Sessions, publicKey, cb); + return void getHash(pinStore, Sessions, publicKey, cb); } - store.message(publicKey, JSON.stringify(['UNPIN', toStore]), + pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore]), function (e) { if (e) { return void cb(e); } toStore.forEach(function (channel) { delete session.channels[channel]; }); - getHash(store, Sessions, publicKey, cb); + getHash(pinStore, Sessions, publicKey, cb); }); }); }; // TODO check if new pinned size exceeds user quota -var resetUserPins = function (store, Sessions, publicKey, channelList, cb) { +var resetUserPins = function (pinStore, Sessions, publicKey, channelList, cb) { var session = beginSession(Sessions, publicKey); var pins = session.channels = {}; - store.message(publicKey, JSON.stringify(['RESET', channelList]), + pinStore.message(publicKey, JSON.stringify(['RESET', channelList]), function (e) { if (e) { return void cb(e); } channelList.forEach(function (channel) { pins[channel] = true; }); - getHash(store, Sessions, publicKey, function (e, hash) { + getHash(pinStore, Sessions, publicKey, function (e, hash) { cb(e, hash); }); }); @@ -480,8 +481,8 @@ var updateLimits = function (config, publicKey, cb) { } }; var req = Https.request(options, function (response) { - if (!('' + req.statusCode).match(/^2\d\d$/)) { - return void cb('SERVER ERROR ' + req.statusCode); + if (!('' + response.statusCode).match(/^2\d\d$/)) { + return void cb('SERVER ERROR ' + response.statusCode); } var str = ''; @@ -514,10 +515,13 @@ var updateLimits = function (config, publicKey, cb) { }; var getLimit = function (publicKey, cb) { - var limit = limits[publicKey]; + var unescapedKey = escapeKeyCharacters(publicKey); + var limit = limits[unescapedKey]; - cb(void 0, limit && typeof(limit.limit) === "number"? - limit.limit : DEFAULT_LIMIT); + var toSend = limit && typeof(limit.limit) === "number"? + limit.limit : DEFAULT_LIMIT; + + cb(void 0, toSend); }; var safeMkdir = function (path, cb) { @@ -549,22 +553,32 @@ var makeFileStream = function (root, id, cb) { var upload = function (paths, Sessions, publicKey, content, cb) { var dec = new Buffer(Nacl.util.decodeBase64(content)); // jshint ignore:line + var len = dec.length; - // TODO check that the ongoing upload has not exceeded its declared size - // TODO fail if it has... + var session = beginSession(Sessions, publicKey); + + if (typeof(session.currentUploadSize) !== 'number') { + // improperly initialized... maybe they didn't check before uploading? + // reject it, just in case + return cb('NOT_READY'); + } + + if (session.currentUploadSize > session.pendingUploadSize) { + return cb('TOO_LARGE'); + } - var session = Sessions[publicKey]; - session.atime = +new Date(); if (!session.blobstage) { makeFileStream(paths.staging, publicKey, function (e, stream) { if (e) { return void cb(e); } var blobstage = session.blobstage = stream; blobstage.write(dec); + session.currentUploadSize += len; cb(void 0, dec.length); }); } else { session.blobstage.write(dec); + session.currentUploadSize += len; cb(void 0, dec.length); } }; @@ -593,14 +607,8 @@ var isFile = function (filePath, cb) { }); }; -/* TODO -change channel IDs to a different length so that when we pin, we will be able -to tell that it is not a channel, but a file, just by its length. - -also, when your upload is complete, pin the resulting file. -*/ var upload_complete = function (paths, Sessions, publicKey, cb) { - var session = Sessions[publicKey]; + var session = beginSession(Sessions, publicKey); if (session.blobstage && session.blobstage.close) { session.blobstage.close(); @@ -639,25 +647,32 @@ var upload_complete = function (paths, Sessions, publicKey, cb) { console.error(e); return cb(e); } - cb(void 0, id); }); }); }; -/* TODO -when asking about your upload status, also send some information about how big -your upload is going to be. if that would exceed your limit, return TOO_LARGE -error. - -*/ -var upload_status = function (paths, Sessions, size, publicKey, cb) { - // TODO validate that size is within tolerance +var upload_status = function (paths, pinStore, msgStore, Sessions, publicKey, filesize, cb) { + // validate that the provided size is actually a positive number + if (typeof(filesize) !== 'number' && + filesize >= 0) { return void cb('E_INVALID_SIZE'); } + // validate that the provided path is not junk var filePath = makeFilePath(paths.staging, publicKey); if (!filePath) { return void cb('E_INVALID_PATH'); } - isFile(filePath, function (e, yes) { - cb(e, yes); + + getLimit(publicKey, function (e, limit) { + if (e) { return void cb(e); } + getTotalSize(paths, pinStore, msgStore, Sessions, publicKey, function (e, size) { + if ((filesize + size) >= limit) { return cb('TOO_LARGE'); } + isFile(filePath, function (e, yes) { + if (e) { + console.error("uploadError: [%s]", e); + return cb('UNNOWN_ERROR'); + } + cb(e, yes); + }); + }); }); }; @@ -677,7 +692,7 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) var blobPath = paths.blob = keyOrDefaultString('blobPath', './blob'); var blobStagingPath = paths.staging = keyOrDefaultString('blobStagingPath', './blobstage'); - var store; + var pinStore; var rpc = function ( ctx /*:{ store: Object }*/, @@ -725,7 +740,7 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY"); } - var safeKey = publicKey.replace(/\//g, '-'); + var safeKey = escapeKeyCharacters(publicKey); /* If you have gotten this far, you have signed the message with the public key which you provided. @@ -736,7 +751,7 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) msg.shift(); var Respond = function (e, msg) { - var token = Sessions[publicKey].tokens.slice(-1)[0]; + var token = Sessions[safeKey].tokens.slice(-1)[0]; var cookie = makeCookie(token).join('|'); respond(e, [cookie].concat(typeof(msg) !== 'undefined' ?msg: [])); }; @@ -749,33 +764,35 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) Respond('E_ACCESS_DENIED'); }; + var msgStore = ctx.store; + var handleMessage = function (privileged) { switch (msg[0]) { case 'COOKIE': return void Respond(void 0); case 'RESET': - return resetUserPins(store, Sessions, safeKey, msg[1], function (e, hash) { + return resetUserPins(pinStore, Sessions, safeKey, msg[1], function (e, hash) { return void Respond(e, hash); }); case 'PIN': // TODO don't pin if over the limit // if over, send error E_OVER_LIMIT - return pinChannel(store, Sessions, safeKey, msg[1], function (e, hash) { + return pinChannel(pinStore, Sessions, safeKey, msg[1], function (e, hash) { Respond(e, hash); }); case 'UNPIN': - return unpinChannel(store, Sessions, safeKey, msg[1], function (e, hash) { + return unpinChannel(pinStore, Sessions, safeKey, msg[1], function (e, hash) { Respond(e, hash); }); case 'GET_HASH': - return void getHash(store, Sessions, safeKey, function (e, hash) { + return void getHash(pinStore, Sessions, safeKey, function (e, hash) { Respond(e, hash); }); case 'GET_TOTAL_SIZE': // TODO cache this, since it will get called quite a bit - return getTotalSize(store, ctx.store, Sessions, safeKey, function (e, size) { + return getTotalSize(paths, pinStore, msgStore, Sessions, safeKey, function (e, size) { if (e) { return void Respond(e); } Respond(e, size); }); case 'GET_FILE_SIZE': - return void getFileSize(paths, ctx.store, msg[1], Respond); + return void getFileSize(paths, msgStore, msg[1], Respond); case 'UPDATE_LIMITS': return void updateLimits(config, safeKey, function (e, limit) { if (e) { return void Respond(e); } @@ -784,11 +801,10 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) case 'GET_LIMIT': return void getLimit(safeKey, function (e, limit) { if (e) { return void Respond(e); } - limit = limit; Respond(void 0, limit); }); case 'GET_MULTIPLE_FILE_SIZE': - return void getMultipleFileSize(paths, ctx.store, msg[1], function (e, dict) { + return void getMultipleFileSize(paths, msgStore, msg[1], function (e, dict) { if (e) { return void Respond(e); } Respond(void 0, dict); }); @@ -801,8 +817,15 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) }); case 'UPLOAD_STATUS': if (!privileged) { return deny(); } - return void upload_status(paths, Sessions, safeKey, msg[1], function (e, stat) { - Respond(e, stat); + var filesize = msg[1]; + return void upload_status(paths, pinStore, msgStore, Sessions, safeKey, msg[1], function (e, yes) { + if (!e && !yes) { + // no pending uploads, set the new size + var user = beginSession(Sessions, safeKey); + user.pendingUploadSize = filesize; + user.currentUploadSize = 0; + } + Respond(e, yes); }); case 'UPLOAD_COMPLETE': if (!privileged) { return deny(); } @@ -830,7 +853,7 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) } // if session has not been authenticated, do so - var session = Sessions[publicKey]; + var session = beginSession(Sessions, safeKey); if (typeof(session.privilege) !== 'boolean') { return void isPrivilegedUser(publicKey, function (yes) { session.privilege = yes; @@ -853,7 +876,7 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) Store.create({ filePath: pinPath, }, function (s) { - store = s; + pinStore = s; safeMkdir(blobPath, function (e) { if (e) { throw e; }