From ceb351326c1b50146f175a9ea8010333c97acb82 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 24 Jan 2020 13:36:14 -0500 Subject: [PATCH] split out some more rpc functionality and fix broken module paths --- lib/commands/channel.js | 193 +++++++++++++++++++++++++++++ lib/commands/metadata.js | 4 +- lib/commands/quota.js | 2 +- lib/commands/upload.js | 37 ++++++ lib/rpc.js | 260 ++++----------------------------------- 5 files changed, 255 insertions(+), 241 deletions(-) create mode 100644 lib/commands/channel.js create mode 100644 lib/commands/upload.js diff --git a/lib/commands/channel.js b/lib/commands/channel.js new file mode 100644 index 000000000..052aa3c44 --- /dev/null +++ b/lib/commands/channel.js @@ -0,0 +1,193 @@ +/*jshint esversion: 6 */ +const Channel = module.exports; + +const Util = require("../common-util"); +const nThen = require("nthen"); +const Core = require("./core"); +const Metadata = require("./metadata"); + +Channel.clearOwnedChannel = function (Env, channelId, unsafeKey, cb) { + if (typeof(channelId) !== 'string' || channelId.length !== 32) { + return cb('INVALID_ARGUMENTS'); + } + + Metadata.getMetadata(Env, channelId, function (err, metadata) { + if (err) { return void cb(err); } + if (!Core.hasOwners(metadata)) { return void cb('E_NO_OWNERS'); } + // Confirm that the channel is owned by the user in question + if (!Core.isOwner(metadata, unsafeKey)) { + return void cb('INSUFFICIENT_PERMISSIONS'); + } + return void Env.msgStore.clearChannel(channelId, function (e) { + cb(e); + }); + }); +}; + +Channel.removeOwnedChannel = function (Env, channelId, unsafeKey, cb) { + if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) { + return cb('INVALID_ARGUMENTS'); + } + + if (Env.blobStore.isFileId(channelId)) { + var safeKey = Util.escapeKeyCharacters(unsafeKey); + var blobId = channelId; + + return void nThen(function (w) { + // check if you have permissions + Env.blobStore.isOwnedBy(safeKey, blobId, w(function (err, owned) { + if (err || !owned) { + w.abort(); + return void cb("INSUFFICIENT_PERMISSIONS"); + } + })); + }).nThen(function (w) { + // remove the blob + return void Env.blobStore.archive.blob(blobId, w(function (err) { + Env.Log.info('ARCHIVAL_OWNED_FILE_BY_OWNER_RPC', { + safeKey: safeKey, + blobId: blobId, + status: err? String(err): 'SUCCESS', + }); + if (err) { + w.abort(); + return void cb(err); + } + })); + }).nThen(function () { + // archive the proof + return void Env.blobStore.archive.proof(safeKey, blobId, function (err) { + Env.Log.info("ARCHIVAL_PROOF_REMOVAL_BY_OWNER_RPC", { + safeKey: safeKey, + blobId: blobId, + status: err? String(err): 'SUCCESS', + }); + if (err) { + return void cb("E_PROOF_REMOVAL"); + } + cb(); + }); + }); + } + + Metadata.getMetadata(Env, channelId, function (err, metadata) { + if (err) { return void cb(err); } + if (!Core.hasOwners(metadata)) { return void cb('E_NO_OWNERS'); } + if (!Core.isOwner(metadata, unsafeKey)) { + return void cb('INSUFFICIENT_PERMISSIONS'); + } + // temporarily archive the file + return void Env.msgStore.archiveChannel(channelId, function (e) { + Env.Log.info('ARCHIVAL_CHANNEL_BY_OWNER_RPC', { + unsafeKey: unsafeKey, + channelId: channelId, + status: e? String(e): 'SUCCESS', + }); + cb(e); + }); + }); +}; + +Channel.removeOwnedChannelHistory = function (Env, channelId, unsafeKey, hash, cb) { + nThen(function (w) { + Metadata.getMetadata(Env, channelId, w(function (err, metadata) { + if (err) { return void cb(err); } + if (!Core.hasOwners(metadata)) { + w.abort(); + return void cb('E_NO_OWNERS'); + } + if (!Core.isOwner(metadata, unsafeKey)) { + w.abort(); + return void cb("INSUFFICIENT_PERMISSIONS"); + } + // else fall through to the next block + })); + }).nThen(function () { + Env.msgStore.trimChannel(channelId, hash, function (err) { + if (err) { return void cb(err); } + + + // XXX you must also clear the channel's index from historyKeeper cache + }); + }); +}; + +var ARRAY_LINE = /^\[/; + +/* Files can contain metadata but not content + call back with true if the channel log has no content other than metadata + otherwise false +*/ +Channel.isNewChannel = function (Env, channel, cb) { + if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } + if (channel.length !== 32) { return void cb('INVALID_CHAN'); } + + var done = false; + Env.msgStore.getMessages(channel, function (msg) { + if (done) { return; } + try { + if (typeof(msg) === 'string' && ARRAY_LINE.test(msg)) { + done = true; + return void cb(void 0, false); + } + } catch (e) { + Env.WARN('invalid message read from store', e); + } + }, function () { + if (done) { return; } + // no more messages... + cb(void 0, true); + }); +}; + +/* writePrivateMessage + allows users to anonymously send a message to the channel + prevents their netflux-id from being stored in history + and from being broadcast to anyone that might currently be in the channel + + Otherwise behaves the same as sending to a channel +*/ +Channel.writePrivateMessage = function (Env, args, nfwssCtx, cb) { + var channelId = args[0]; + var msg = args[1]; + + // don't bother handling empty messages + if (!msg) { return void cb("INVALID_MESSAGE"); } + + // don't support anything except regular channels + if (!Core.isValidId(channelId) || channelId.length !== 32) { + return void cb("INVALID_CHAN"); + } + + // We expect a modern netflux-websocket-server instance + // if this API isn't here everything will fall apart anyway + if (!(nfwssCtx && nfwssCtx.historyKeeper && typeof(nfwssCtx.historyKeeper.onChannelMessage) === 'function')) { + return void cb("NOT_IMPLEMENTED"); + } + + // historyKeeper expects something with an 'id' attribute + // it will fail unless you provide it, but it doesn't need anything else + var channelStruct = { + id: channelId, + }; + + // construct a message to store and broadcast + var fullMessage = [ + 0, // idk + null, // normally the netflux id, null isn't rejected, and it distinguishes messages written in this way + "MSG", // indicate that this is a MSG + channelId, // channel id + msg // the actual message content. Generally a string + ]; + + // store the message and do everything else that is typically done when going through historyKeeper + nfwssCtx.historyKeeper.onChannelMessage(nfwssCtx, channelStruct, fullMessage); + + // call back with the message and the target channel. + // historyKeeper will take care of broadcasting it if anyone is in the channel + cb(void 0, { + channel: channelId, + message: fullMessage + }); +}; + diff --git a/lib/commands/metadata.js b/lib/commands/metadata.js index 91b08d8cc..3a21aae0b 100644 --- a/lib/commands/metadata.js +++ b/lib/commands/metadata.js @@ -3,8 +3,8 @@ const Data = module.exports; const Meta = require("../metadata"); const BatchRead = require("../batch-read"); -const WriteQueue = require("./write-queue"); -const Core = require("./commands/core"); +const WriteQueue = require("../write-queue"); +const Core = require("./core"); const batchMetadata = BatchRead("GET_METADATA"); Data.getMetadata = function (Env, channel, cb) { diff --git a/lib/commands/quota.js b/lib/commands/quota.js index e7df14364..92b1c3cd0 100644 --- a/lib/commands/quota.js +++ b/lib/commands/quota.js @@ -2,7 +2,7 @@ /* globals Buffer*/ const Quota = module.exports; -const Core = require("./commands/core"); +const Core = require("./core"); const Util = require("../common-util"); const Package = require('../../package.json'); const Https = require("https"); diff --git a/lib/commands/upload.js b/lib/commands/upload.js new file mode 100644 index 000000000..f89cf2904 --- /dev/null +++ b/lib/commands/upload.js @@ -0,0 +1,37 @@ +/*jshint esversion: 6 */ +const Upload = module.exports; +const Util = require("../common-util"); +const Pinning = require("./pin-rpc"); +const nThen = require("nthen"); + +// upload_status +Upload.upload_status = function (Env, safeKey, filesize, _cb) { // FIXME FILES + var cb = Util.once(Util.mkAsync(_cb)); + + // validate that the provided size is actually a positive number + if (typeof(filesize) !== 'number' && + filesize >= 0) { return void cb('E_INVALID_SIZE'); } + + if (filesize >= Env.maxUploadSize) { return cb('TOO_LARGE'); } + + nThen(function (w) { + var abortAndCB = Util.both(w.abort, cb); + Env.blobStore.status(safeKey, w(function (err, inProgress) { + // if there's an error something is weird + if (err) { return void abortAndCB(err); } + + // we cannot upload two things at once + if (inProgress) { return void abortAndCB(void 0, true); } + })); + }).nThen(function () { + // if yuo're here then there are no pending uploads + // check if you have space in your quota to upload something of this size + Pinning.getFreeSpace(Env, safeKey, function (e, free) { + if (e) { return void cb(e); } + if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); } + cb(void 0, false); + }); + }); +}; + + diff --git a/lib/rpc.js b/lib/rpc.js index 0ddfdb128..1912868ed 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -2,7 +2,6 @@ const nThen = require("nthen"); const Util = require("./common-util"); -const escapeKeyCharacters = Util.escapeKeyCharacters; const mkEvent = Util.mkEvent; const Core = require("./commands/core"); @@ -11,208 +10,13 @@ const Pinning = require("./commands/pin-rpc"); const Quota = require("./commands/quota"); const Block = require("./commands/block"); const Metadata = require("./commands/metadata"); +const Channel = require("./commands/channel"); +const Upload = require("./commands/upload"); var RPC = module.exports; -var Store = require("../storage/file"); -var BlobStore = require("../storage/blob"); - -var Log; - -var WARN = function (e, output) { - if (e && output) { - Log.warn(e, { - output: output, - message: String(e), - stack: new Error(e).stack, - }); - } -}; - -var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) { - if (typeof(channelId) !== 'string' || channelId.length !== 32) { - return cb('INVALID_ARGUMENTS'); - } - - Metadata.getMetadata(Env, channelId, function (err, metadata) { - if (err) { return void cb(err); } - if (!Core.hasOwners(metadata)) { return void cb('E_NO_OWNERS'); } - // Confirm that the channel is owned by the user in question - if (!Core.isOwner(metadata, unsafeKey)) { - return void cb('INSUFFICIENT_PERMISSIONS'); - } - return void Env.msgStore.clearChannel(channelId, function (e) { - cb(e); - }); - }); -}; - -var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) { - if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) { - return cb('INVALID_ARGUMENTS'); - } - - if (Env.blobStore.isFileId(channelId)) { - var safeKey = escapeKeyCharacters(unsafeKey); - var blobId = channelId; - - return void nThen(function (w) { - // check if you have permissions - Env.blobStore.isOwnedBy(safeKey, blobId, w(function (err, owned) { - if (err || !owned) { - w.abort(); - return void cb("INSUFFICIENT_PERMISSIONS"); - } - })); - }).nThen(function (w) { - // remove the blob - return void Env.blobStore.archive.blob(blobId, w(function (err) { - Log.info('ARCHIVAL_OWNED_FILE_BY_OWNER_RPC', { - safeKey: safeKey, - blobId: blobId, - status: err? String(err): 'SUCCESS', - }); - if (err) { - w.abort(); - return void cb(err); - } - })); - }).nThen(function () { - // archive the proof - return void Env.blobStore.archive.proof(safeKey, blobId, function (err) { - Log.info("ARCHIVAL_PROOF_REMOVAL_BY_OWNER_RPC", { - safeKey: safeKey, - blobId: blobId, - status: err? String(err): 'SUCCESS', - }); - if (err) { - return void cb("E_PROOF_REMOVAL"); - } - cb(); - }); - }); - } - - Metadata.getMetadata(Env, channelId, function (err, metadata) { - if (err) { return void cb(err); } - if (!Core.hasOwners(metadata)) { return void cb('E_NO_OWNERS'); } - if (!Core.isOwner(metadata, unsafeKey)) { - return void cb('INSUFFICIENT_PERMISSIONS'); - } - // temporarily archive the file - return void Env.msgStore.archiveChannel(channelId, function (e) { - Log.info('ARCHIVAL_CHANNEL_BY_OWNER_RPC', { - unsafeKey: unsafeKey, - channelId: channelId, - status: e? String(e): 'SUCCESS', - }); - cb(e); - }); - }); -}; - -var removeOwnedChannelHistory = function (Env, channelId, unsafeKey, hash, cb) { - nThen(function (w) { - Metadata.getMetadata(Env, channelId, w(function (err, metadata) { - if (err) { return void cb(err); } - if (!Core.hasOwners(metadata)) { - w.abort(); - return void cb('E_NO_OWNERS'); - } - if (!Core.isOwner(metadata, unsafeKey)) { - w.abort(); - return void cb("INSUFFICIENT_PERMISSIONS"); - } - // else fall through to the next block - })); - }).nThen(function () { - Env.msgStore.trimChannel(channelId, hash, function (err) { - if (err) { return void cb(err); } - - - // XXX you must also clear the channel's index from historyKeeper cache - }); - }); -}; - -var ARRAY_LINE = /^\[/; - -/* Files can contain metadata but not content - call back with true if the channel log has no content other than metadata - otherwise false -*/ -var isNewChannel = function (Env, channel, cb) { - if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } - if (channel.length !== 32) { return void cb('INVALID_CHAN'); } - - var done = false; - Env.msgStore.getMessages(channel, function (msg) { - if (done) { return; } - try { - if (typeof(msg) === 'string' && ARRAY_LINE.test(msg)) { - done = true; - return void cb(void 0, false); - } - } catch (e) { - WARN('invalid message read from store', e); - } - }, function () { - if (done) { return; } - // no more messages... - cb(void 0, true); - }); -}; - -/* writePrivateMessage - allows users to anonymously send a message to the channel - prevents their netflux-id from being stored in history - and from being broadcast to anyone that might currently be in the channel - - Otherwise behaves the same as sending to a channel -*/ -var writePrivateMessage = function (Env, args, nfwssCtx, cb) { - var channelId = args[0]; - var msg = args[1]; - - // don't bother handling empty messages - if (!msg) { return void cb("INVALID_MESSAGE"); } - - // don't support anything except regular channels - if (!Core.isValidId(channelId) || channelId.length !== 32) { - return void cb("INVALID_CHAN"); - } - - // We expect a modern netflux-websocket-server instance - // if this API isn't here everything will fall apart anyway - if (!(nfwssCtx && nfwssCtx.historyKeeper && typeof(nfwssCtx.historyKeeper.onChannelMessage) === 'function')) { - return void cb("NOT_IMPLEMENTED"); - } - - // historyKeeper expects something with an 'id' attribute - // it will fail unless you provide it, but it doesn't need anything else - var channelStruct = { - id: channelId, - }; - - // construct a message to store and broadcast - var fullMessage = [ - 0, // idk - null, // normally the netflux id, null isn't rejected, and it distinguishes messages written in this way - "MSG", // indicate that this is a MSG - channelId, // channel id - msg // the actual message content. Generally a string - ]; - - // store the message and do everything else that is typically done when going through historyKeeper - nfwssCtx.historyKeeper.onChannelMessage(nfwssCtx, channelStruct, fullMessage); - - // call back with the message and the target channel. - // historyKeeper will take care of broadcasting it if anyone is in the channel - cb(void 0, { - channel: channelId, - message: fullMessage - }); -}; +const Store = require("../storage/file"); +const BlobStore = require("../storage/blob"); var isUnauthenticatedCall = function (call) { return [ @@ -254,38 +58,8 @@ var isAuthenticatedCall = function (call) { ].indexOf(call) !== -1; }; -// upload_status -var upload_status = function (Env, safeKey, filesize, _cb) { // FIXME FILES - var cb = Util.once(Util.mkAsync(_cb)); - - // validate that the provided size is actually a positive number - if (typeof(filesize) !== 'number' && - filesize >= 0) { return void cb('E_INVALID_SIZE'); } - - if (filesize >= Env.maxUploadSize) { return cb('TOO_LARGE'); } - - nThen(function (w) { - var abortAndCB = Util.both(w.abort, cb); - Env.blobStore.status(safeKey, w(function (err, inProgress) { - // if there's an error something is weird - if (err) { return void abortAndCB(err); } - - // we cannot upload two things at once - if (inProgress) { return void abortAndCB(void 0, true); } - })); - }).nThen(function () { - // if yuo're here then there are no pending uploads - // check if you have space in your quota to upload something of this size - Pinning.getFreeSpace(Env, safeKey, function (e, free) { - if (e) { return void cb(e); } - if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); } - cb(void 0, false); - }); - }); -}; - RPC.create = function (config, cb) { - Log = config.log; + var Log = config.log; // load pin-store... Log.silly('LOADING RPC MODULE'); @@ -294,6 +68,16 @@ RPC.create = function (config, cb) { return typeof(config[key]) === 'string'? config[key]: def; }; + var WARN = function (e, output) { + if (e && output) { + Log.warn(e, { + output: output, + message: String(e), + stack: new Error(e).stack, + }); + } + }; + var Env = { defaultStorageLimit: config.defaultStorageLimit, maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024), @@ -382,11 +166,11 @@ RPC.create = function (config, cb) { respond(null, [null, isPinned, null]); }); case 'IS_NEW_CHANNEL': - return void isNewChannel(Env, msg[1], function (e, isNew) { + return void Channel.isNewChannel(Env, msg[1], function (e, isNew) { respond(e, [null, isNew, null]); }); case 'WRITE_PRIVATE_MESSAGE': - return void writePrivateMessage(Env, msg[1], nfwssCtx, function (e, output) { + return void Channel.writePrivateMessage(Env, msg[1], nfwssCtx, function (e, output) { respond(e, output); }); default: @@ -450,7 +234,7 @@ RPC.create = function (config, cb) { return void respond("INVALID_RPC_CALL"); } - var safeKey = escapeKeyCharacters(publicKey); + var safeKey = Util.escapeKeyCharacters(publicKey); /* If you have gotten this far, you have signed the message with the public key which you provided. @@ -528,18 +312,18 @@ RPC.create = function (config, cb) { Respond(void 0, "OK"); }); case 'CLEAR_OWNED_CHANNEL': - return void clearOwnedChannel(Env, msg[1], publicKey, function (e, response) { + return void Channel.clearOwnedChannel(Env, msg[1], publicKey, function (e, response) { if (e) { return void Respond(e); } Respond(void 0, response); }); case 'REMOVE_OWNED_CHANNEL': - return void removeOwnedChannel(Env, msg[1], publicKey, function (e) { + return void Channel.removeOwnedChannel(Env, msg[1], publicKey, function (e) { if (e) { return void Respond(e); } Respond(void 0, "OK"); }); case 'TRIM_OWNED_CHANNEL_HISTORY': - return void removeOwnedChannelHistory(Env, msg[1], publicKey, msg[2], function (e) { + return void Channel.removeOwnedChannelHistory(Env, msg[1], publicKey, msg[2], function (e) { if (e) { return void Respond(e); } Respond(void 0, 'OK'); }); @@ -560,7 +344,7 @@ RPC.create = function (config, cb) { }); case 'UPLOAD_STATUS': var filesize = msg[1]; - return void upload_status(Env, safeKey, filesize, function (e, yes) { + return void Upload.upload_status(Env, safeKey, filesize, function (e, yes) { if (!e && !yes) { // no pending uploads, set the new size var user = Core.getSession(Sessions, safeKey);