WIP massive rpc refactor

pull/1/head
ansuz 5 years ago
parent ceb351326c
commit b093d3f0d2

@ -93,7 +93,7 @@ var getDiskUsage = function (Env, cb) {
Admin.command = function (Env, ctx, publicKey, config, data, cb) { Admin.command = function (Env, ctx, publicKey, data, cb) {
var admins = Env.admins; var admins = Env.admins;
if (admins.indexOf(publicKey) === -1) { if (admins.indexOf(publicKey) === -1) {
return void cb("FORBIDDEN"); return void cb("FORBIDDEN");
@ -109,7 +109,7 @@ Admin.command = function (Env, ctx, publicKey, config, data, cb) {
case 'DISK_USAGE': case 'DISK_USAGE':
return getDiskUsage(Env, cb); return getDiskUsage(Env, cb);
case 'FLUSH_CACHE': case 'FLUSH_CACHE':
config.flushCache(); Env.flushCache();
return cb(void 0, true); return cb(void 0, true);
case 'SHUTDOWN': case 'SHUTDOWN':
return shutdown(Env, ctx, cb); return shutdown(Env, ctx, cb);

@ -7,7 +7,7 @@ const Util = require("../common-util");
const Package = require('../../package.json'); const Package = require('../../package.json');
const Https = require("https"); const Https = require("https");
Quota.applyCustomLimits = function (Env, config) { Quota.applyCustomLimits = function (Env) {
var isLimit = function (o) { var isLimit = function (o) {
var valid = o && typeof(o) === 'object' && var valid = o && typeof(o) === 'object' &&
typeof(o.limit) === 'number' && typeof(o.limit) === 'number' &&
@ -16,7 +16,7 @@ Quota.applyCustomLimits = function (Env, config) {
return valid; return valid;
}; };
// read custom limits from the config // read custom limits from the Environment (taken from config)
var customLimits = (function (custom) { var customLimits = (function (custom) {
var limits = {}; var limits = {};
Object.keys(custom).forEach(function (k) { Object.keys(custom).forEach(function (k) {
@ -27,7 +27,7 @@ Quota.applyCustomLimits = function (Env, config) {
}); });
}); });
return limits; return limits;
}(config.customLimits || {})); }(Env.customLimits || {}));
Object.keys(customLimits).forEach(function (k) { Object.keys(customLimits).forEach(function (k) {
if (!isLimit(customLimits[k])) { return; } if (!isLimit(customLimits[k])) { return; }
@ -37,17 +37,18 @@ Quota.applyCustomLimits = function (Env, config) {
// The limits object contains storage limits for all the publicKey that have paid // The limits object contains storage limits for all the publicKey that have paid
// To each key is associated an object containing the 'limit' value and a 'note' explaining that limit // To each key is associated an object containing the 'limit' value and a 'note' explaining that limit
Quota.updateLimits = function (Env, config, publicKey, cb) { // FIXME BATCH?S // XXX maybe the use case with a publicKey should be a different command that calls this?
Quota.updateLimits = function (Env, publicKey, cb) { // FIXME BATCH?S
if (config.adminEmail === false) { if (Env.adminEmail === false) {
Quota.applyCustomLimits(Env, config); Quota.applyCustomLimits(Env);
if (config.allowSubscriptions === false) { return; } if (Env.allowSubscriptions === false) { return; }
throw new Error("allowSubscriptions must be false if adminEmail is false"); throw new Error("allowSubscriptions must be false if adminEmail is false");
} }
if (typeof cb !== "function") { cb = function () {}; } if (typeof cb !== "function") { cb = function () {}; }
var defaultLimit = typeof(config.defaultStorageLimit) === 'number'? var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'?
config.defaultStorageLimit: Core.DEFAULT_LIMIT; Env.defaultStorageLimit: Core.DEFAULT_LIMIT;
var userId; var userId;
if (publicKey) { if (publicKey) {
@ -55,9 +56,9 @@ Quota.updateLimits = function (Env, config, publicKey, cb) { // FIXME BATCH?S
} }
var body = JSON.stringify({ var body = JSON.stringify({
domain: config.myDomain, domain: Env.myDomain,
subdomain: config.mySubdomain || null, subdomain: Env.mySubdomain || null,
adminEmail: config.adminEmail, adminEmail: Env.adminEmail,
version: Package.version version: Package.version
}); });
var options = { var options = {
@ -84,7 +85,7 @@ Quota.updateLimits = function (Env, config, publicKey, cb) { // FIXME BATCH?S
try { try {
var json = JSON.parse(str); var json = JSON.parse(str);
Env.limits = json; Env.limits = json;
Quota.applyCustomLimits(Env, config); Quota.applyCustomLimits(Env);
var l; var l;
if (userId) { if (userId) {
@ -100,8 +101,8 @@ Quota.updateLimits = function (Env, config, publicKey, cb) { // FIXME BATCH?S
}); });
req.on('error', function (e) { req.on('error', function (e) {
Quota.applyCustomLimits(Env, config); Quota.applyCustomLimits(Env);
if (!config.domain) { return cb(); } if (!Env.domain) { return cb(); } // XXX
cb(e); cb(e);
}); });

@ -18,44 +18,359 @@ var RPC = module.exports;
const Store = require("../storage/file"); const Store = require("../storage/file");
const BlobStore = require("../storage/blob"); const BlobStore = require("../storage/blob");
const UNAUTHENTICATED_CALLS = [
'GET_FILE_SIZE',
'GET_METADATA',
'GET_MULTIPLE_FILE_SIZE',
'IS_CHANNEL_PINNED',
'IS_NEW_CHANNEL',
'GET_HISTORY_OFFSET',
'GET_DELETED_PADS',
'WRITE_PRIVATE_MESSAGE',
];
var isUnauthenticatedCall = function (call) { var isUnauthenticatedCall = function (call) {
return [ return UNAUTHENTICATED_CALLS.indexOf(call) !== -1;
'GET_FILE_SIZE',
'GET_METADATA',
'GET_MULTIPLE_FILE_SIZE',
'IS_CHANNEL_PINNED',
'IS_NEW_CHANNEL',
'GET_HISTORY_OFFSET',
'GET_DELETED_PADS',
'WRITE_PRIVATE_MESSAGE',
].indexOf(call) !== -1;
}; };
const AUTHENTICATED_CALLS = [
'COOKIE',
'RESET',
'PIN',
'UNPIN',
'GET_HASH',
'GET_TOTAL_SIZE',
'UPDATE_LIMITS',
'GET_LIMIT',
'UPLOAD_STATUS',
'UPLOAD_COMPLETE',
'OWNED_UPLOAD_COMPLETE',
'UPLOAD_CANCEL',
'EXPIRE_SESSION',
'TRIM_OWNED_CHANNEL_HISTORY',
'CLEAR_OWNED_CHANNEL',
'REMOVE_OWNED_CHANNEL',
'REMOVE_PINS',
'TRIM_PINS',
'WRITE_LOGIN_BLOCK',
'REMOVE_LOGIN_BLOCK',
'ADMIN',
'SET_METADATA'
];
var isAuthenticatedCall = function (call) { var isAuthenticatedCall = function (call) {
return [ return AUTHENTICATED_CALLS.indexOf(call) !== -1;
'COOKIE', };
'RESET',
'PIN', var isUnauthenticateMessage = function (msg) {
'UNPIN', return msg && msg.length === 2 && isUnauthenticatedCall(msg[0]);
'GET_HASH', };
'GET_TOTAL_SIZE',
'UPDATE_LIMITS', var handleUnauthenticatedMessage = function (Env, msg, respond, nfwssCtx) {
'GET_LIMIT', Env.Log.silly('LOG_RPC', msg[0]);
'UPLOAD_STATUS', switch (msg[0]) {
'UPLOAD_COMPLETE', case 'GET_HISTORY_OFFSET': { // XXX not actually used anywhere?
'OWNED_UPLOAD_COMPLETE', if (typeof(msg[1]) !== 'object' || typeof(msg[1].channelName) !== 'string') {
'UPLOAD_CANCEL', return respond('INVALID_ARG_FORMAT', msg);
'EXPIRE_SESSION', }
'TRIM_OWNED_CHANNEL_HISTORY', const msgHash = typeof(msg[1].msgHash) === 'string' ? msg[1].msgHash : undefined;
'CLEAR_OWNED_CHANNEL', nfwssCtx.getHistoryOffset(nfwssCtx, msg[1].channelName, msgHash, (e, ret) => {
'REMOVE_OWNED_CHANNEL', if (e) {
'REMOVE_PINS', if (e.code !== 'ENOENT') {
'TRIM_PINS', Env.WARN(e.stack, msg);
'WRITE_LOGIN_BLOCK', }
'REMOVE_LOGIN_BLOCK', return respond(e.message);
'ADMIN', }
'SET_METADATA' respond(e, [null, ret, null]);
].indexOf(call) !== -1; });
break;
}
case 'GET_FILE_SIZE':
return void Pinning.getFileSize(Env, msg[1], function (e, size) {
Env.WARN(e, msg[1]);
respond(e, [null, size, null]);
});
case 'GET_METADATA':
return void Metadata.getMetadata(Env, msg[1], function (e, data) {
Env.WARN(e, msg[1]);
respond(e, [null, data, null]);
});
case 'GET_MULTIPLE_FILE_SIZE': // XXX not actually used on the client?
return void Pinning.getMultipleFileSize(Env, msg[1], function (e, dict) {
if (e) {
Env.WARN(e, dict);
return respond(e);
}
respond(e, [null, dict, null]);
});
case 'GET_DELETED_PADS':
return void Pinning.getDeletedPads(Env, msg[1], function (e, list) {
if (e) {
Env.WARN(e, msg[1]);
return respond(e);
}
respond(e, [null, list, null]);
});
case 'IS_CHANNEL_PINNED':
return void Pinning.isChannelPinned(Env, msg[1], function (isPinned) {
respond(null, [null, isPinned, null]);
});
case 'IS_NEW_CHANNEL':
return void Channel.isNewChannel(Env, msg[1], function (e, isNew) {
respond(e, [null, isNew, null]);
});
case 'WRITE_PRIVATE_MESSAGE':
return void Channel.writePrivateMessage(Env, msg[1], nfwssCtx, function (e, output) {
respond(e, output);
});
default:
Env.Log.warn("UNSUPPORTED_RPC_CALL", msg);
return respond('UNSUPPORTED_RPC_CALL', msg);
}
};
var handleAuthenticatedMessage = function (Env, map) {
var msg = map.msg;
var safeKey = map.safeKey;
var publicKey = map.publicKey;
var Respond = map.Respond;
var ctx = map.ctx;
Env.Log.silly('LOG_RPC', msg[0]);
switch (msg[0]) {
case 'COOKIE': return void Respond(void 0);
case 'RESET':
return Pinning.resetUserPins(Env, safeKey, msg[1], function (e, hash) { // XXX USER_TARGETED
//WARN(e, hash);
return void Respond(e, hash);
});
case 'PIN':
return Pinning.pinChannel(Env, safeKey, msg[1], function (e, hash) { // XXX USER_TARGETED
Env.WARN(e, hash);
Respond(e, hash);
});
case 'UNPIN':
return Pinning.unpinChannel(Env, safeKey, msg[1], function (e, hash) { // XXX USER_TARGETED
Env.WARN(e, hash);
Respond(e, hash);
});
case 'GET_HASH':
return void Pinning.getHash(Env, safeKey, function (e, hash) { // XXX USER_SCOPED
Env.WARN(e, hash);
Respond(e, hash);
});
case 'GET_TOTAL_SIZE': // TODO cache this, since it will get called quite a bit
return Pinning.getTotalSize(Env, safeKey, function (e, size) { // XXX USER_SCOPED
if (e) {
Env.WARN(e, safeKey);
return void Respond(e);
}
Respond(e, size);
});
case 'UPDATE_LIMITS':
return void Quota.updateLimits(Env, safeKey, function (e, limit) { // XXX USER_SCOPED
if (e) {
Env.WARN(e, limit);
return void Respond(e);
}
Respond(void 0, limit);
});
case 'GET_LIMIT':
return void Pinning.getLimit(Env, safeKey, function (e, limit) { // XXX USER_SCOPED
if (e) {
Env.WARN(e, limit);
return void Respond(e);
}
Respond(void 0, limit);
});
case 'EXPIRE_SESSION':
return void setTimeout(function () { // XXX USER_SCOPED
Core.expireSession(Env.Sessions, safeKey);
Respond(void 0, "OK");
});
case 'CLEAR_OWNED_CHANNEL':
return void Channel.clearOwnedChannel(Env, msg[1], publicKey, function (e, response) { // XXX USER_TARGETD_INVERSE
if (e) { return void Respond(e); }
Respond(void 0, response);
});
case 'REMOVE_OWNED_CHANNEL':
return void Channel.removeOwnedChannel(Env, msg[1], publicKey, function (e) { // XXX USER_TARGETED_INVERSE
if (e) { return void Respond(e); }
Respond(void 0, "OK");
});
case 'TRIM_OWNED_CHANNEL_HISTORY':
return void Channel.removeOwnedChannelHistory(Env, msg[1], publicKey, msg[2], function (e) { // XXX USER_TARGETED_DOUBLE
if (e) { return void Respond(e); }
Respond(void 0, 'OK');
});
case 'REMOVE_PINS':
return void Pinning.removePins(Env, safeKey, function (e) { // XXX USER_SCOPED
if (e) { return void Respond(e); }
Respond(void 0, "OK");
});
case 'TRIM_PINS':
return void Pinning.trimPins(Env, safeKey, function (e) { // XXX USER_SCOPED
if (e) { return void Respond(e); }
Respond(void 0, "OK");
});
case 'UPLOAD':
return void Env.blobStore.upload(safeKey, msg[1], function (e, len) { // XXX USER_SCOPED_SPECIAL
Env.WARN(e, len);
Respond(e, len);
});
case 'UPLOAD_STATUS':
var filesize = msg[1];
return void Upload.upload_status(Env, safeKey, filesize, function (e, yes) { // XXX USER_TARGETED
if (!e && !yes) {
// no pending uploads, set the new size
var user = Core.getSession(Env.Sessions, safeKey);
user.pendingUploadSize = filesize;
user.currentUploadSize = 0;
}
Respond(e, yes);
});
case 'UPLOAD_COMPLETE':
return void Env.blobStore.complete(safeKey, msg[1], function (e, hash) { // XXX USER_SCOPED_SPECIAL
Env.WARN(e, hash);
Respond(e, hash);
});
case 'OWNED_UPLOAD_COMPLETE':
return void Env.blobStore.completeOwned(safeKey, msg[1], function (e, blobId) { // XXX USER_SCOPED_SPECIAL
Env.WARN(e, blobId);
Respond(e, blobId);
});
case 'UPLOAD_CANCEL':
// msg[1] is fileSize
// if we pass it here, we can start an upload right away without calling
// UPLOAD_STATUS again
return void Env.blobStore.cancel(safeKey, msg[1], function (e) { // XXX USER_SCOPED_SPECIAL
Env.WARN(e, 'UPLOAD_CANCEL');
Respond(e);
});
case 'WRITE_LOGIN_BLOCK':
return void Block.writeLoginBlock(Env, msg[1], function (e) { // XXX SPECIAL
if (e) {
Env.WARN(e, 'WRITE_LOGIN_BLOCK');
return void Respond(e);
}
Respond(e);
});
case 'REMOVE_LOGIN_BLOCK':
return void Block.removeLoginBlock(Env, msg[1], function (e) { // XXX SPECIAL
if (e) {
Env.WARN(e, 'REMOVE_LOGIN_BLOCK');
return void Respond(e);
}
Respond(e);
});
case 'ADMIN':
return void Admin.command(Env, ctx, safeKey, msg[1], function (e, result) { // XXX SPECIAL
if (e) {
Env.WARN(e, result);
return void Respond(e);
}
Respond(void 0, result);
});
case 'SET_METADATA':
return void Metadata.setMetadata(Env, msg[1], publicKey, function (e, data) { // XXX USER_TARGETED_INVERSE
if (e) {
Env.WARN(e, data);
return void Respond(e);
}
Respond(void 0, data);
});
default:
return void Respond('UNSUPPORTED_RPC_CALL', msg);
}
};
var rpc = function (Env, ctx, data, respond) {
if (!Array.isArray(data)) {
Env.Log.debug('INVALID_ARG_FORMET', data);
return void respond('INVALID_ARG_FORMAT');
}
if (!data.length) {
return void respond("INSUFFICIENT_ARGS");
} else if (data.length !== 1) {
Env.Log.debug('UNEXPECTED_ARGUMENTS_LENGTH', data);
}
var msg = data[0].slice(0);
if (!Array.isArray(msg)) {
return void respond('INVALID_ARG_FORMAT');
}
if (isUnauthenticateMessage(msg)) {
return handleUnauthenticatedMessage(Env, msg, respond, ctx);
}
var signature = msg.shift();
var publicKey = msg.shift();
// make sure a user object is initialized in the cookie jar
if (publicKey) {
Core.getSession(Env.Sessions, publicKey);
} else {
Env.Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey);
}
var cookie = msg[0];
if (!Core.isValidCookie(Env.Sessions, publicKey, cookie)) {
// no cookie is fine if the RPC is to get a cookie
if (msg[1] !== 'COOKIE') {
return void respond('NO_COOKIE');
}
}
var serialized = JSON.stringify(msg);
if (!(serialized && typeof(publicKey) === 'string')) {
return void respond('INVALID_MESSAGE_OR_PUBLIC_KEY');
}
if (isAuthenticatedCall(msg[1])) {
if (Core.checkSignature(Env, serialized, signature, publicKey) !== true) {
return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY");
}
} else if (msg[1] !== 'UPLOAD') {
Env.Log.warn('INVALID_RPC_CALL', msg[1]);
return void respond("INVALID_RPC_CALL");
}
var safeKey = Util.escapeKeyCharacters(publicKey);
/* If you have gotten this far, you have signed the message with the
public key which you provided.
We can safely modify the state for that key
OR it's an unauthenticated call, which must not modify the state
for that key in a meaningful way.
*/
// discard validated cookie from message
msg.shift();
var Respond = function (e, msg) {
var session = Env.Sessions[safeKey];
var token = session? session.tokens.slice(-1)[0]: '';
var cookie = Core.makeCookie(token).join('|');
respond(e ? String(e): e, [cookie].concat(typeof(msg) !== 'undefined' ?msg: []));
};
if (typeof(msg) !== 'object' || !msg.length) {
return void Respond('INVALID_MSG');
}
handleAuthenticatedMessage(Env, {
msg: msg,
safeKey: safeKey,
publicKey: publicKey,
Respond: Respond,
ctx: ctx,
});
}; };
RPC.create = function (config, cb) { RPC.create = function (config, cb) {
@ -92,6 +407,13 @@ RPC.create = function (config, cb) {
sessionExpirationInterval: undefined, sessionExpirationInterval: undefined,
Log: Log, Log: Log,
WARN: WARN, WARN: WARN,
flushCache: config.flushCache,
adminEmail: config.adminEmail,
allowSubscriptions: config.allowSubscriptions,
myDomain: config.myDomain,
mySubdomain: config.mySubdomain,
customLimits: config.customLimits,
domain: config.domain // XXX
}; };
try { try {
@ -112,322 +434,14 @@ RPC.create = function (config, cb) {
paths.staging = keyOrDefaultString('blobStagingPath', './blobstage'); paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
paths.blob = keyOrDefaultString('blobPath', './blob'); paths.blob = keyOrDefaultString('blobPath', './blob');
var isUnauthenticateMessage = function (msg) {
return msg && msg.length === 2 && isUnauthenticatedCall(msg[0]);
};
var handleUnauthenticatedMessage = function (msg, respond, nfwssCtx) {
Log.silly('LOG_RPC', msg[0]);
switch (msg[0]) {
case 'GET_HISTORY_OFFSET': {
if (typeof(msg[1]) !== 'object' || typeof(msg[1].channelName) !== 'string') {
return respond('INVALID_ARG_FORMAT', msg);
}
const msgHash = typeof(msg[1].msgHash) === 'string' ? msg[1].msgHash : undefined;
nfwssCtx.getHistoryOffset(nfwssCtx, msg[1].channelName, msgHash, (e, ret) => {
if (e) {
if (e.code !== 'ENOENT') {
WARN(e.stack, msg);
}
return respond(e.message);
}
respond(e, [null, ret, null]);
});
break;
}
case 'GET_FILE_SIZE':
return void Pinning.getFileSize(Env, msg[1], function (e, size) {
WARN(e, msg[1]);
respond(e, [null, size, null]);
});
case 'GET_METADATA':
return void Metadata.getMetadata(Env, msg[1], function (e, data) {
WARN(e, msg[1]);
respond(e, [null, data, null]);
});
case 'GET_MULTIPLE_FILE_SIZE':
return void Pinning.getMultipleFileSize(Env, msg[1], function (e, dict) {
if (e) {
WARN(e, dict);
return respond(e);
}
respond(e, [null, dict, null]);
});
case 'GET_DELETED_PADS':
return void Pinning.getDeletedPads(Env, msg[1], function (e, list) {
if (e) {
WARN(e, msg[1]);
return respond(e);
}
respond(e, [null, list, null]);
});
case 'IS_CHANNEL_PINNED':
return void Pinning.isChannelPinned(Env, msg[1], function (isPinned) {
respond(null, [null, isPinned, null]);
});
case 'IS_NEW_CHANNEL':
return void Channel.isNewChannel(Env, msg[1], function (e, isNew) {
respond(e, [null, isNew, null]);
});
case 'WRITE_PRIVATE_MESSAGE':
return void Channel.writePrivateMessage(Env, msg[1], nfwssCtx, function (e, output) {
respond(e, output);
});
default:
Log.warn("UNSUPPORTED_RPC_CALL", msg);
return respond('UNSUPPORTED_RPC_CALL', msg);
}
};
var rpc0 = function (ctx, data, respond) {
if (!Array.isArray(data)) {
Log.debug('INVALID_ARG_FORMET', data);
return void respond('INVALID_ARG_FORMAT');
}
if (!data.length) {
return void respond("INSUFFICIENT_ARGS");
} else if (data.length !== 1) {
Log.debug('UNEXPECTED_ARGUMENTS_LENGTH', data);
}
var msg = data[0].slice(0);
if (!Array.isArray(msg)) {
return void respond('INVALID_ARG_FORMAT');
}
if (isUnauthenticateMessage(msg)) {
return handleUnauthenticatedMessage(msg, respond, ctx);
}
var signature = msg.shift();
var publicKey = msg.shift();
// make sure a user object is initialized in the cookie jar
if (publicKey) {
Core.getSession(Sessions, publicKey);
} else {
Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey);
}
var cookie = msg[0];
if (!Core.isValidCookie(Sessions, publicKey, cookie)) {
// no cookie is fine if the RPC is to get a cookie
if (msg[1] !== 'COOKIE') {
return void respond('NO_COOKIE');
}
}
var serialized = JSON.stringify(msg);
if (!(serialized && typeof(publicKey) === 'string')) {
return void respond('INVALID_MESSAGE_OR_PUBLIC_KEY');
}
if (isAuthenticatedCall(msg[1])) {
if (Core.checkSignature(Env, serialized, signature, publicKey) !== true) {
return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY");
}
} else if (msg[1] !== 'UPLOAD') {
Log.warn('INVALID_RPC_CALL', msg[1]);
return void respond("INVALID_RPC_CALL");
}
var safeKey = Util.escapeKeyCharacters(publicKey);
/* If you have gotten this far, you have signed the message with the
public key which you provided.
We can safely modify the state for that key
OR it's an unauthenticated call, which must not modify the state
for that key in a meaningful way.
*/
// discard validated cookie from message
msg.shift();
var Respond = function (e, msg) {
var session = Sessions[safeKey];
var token = session? session.tokens.slice(-1)[0]: '';
var cookie = Core.makeCookie(token).join('|');
respond(e ? String(e): e, [cookie].concat(typeof(msg) !== 'undefined' ?msg: []));
};
if (typeof(msg) !== 'object' || !msg.length) {
return void Respond('INVALID_MSG');
}
var handleMessage = function () {
Log.silly('LOG_RPC', msg[0]);
switch (msg[0]) {
case 'COOKIE': return void Respond(void 0);
case 'RESET':
return Pinning.resetUserPins(Env, safeKey, msg[1], function (e, hash) {
//WARN(e, hash);
return void Respond(e, hash);
});
case 'PIN':
return Pinning.pinChannel(Env, safeKey, msg[1], function (e, hash) {
WARN(e, hash);
Respond(e, hash);
});
case 'UNPIN':
return Pinning.unpinChannel(Env, safeKey, msg[1], function (e, hash) {
WARN(e, hash);
Respond(e, hash);
});
case 'GET_HASH':
return void Pinning.getHash(Env, safeKey, function (e, hash) {
WARN(e, hash);
Respond(e, hash);
});
case 'GET_TOTAL_SIZE': // TODO cache this, since it will get called quite a bit
return Pinning.getTotalSize(Env, safeKey, function (e, size) {
if (e) {
WARN(e, safeKey);
return void Respond(e);
}
Respond(e, size);
});
case 'UPDATE_LIMITS':
return void Quota.updateLimits(Env, config, safeKey, function (e, limit) {
if (e) {
WARN(e, limit);
return void Respond(e);
}
Respond(void 0, limit);
});
case 'GET_LIMIT':
return void Pinning.getLimit(Env, safeKey, function (e, limit) {
if (e) {
WARN(e, limit);
return void Respond(e);
}
Respond(void 0, limit);
});
case 'EXPIRE_SESSION':
return void setTimeout(function () {
Core.expireSession(Sessions, safeKey);
Respond(void 0, "OK");
});
case 'CLEAR_OWNED_CHANNEL':
return void Channel.clearOwnedChannel(Env, msg[1], publicKey, function (e, response) {
if (e) { return void Respond(e); }
Respond(void 0, response);
});
case 'REMOVE_OWNED_CHANNEL':
return void Channel.removeOwnedChannel(Env, msg[1], publicKey, function (e) {
if (e) { return void Respond(e); }
Respond(void 0, "OK");
});
case 'TRIM_OWNED_CHANNEL_HISTORY':
return void Channel.removeOwnedChannelHistory(Env, msg[1], publicKey, msg[2], function (e) {
if (e) { return void Respond(e); }
Respond(void 0, 'OK');
});
case 'REMOVE_PINS':
return void Pinning.removePins(Env, safeKey, function (e) {
if (e) { return void Respond(e); }
Respond(void 0, "OK");
});
case 'TRIM_PINS':
return void Pinning.trimPins(Env, safeKey, function (e) {
if (e) { return void Respond(e); }
Respond(void 0, "OK");
});
case 'UPLOAD':
return void Env.blobStore.upload(safeKey, msg[1], function (e, len) {
WARN(e, len);
Respond(e, len);
});
case 'UPLOAD_STATUS':
var filesize = msg[1];
return void Upload.upload_status(Env, safeKey, filesize, function (e, yes) {
if (!e && !yes) {
// no pending uploads, set the new size
var user = Core.getSession(Sessions, safeKey);
user.pendingUploadSize = filesize;
user.currentUploadSize = 0;
}
Respond(e, yes);
});
case 'UPLOAD_COMPLETE':
return void Env.blobStore.complete(safeKey, msg[1], function (e, hash) {
WARN(e, hash);
Respond(e, hash);
});
case 'OWNED_UPLOAD_COMPLETE':
return void Env.blobStore.completeOwned(safeKey, msg[1], function (e, blobId) {
WARN(e, blobId);
Respond(e, blobId);
});
case 'UPLOAD_CANCEL':
// msg[1] is fileSize
// if we pass it here, we can start an upload right away without calling
// UPLOAD_STATUS again
return void Env.blobStore.cancel(safeKey, msg[1], function (e) {
WARN(e, 'UPLOAD_CANCEL');
Respond(e);
});
case 'WRITE_LOGIN_BLOCK':
return void Block.writeLoginBlock(Env, msg[1], function (e) {
if (e) {
WARN(e, 'WRITE_LOGIN_BLOCK');
return void Respond(e);
}
Respond(e);
});
case 'REMOVE_LOGIN_BLOCK':
return void Block.removeLoginBlock(Env, msg[1], function (e) {
if (e) {
WARN(e, 'REMOVE_LOGIN_BLOCK');
return void Respond(e);
}
Respond(e);
});
case 'ADMIN':
return void Admin.command(Env, ctx, safeKey, config, msg[1], function (e, result) {
if (e) {
WARN(e, result);
return void Respond(e);
}
Respond(void 0, result);
});
case 'SET_METADATA':
return void Metadata.setMetadata(Env, msg[1], publicKey, function (e, data) {
if (e) {
WARN(e, data);
return void Respond(e);
}
Respond(void 0, data);
});
default:
return void Respond('UNSUPPORTED_RPC_CALL', msg);
}
};
handleMessage(true);
};
var rpc = function (ctx, data, respond) {
try {
return rpc0(ctx, data, respond);
} catch (e) {
console.log("Error from RPC with data " + JSON.stringify(data));
console.log(e.stack);
}
};
var updateLimitDaily = function () { var updateLimitDaily = function () {
Quota.updateLimits(Env, config, undefined, function (e) { Quota.updateLimits(Env, undefined, function (e) {
if (e) { if (e) {
WARN('limitUpdate', e); WARN('limitUpdate', e);
} }
}); });
}; };
Quota.applyCustomLimits(Env, config); Quota.applyCustomLimits(Env);
updateLimitDaily(); updateLimitDaily();
setInterval(updateLimitDaily, 24*3600*1000); setInterval(updateLimitDaily, 24*3600*1000);
@ -451,7 +465,16 @@ RPC.create = function (config, cb) {
Env.blobStore = blob; Env.blobStore = blob;
})); }));
}).nThen(function () { }).nThen(function () {
cb(void 0, rpc); // XXX it's ugly that we pass ctx and Env separately
// when they're effectively the same thing...
cb(void 0, function (ctx, data, respond) {
try {
return rpc(Env, ctx, data, respond);
} catch (e) {
console.log("Error from RPC with data " + JSON.stringify(data));
console.log(e.stack);
}
});
// expire old sessions once per minute // expire old sessions once per minute
// XXX allow for graceful shutdown // XXX allow for graceful shutdown
Env.sessionExpirationInterval = setInterval(function () { Env.sessionExpirationInterval = setInterval(function () {

Loading…
Cancel
Save