From b093d3f0d2cb75472e15235e0cbd4589b8d5cbff Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 24 Jan 2020 14:45:53 -0500 Subject: [PATCH] WIP massive rpc refactor --- lib/commands/admin-rpc.js | 4 +- lib/commands/quota.js | 31 +- lib/rpc.js | 713 ++++++++++++++++++++------------------ 3 files changed, 386 insertions(+), 362 deletions(-) diff --git a/lib/commands/admin-rpc.js b/lib/commands/admin-rpc.js index 9e32fe2a8..24a8e4348 100644 --- a/lib/commands/admin-rpc.js +++ b/lib/commands/admin-rpc.js @@ -93,7 +93,7 @@ var getDiskUsage = function (Env, cb) { -Admin.command = function (Env, ctx, publicKey, config, data, cb) { +Admin.command = function (Env, ctx, publicKey, data, cb) { var admins = Env.admins; if (admins.indexOf(publicKey) === -1) { return void cb("FORBIDDEN"); @@ -109,7 +109,7 @@ Admin.command = function (Env, ctx, publicKey, config, data, cb) { case 'DISK_USAGE': return getDiskUsage(Env, cb); case 'FLUSH_CACHE': - config.flushCache(); + Env.flushCache(); return cb(void 0, true); case 'SHUTDOWN': return shutdown(Env, ctx, cb); diff --git a/lib/commands/quota.js b/lib/commands/quota.js index 92b1c3cd0..b74195821 100644 --- a/lib/commands/quota.js +++ b/lib/commands/quota.js @@ -7,7 +7,7 @@ const Util = require("../common-util"); const Package = require('../../package.json'); const Https = require("https"); -Quota.applyCustomLimits = function (Env, config) { +Quota.applyCustomLimits = function (Env) { var isLimit = function (o) { var valid = o && typeof(o) === 'object' && typeof(o.limit) === 'number' && @@ -16,7 +16,7 @@ Quota.applyCustomLimits = function (Env, config) { return valid; }; - // read custom limits from the config + // read custom limits from the Environment (taken from config) var customLimits = (function (custom) { var limits = {}; Object.keys(custom).forEach(function (k) { @@ -27,7 +27,7 @@ Quota.applyCustomLimits = function (Env, config) { }); }); return limits; - }(config.customLimits || {})); + }(Env.customLimits || {})); Object.keys(customLimits).forEach(function (k) { if (!isLimit(customLimits[k])) { return; } @@ -37,17 +37,18 @@ Quota.applyCustomLimits = function (Env, config) { // The limits object contains storage limits for all the publicKey that have paid // To each key is associated an object containing the 'limit' value and a 'note' explaining that limit -Quota.updateLimits = function (Env, config, publicKey, cb) { // FIXME BATCH?S +// XXX maybe the use case with a publicKey should be a different command that calls this? +Quota.updateLimits = function (Env, publicKey, cb) { // FIXME BATCH?S - if (config.adminEmail === false) { - Quota.applyCustomLimits(Env, config); - if (config.allowSubscriptions === false) { return; } + if (Env.adminEmail === false) { + Quota.applyCustomLimits(Env); + if (Env.allowSubscriptions === false) { return; } throw new Error("allowSubscriptions must be false if adminEmail is false"); } if (typeof cb !== "function") { cb = function () {}; } - var defaultLimit = typeof(config.defaultStorageLimit) === 'number'? - config.defaultStorageLimit: Core.DEFAULT_LIMIT; + var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'? + Env.defaultStorageLimit: Core.DEFAULT_LIMIT; var userId; if (publicKey) { @@ -55,9 +56,9 @@ Quota.updateLimits = function (Env, config, publicKey, cb) { // FIXME BATCH?S } var body = JSON.stringify({ - domain: config.myDomain, - subdomain: config.mySubdomain || null, - adminEmail: config.adminEmail, + domain: Env.myDomain, + subdomain: Env.mySubdomain || null, + adminEmail: Env.adminEmail, version: Package.version }); var options = { @@ -84,7 +85,7 @@ Quota.updateLimits = function (Env, config, publicKey, cb) { // FIXME BATCH?S try { var json = JSON.parse(str); Env.limits = json; - Quota.applyCustomLimits(Env, config); + Quota.applyCustomLimits(Env); var l; if (userId) { @@ -100,8 +101,8 @@ Quota.updateLimits = function (Env, config, publicKey, cb) { // FIXME BATCH?S }); req.on('error', function (e) { - Quota.applyCustomLimits(Env, config); - if (!config.domain) { return cb(); } + Quota.applyCustomLimits(Env); + if (!Env.domain) { return cb(); } // XXX cb(e); }); diff --git a/lib/rpc.js b/lib/rpc.js index 1912868ed..171fb590f 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -18,44 +18,359 @@ var RPC = module.exports; const Store = require("../storage/file"); const BlobStore = require("../storage/blob"); +const UNAUTHENTICATED_CALLS = [ + 'GET_FILE_SIZE', + 'GET_METADATA', + 'GET_MULTIPLE_FILE_SIZE', + 'IS_CHANNEL_PINNED', + 'IS_NEW_CHANNEL', + 'GET_HISTORY_OFFSET', + 'GET_DELETED_PADS', + 'WRITE_PRIVATE_MESSAGE', +]; + var isUnauthenticatedCall = function (call) { - return [ - 'GET_FILE_SIZE', - 'GET_METADATA', - 'GET_MULTIPLE_FILE_SIZE', - 'IS_CHANNEL_PINNED', - 'IS_NEW_CHANNEL', - 'GET_HISTORY_OFFSET', - 'GET_DELETED_PADS', - 'WRITE_PRIVATE_MESSAGE', - ].indexOf(call) !== -1; + return UNAUTHENTICATED_CALLS.indexOf(call) !== -1; }; +const AUTHENTICATED_CALLS = [ + 'COOKIE', + 'RESET', + 'PIN', + 'UNPIN', + 'GET_HASH', + 'GET_TOTAL_SIZE', + 'UPDATE_LIMITS', + 'GET_LIMIT', + 'UPLOAD_STATUS', + 'UPLOAD_COMPLETE', + 'OWNED_UPLOAD_COMPLETE', + 'UPLOAD_CANCEL', + 'EXPIRE_SESSION', + 'TRIM_OWNED_CHANNEL_HISTORY', + 'CLEAR_OWNED_CHANNEL', + 'REMOVE_OWNED_CHANNEL', + 'REMOVE_PINS', + 'TRIM_PINS', + 'WRITE_LOGIN_BLOCK', + 'REMOVE_LOGIN_BLOCK', + 'ADMIN', + 'SET_METADATA' +]; + var isAuthenticatedCall = function (call) { - return [ - 'COOKIE', - 'RESET', - 'PIN', - 'UNPIN', - 'GET_HASH', - 'GET_TOTAL_SIZE', - 'UPDATE_LIMITS', - 'GET_LIMIT', - 'UPLOAD_STATUS', - 'UPLOAD_COMPLETE', - 'OWNED_UPLOAD_COMPLETE', - 'UPLOAD_CANCEL', - 'EXPIRE_SESSION', - 'TRIM_OWNED_CHANNEL_HISTORY', - 'CLEAR_OWNED_CHANNEL', - 'REMOVE_OWNED_CHANNEL', - 'REMOVE_PINS', - 'TRIM_PINS', - 'WRITE_LOGIN_BLOCK', - 'REMOVE_LOGIN_BLOCK', - 'ADMIN', - 'SET_METADATA' - ].indexOf(call) !== -1; + return AUTHENTICATED_CALLS.indexOf(call) !== -1; +}; + +var isUnauthenticateMessage = function (msg) { + return msg && msg.length === 2 && isUnauthenticatedCall(msg[0]); +}; + +var handleUnauthenticatedMessage = function (Env, msg, respond, nfwssCtx) { + Env.Log.silly('LOG_RPC', msg[0]); + switch (msg[0]) { + case 'GET_HISTORY_OFFSET': { // XXX not actually used anywhere? + if (typeof(msg[1]) !== 'object' || typeof(msg[1].channelName) !== 'string') { + return respond('INVALID_ARG_FORMAT', msg); + } + const msgHash = typeof(msg[1].msgHash) === 'string' ? msg[1].msgHash : undefined; + nfwssCtx.getHistoryOffset(nfwssCtx, msg[1].channelName, msgHash, (e, ret) => { + if (e) { + if (e.code !== 'ENOENT') { + Env.WARN(e.stack, msg); + } + return respond(e.message); + } + respond(e, [null, ret, null]); + }); + break; + } + case 'GET_FILE_SIZE': + return void Pinning.getFileSize(Env, msg[1], function (e, size) { + Env.WARN(e, msg[1]); + respond(e, [null, size, null]); + }); + case 'GET_METADATA': + return void Metadata.getMetadata(Env, msg[1], function (e, data) { + Env.WARN(e, msg[1]); + respond(e, [null, data, null]); + }); + case 'GET_MULTIPLE_FILE_SIZE': // XXX not actually used on the client? + return void Pinning.getMultipleFileSize(Env, msg[1], function (e, dict) { + if (e) { + Env.WARN(e, dict); + return respond(e); + } + respond(e, [null, dict, null]); + }); + case 'GET_DELETED_PADS': + return void Pinning.getDeletedPads(Env, msg[1], function (e, list) { + if (e) { + Env.WARN(e, msg[1]); + return respond(e); + } + respond(e, [null, list, null]); + }); + case 'IS_CHANNEL_PINNED': + return void Pinning.isChannelPinned(Env, msg[1], function (isPinned) { + respond(null, [null, isPinned, null]); + }); + case 'IS_NEW_CHANNEL': + return void Channel.isNewChannel(Env, msg[1], function (e, isNew) { + respond(e, [null, isNew, null]); + }); + case 'WRITE_PRIVATE_MESSAGE': + return void Channel.writePrivateMessage(Env, msg[1], nfwssCtx, function (e, output) { + respond(e, output); + }); + default: + Env.Log.warn("UNSUPPORTED_RPC_CALL", msg); + return respond('UNSUPPORTED_RPC_CALL', msg); + } +}; + +var handleAuthenticatedMessage = function (Env, map) { + var msg = map.msg; + var safeKey = map.safeKey; + var publicKey = map.publicKey; + var Respond = map.Respond; + var ctx = map.ctx; + + Env.Log.silly('LOG_RPC', msg[0]); + switch (msg[0]) { + case 'COOKIE': return void Respond(void 0); + case 'RESET': + return Pinning.resetUserPins(Env, safeKey, msg[1], function (e, hash) { // XXX USER_TARGETED + //WARN(e, hash); + return void Respond(e, hash); + }); + case 'PIN': + return Pinning.pinChannel(Env, safeKey, msg[1], function (e, hash) { // XXX USER_TARGETED + Env.WARN(e, hash); + Respond(e, hash); + }); + case 'UNPIN': + return Pinning.unpinChannel(Env, safeKey, msg[1], function (e, hash) { // XXX USER_TARGETED + Env.WARN(e, hash); + Respond(e, hash); + }); + case 'GET_HASH': + return void Pinning.getHash(Env, safeKey, function (e, hash) { // XXX USER_SCOPED + Env.WARN(e, hash); + Respond(e, hash); + }); + case 'GET_TOTAL_SIZE': // TODO cache this, since it will get called quite a bit + return Pinning.getTotalSize(Env, safeKey, function (e, size) { // XXX USER_SCOPED + if (e) { + Env.WARN(e, safeKey); + return void Respond(e); + } + Respond(e, size); + }); + case 'UPDATE_LIMITS': + return void Quota.updateLimits(Env, safeKey, function (e, limit) { // XXX USER_SCOPED + if (e) { + Env.WARN(e, limit); + return void Respond(e); + } + Respond(void 0, limit); + }); + case 'GET_LIMIT': + return void Pinning.getLimit(Env, safeKey, function (e, limit) { // XXX USER_SCOPED + if (e) { + Env.WARN(e, limit); + return void Respond(e); + } + Respond(void 0, limit); + }); + case 'EXPIRE_SESSION': + return void setTimeout(function () { // XXX USER_SCOPED + Core.expireSession(Env.Sessions, safeKey); + Respond(void 0, "OK"); + }); + case 'CLEAR_OWNED_CHANNEL': + return void Channel.clearOwnedChannel(Env, msg[1], publicKey, function (e, response) { // XXX USER_TARGETD_INVERSE + if (e) { return void Respond(e); } + Respond(void 0, response); + }); + + case 'REMOVE_OWNED_CHANNEL': + return void Channel.removeOwnedChannel(Env, msg[1], publicKey, function (e) { // XXX USER_TARGETED_INVERSE + if (e) { return void Respond(e); } + Respond(void 0, "OK"); + }); + case 'TRIM_OWNED_CHANNEL_HISTORY': + return void Channel.removeOwnedChannelHistory(Env, msg[1], publicKey, msg[2], function (e) { // XXX USER_TARGETED_DOUBLE + if (e) { return void Respond(e); } + Respond(void 0, 'OK'); + }); + case 'REMOVE_PINS': + return void Pinning.removePins(Env, safeKey, function (e) { // XXX USER_SCOPED + if (e) { return void Respond(e); } + Respond(void 0, "OK"); + }); + case 'TRIM_PINS': + return void Pinning.trimPins(Env, safeKey, function (e) { // XXX USER_SCOPED + if (e) { return void Respond(e); } + Respond(void 0, "OK"); + }); + case 'UPLOAD': + return void Env.blobStore.upload(safeKey, msg[1], function (e, len) { // XXX USER_SCOPED_SPECIAL + Env.WARN(e, len); + Respond(e, len); + }); + case 'UPLOAD_STATUS': + var filesize = msg[1]; + return void Upload.upload_status(Env, safeKey, filesize, function (e, yes) { // XXX USER_TARGETED + if (!e && !yes) { + // no pending uploads, set the new size + var user = Core.getSession(Env.Sessions, safeKey); + user.pendingUploadSize = filesize; + user.currentUploadSize = 0; + } + Respond(e, yes); + }); + case 'UPLOAD_COMPLETE': + return void Env.blobStore.complete(safeKey, msg[1], function (e, hash) { // XXX USER_SCOPED_SPECIAL + Env.WARN(e, hash); + Respond(e, hash); + }); + case 'OWNED_UPLOAD_COMPLETE': + return void Env.blobStore.completeOwned(safeKey, msg[1], function (e, blobId) { // XXX USER_SCOPED_SPECIAL + Env.WARN(e, blobId); + Respond(e, blobId); + }); + case 'UPLOAD_CANCEL': + // msg[1] is fileSize + // if we pass it here, we can start an upload right away without calling + // UPLOAD_STATUS again + return void Env.blobStore.cancel(safeKey, msg[1], function (e) { // XXX USER_SCOPED_SPECIAL + Env.WARN(e, 'UPLOAD_CANCEL'); + Respond(e); + }); + case 'WRITE_LOGIN_BLOCK': + return void Block.writeLoginBlock(Env, msg[1], function (e) { // XXX SPECIAL + if (e) { + Env.WARN(e, 'WRITE_LOGIN_BLOCK'); + return void Respond(e); + } + Respond(e); + }); + case 'REMOVE_LOGIN_BLOCK': + return void Block.removeLoginBlock(Env, msg[1], function (e) { // XXX SPECIAL + if (e) { + Env.WARN(e, 'REMOVE_LOGIN_BLOCK'); + return void Respond(e); + } + Respond(e); + }); + case 'ADMIN': + return void Admin.command(Env, ctx, safeKey, msg[1], function (e, result) { // XXX SPECIAL + if (e) { + Env.WARN(e, result); + return void Respond(e); + } + Respond(void 0, result); + }); + case 'SET_METADATA': + return void Metadata.setMetadata(Env, msg[1], publicKey, function (e, data) { // XXX USER_TARGETED_INVERSE + if (e) { + Env.WARN(e, data); + return void Respond(e); + } + Respond(void 0, data); + }); + default: + return void Respond('UNSUPPORTED_RPC_CALL', msg); + } +}; + +var rpc = function (Env, ctx, data, respond) { + if (!Array.isArray(data)) { + Env.Log.debug('INVALID_ARG_FORMET', data); + return void respond('INVALID_ARG_FORMAT'); + } + + if (!data.length) { + return void respond("INSUFFICIENT_ARGS"); + } else if (data.length !== 1) { + Env.Log.debug('UNEXPECTED_ARGUMENTS_LENGTH', data); + } + + var msg = data[0].slice(0); + + if (!Array.isArray(msg)) { + return void respond('INVALID_ARG_FORMAT'); + } + + if (isUnauthenticateMessage(msg)) { + return handleUnauthenticatedMessage(Env, msg, respond, ctx); + } + + var signature = msg.shift(); + var publicKey = msg.shift(); + + // make sure a user object is initialized in the cookie jar + if (publicKey) { + Core.getSession(Env.Sessions, publicKey); + } else { + Env.Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey); + } + + var cookie = msg[0]; + if (!Core.isValidCookie(Env.Sessions, publicKey, cookie)) { + // no cookie is fine if the RPC is to get a cookie + if (msg[1] !== 'COOKIE') { + return void respond('NO_COOKIE'); + } + } + + var serialized = JSON.stringify(msg); + + if (!(serialized && typeof(publicKey) === 'string')) { + return void respond('INVALID_MESSAGE_OR_PUBLIC_KEY'); + } + + if (isAuthenticatedCall(msg[1])) { + if (Core.checkSignature(Env, serialized, signature, publicKey) !== true) { + return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY"); + } + } else if (msg[1] !== 'UPLOAD') { + Env.Log.warn('INVALID_RPC_CALL', msg[1]); + return void respond("INVALID_RPC_CALL"); + } + + var safeKey = Util.escapeKeyCharacters(publicKey); + /* If you have gotten this far, you have signed the message with the + public key which you provided. + + We can safely modify the state for that key + + OR it's an unauthenticated call, which must not modify the state + for that key in a meaningful way. + */ + + // discard validated cookie from message + msg.shift(); + + var Respond = function (e, msg) { + var session = Env.Sessions[safeKey]; + var token = session? session.tokens.slice(-1)[0]: ''; + var cookie = Core.makeCookie(token).join('|'); + respond(e ? String(e): e, [cookie].concat(typeof(msg) !== 'undefined' ?msg: [])); + }; + + if (typeof(msg) !== 'object' || !msg.length) { + return void Respond('INVALID_MSG'); + } + + handleAuthenticatedMessage(Env, { + msg: msg, + safeKey: safeKey, + publicKey: publicKey, + Respond: Respond, + ctx: ctx, + }); }; RPC.create = function (config, cb) { @@ -92,6 +407,13 @@ RPC.create = function (config, cb) { sessionExpirationInterval: undefined, Log: Log, WARN: WARN, + flushCache: config.flushCache, + adminEmail: config.adminEmail, + allowSubscriptions: config.allowSubscriptions, + myDomain: config.myDomain, + mySubdomain: config.mySubdomain, + customLimits: config.customLimits, + domain: config.domain // XXX }; try { @@ -112,322 +434,14 @@ RPC.create = function (config, cb) { paths.staging = keyOrDefaultString('blobStagingPath', './blobstage'); paths.blob = keyOrDefaultString('blobPath', './blob'); - var isUnauthenticateMessage = function (msg) { - return msg && msg.length === 2 && isUnauthenticatedCall(msg[0]); - }; - - var handleUnauthenticatedMessage = function (msg, respond, nfwssCtx) { - Log.silly('LOG_RPC', msg[0]); - switch (msg[0]) { - case 'GET_HISTORY_OFFSET': { - if (typeof(msg[1]) !== 'object' || typeof(msg[1].channelName) !== 'string') { - return respond('INVALID_ARG_FORMAT', msg); - } - const msgHash = typeof(msg[1].msgHash) === 'string' ? msg[1].msgHash : undefined; - nfwssCtx.getHistoryOffset(nfwssCtx, msg[1].channelName, msgHash, (e, ret) => { - if (e) { - if (e.code !== 'ENOENT') { - WARN(e.stack, msg); - } - return respond(e.message); - } - respond(e, [null, ret, null]); - }); - break; - } - case 'GET_FILE_SIZE': - return void Pinning.getFileSize(Env, msg[1], function (e, size) { - WARN(e, msg[1]); - respond(e, [null, size, null]); - }); - case 'GET_METADATA': - return void Metadata.getMetadata(Env, msg[1], function (e, data) { - WARN(e, msg[1]); - respond(e, [null, data, null]); - }); - case 'GET_MULTIPLE_FILE_SIZE': - return void Pinning.getMultipleFileSize(Env, msg[1], function (e, dict) { - if (e) { - WARN(e, dict); - return respond(e); - } - respond(e, [null, dict, null]); - }); - case 'GET_DELETED_PADS': - return void Pinning.getDeletedPads(Env, msg[1], function (e, list) { - if (e) { - WARN(e, msg[1]); - return respond(e); - } - respond(e, [null, list, null]); - }); - case 'IS_CHANNEL_PINNED': - return void Pinning.isChannelPinned(Env, msg[1], function (isPinned) { - respond(null, [null, isPinned, null]); - }); - case 'IS_NEW_CHANNEL': - return void Channel.isNewChannel(Env, msg[1], function (e, isNew) { - respond(e, [null, isNew, null]); - }); - case 'WRITE_PRIVATE_MESSAGE': - return void Channel.writePrivateMessage(Env, msg[1], nfwssCtx, function (e, output) { - respond(e, output); - }); - default: - Log.warn("UNSUPPORTED_RPC_CALL", msg); - return respond('UNSUPPORTED_RPC_CALL', msg); - } - }; - - var rpc0 = function (ctx, data, respond) { - if (!Array.isArray(data)) { - Log.debug('INVALID_ARG_FORMET', data); - return void respond('INVALID_ARG_FORMAT'); - } - - if (!data.length) { - return void respond("INSUFFICIENT_ARGS"); - } else if (data.length !== 1) { - Log.debug('UNEXPECTED_ARGUMENTS_LENGTH', data); - } - - var msg = data[0].slice(0); - - if (!Array.isArray(msg)) { - return void respond('INVALID_ARG_FORMAT'); - } - - if (isUnauthenticateMessage(msg)) { - return handleUnauthenticatedMessage(msg, respond, ctx); - } - - var signature = msg.shift(); - var publicKey = msg.shift(); - - // make sure a user object is initialized in the cookie jar - if (publicKey) { - Core.getSession(Sessions, publicKey); - } else { - Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey); - } - - var cookie = msg[0]; - if (!Core.isValidCookie(Sessions, publicKey, cookie)) { - // no cookie is fine if the RPC is to get a cookie - if (msg[1] !== 'COOKIE') { - return void respond('NO_COOKIE'); - } - } - - var serialized = JSON.stringify(msg); - - if (!(serialized && typeof(publicKey) === 'string')) { - return void respond('INVALID_MESSAGE_OR_PUBLIC_KEY'); - } - - if (isAuthenticatedCall(msg[1])) { - if (Core.checkSignature(Env, serialized, signature, publicKey) !== true) { - return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY"); - } - } else if (msg[1] !== 'UPLOAD') { - Log.warn('INVALID_RPC_CALL', msg[1]); - return void respond("INVALID_RPC_CALL"); - } - - var safeKey = Util.escapeKeyCharacters(publicKey); - /* If you have gotten this far, you have signed the message with the - public key which you provided. - - We can safely modify the state for that key - - OR it's an unauthenticated call, which must not modify the state - for that key in a meaningful way. - */ - - // discard validated cookie from message - msg.shift(); - - var Respond = function (e, msg) { - var session = Sessions[safeKey]; - var token = session? session.tokens.slice(-1)[0]: ''; - var cookie = Core.makeCookie(token).join('|'); - respond(e ? String(e): e, [cookie].concat(typeof(msg) !== 'undefined' ?msg: [])); - }; - - if (typeof(msg) !== 'object' || !msg.length) { - return void Respond('INVALID_MSG'); - } - - var handleMessage = function () { - Log.silly('LOG_RPC', msg[0]); - switch (msg[0]) { - case 'COOKIE': return void Respond(void 0); - case 'RESET': - return Pinning.resetUserPins(Env, safeKey, msg[1], function (e, hash) { - //WARN(e, hash); - return void Respond(e, hash); - }); - case 'PIN': - return Pinning.pinChannel(Env, safeKey, msg[1], function (e, hash) { - WARN(e, hash); - Respond(e, hash); - }); - case 'UNPIN': - return Pinning.unpinChannel(Env, safeKey, msg[1], function (e, hash) { - WARN(e, hash); - Respond(e, hash); - }); - case 'GET_HASH': - return void Pinning.getHash(Env, safeKey, function (e, hash) { - WARN(e, hash); - Respond(e, hash); - }); - case 'GET_TOTAL_SIZE': // TODO cache this, since it will get called quite a bit - return Pinning.getTotalSize(Env, safeKey, function (e, size) { - if (e) { - WARN(e, safeKey); - return void Respond(e); - } - Respond(e, size); - }); - case 'UPDATE_LIMITS': - return void Quota.updateLimits(Env, config, safeKey, function (e, limit) { - if (e) { - WARN(e, limit); - return void Respond(e); - } - Respond(void 0, limit); - }); - case 'GET_LIMIT': - return void Pinning.getLimit(Env, safeKey, function (e, limit) { - if (e) { - WARN(e, limit); - return void Respond(e); - } - Respond(void 0, limit); - }); - case 'EXPIRE_SESSION': - return void setTimeout(function () { - Core.expireSession(Sessions, safeKey); - Respond(void 0, "OK"); - }); - case 'CLEAR_OWNED_CHANNEL': - return void Channel.clearOwnedChannel(Env, msg[1], publicKey, function (e, response) { - if (e) { return void Respond(e); } - Respond(void 0, response); - }); - - case 'REMOVE_OWNED_CHANNEL': - return void Channel.removeOwnedChannel(Env, msg[1], publicKey, function (e) { - if (e) { return void Respond(e); } - Respond(void 0, "OK"); - }); - case 'TRIM_OWNED_CHANNEL_HISTORY': - return void Channel.removeOwnedChannelHistory(Env, msg[1], publicKey, msg[2], function (e) { - if (e) { return void Respond(e); } - Respond(void 0, 'OK'); - }); - case 'REMOVE_PINS': - return void Pinning.removePins(Env, safeKey, function (e) { - if (e) { return void Respond(e); } - Respond(void 0, "OK"); - }); - case 'TRIM_PINS': - return void Pinning.trimPins(Env, safeKey, function (e) { - if (e) { return void Respond(e); } - Respond(void 0, "OK"); - }); - case 'UPLOAD': - return void Env.blobStore.upload(safeKey, msg[1], function (e, len) { - WARN(e, len); - Respond(e, len); - }); - case 'UPLOAD_STATUS': - var filesize = msg[1]; - return void Upload.upload_status(Env, safeKey, filesize, function (e, yes) { - if (!e && !yes) { - // no pending uploads, set the new size - var user = Core.getSession(Sessions, safeKey); - user.pendingUploadSize = filesize; - user.currentUploadSize = 0; - } - Respond(e, yes); - }); - case 'UPLOAD_COMPLETE': - return void Env.blobStore.complete(safeKey, msg[1], function (e, hash) { - WARN(e, hash); - Respond(e, hash); - }); - case 'OWNED_UPLOAD_COMPLETE': - return void Env.blobStore.completeOwned(safeKey, msg[1], function (e, blobId) { - WARN(e, blobId); - Respond(e, blobId); - }); - case 'UPLOAD_CANCEL': - // msg[1] is fileSize - // if we pass it here, we can start an upload right away without calling - // UPLOAD_STATUS again - return void Env.blobStore.cancel(safeKey, msg[1], function (e) { - WARN(e, 'UPLOAD_CANCEL'); - Respond(e); - }); - case 'WRITE_LOGIN_BLOCK': - return void Block.writeLoginBlock(Env, msg[1], function (e) { - if (e) { - WARN(e, 'WRITE_LOGIN_BLOCK'); - return void Respond(e); - } - Respond(e); - }); - case 'REMOVE_LOGIN_BLOCK': - return void Block.removeLoginBlock(Env, msg[1], function (e) { - if (e) { - WARN(e, 'REMOVE_LOGIN_BLOCK'); - return void Respond(e); - } - Respond(e); - }); - case 'ADMIN': - return void Admin.command(Env, ctx, safeKey, config, msg[1], function (e, result) { - if (e) { - WARN(e, result); - return void Respond(e); - } - Respond(void 0, result); - }); - case 'SET_METADATA': - return void Metadata.setMetadata(Env, msg[1], publicKey, function (e, data) { - if (e) { - WARN(e, data); - return void Respond(e); - } - Respond(void 0, data); - }); - default: - return void Respond('UNSUPPORTED_RPC_CALL', msg); - } - }; - - handleMessage(true); - }; - - var rpc = function (ctx, data, respond) { - try { - return rpc0(ctx, data, respond); - } catch (e) { - console.log("Error from RPC with data " + JSON.stringify(data)); - console.log(e.stack); - } - }; - var updateLimitDaily = function () { - Quota.updateLimits(Env, config, undefined, function (e) { + Quota.updateLimits(Env, undefined, function (e) { if (e) { WARN('limitUpdate', e); } }); }; - Quota.applyCustomLimits(Env, config); + Quota.applyCustomLimits(Env); updateLimitDaily(); setInterval(updateLimitDaily, 24*3600*1000); @@ -451,7 +465,16 @@ RPC.create = function (config, cb) { Env.blobStore = blob; })); }).nThen(function () { - cb(void 0, rpc); + // XXX it's ugly that we pass ctx and Env separately + // when they're effectively the same thing... + cb(void 0, function (ctx, data, respond) { + try { + return rpc(Env, ctx, data, respond); + } catch (e) { + console.log("Error from RPC with data " + JSON.stringify(data)); + console.log(e.stack); + } + }); // expire old sessions once per minute // XXX allow for graceful shutdown Env.sessionExpirationInterval = setInterval(function () {