diff --git a/lib/api.js b/lib/api.js index bd5c99629..8e6725039 100644 --- a/lib/api.js +++ b/lib/api.js @@ -14,21 +14,7 @@ module.exports.create = function (config) { .on('channelClose', historyKeeper.channelClose) .on('channelMessage', historyKeeper.channelMessage) .on('channelOpen', historyKeeper.channelOpen) - .on('sessionClose', function (userId, reason) { - if (['BAD_MESSAGE', 'SOCKET_ERROR', 'SEND_MESSAGE_FAIL_2'].indexOf(reason) !== -1) { - if (reason && reason.code === 'ECONNRESET') { return; } - return void log.error('SESSION_CLOSE_WITH_ERROR', { - userId: userId, - reason: reason, - }); - } - - if (reason && reason === 'SOCKET_CLOSED') { return; } - log.verbose('SESSION_CLOSE_ROUTINE', { - userId: userId, - reason: reason, - }); - }) + .on('sessionClose', historyKeeper.sessionClose) .on('error', function (error, label, info) { if (!error) { return; } /* labels: diff --git a/lib/commands/metadata.js b/lib/commands/metadata.js index 5b5e28f7e..78584d3ae 100644 --- a/lib/commands/metadata.js +++ b/lib/commands/metadata.js @@ -12,7 +12,7 @@ Data.getMetadata = function (Env, channel, cb/* , Server */) { if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); } - // FIXME get metadata from the server cache if it is available + // XXX get metadata from the server cache if it is available batchMetadata(channel, cb, function (done) { var ref = {}; var lineHandler = Meta.createLineHandler(ref, Env.Log.error); @@ -108,6 +108,16 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) { return void next(); } + + // chainpad-server@4.0.3 supports a removeFromChannel method + // Server.removeFromChannel(channelName, userId); + // this lets us kick users from restricted channels + + // XXX RESTRICT + // if the metadata changes and includes an allowed list + // kick any current users from the channel + // if they aren't on it. + cb(void 0, metadata); next(); diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index 1b306681e..53de34e6b 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -46,6 +46,8 @@ module.exports.create = function (config, cb) { paths: {}, //msgStore: config.store, + netfluxUsers: {}, + pinStore: undefined, pinnedPads: {}, pinsLoaded: false, @@ -110,27 +112,81 @@ module.exports.create = function (config, cb) { // we drop cached metadata and indexes at the same time HK.dropChannel(Env, channelName); }, - channelOpen: function (Server, channelName, userId) { + channelOpen: function (Server, channelName, userId, wait) { Env.channel_cache[channelName] = Env.channel_cache[channelName] || {}; - //const metadata = Env.metadata_cache[channelName]; - // chainpad-server@4.0.3 supports a removeFromChannel method - // Server.removeFromChannel(channelName, userId); - // this lets us kick users from restricted channels - - // XXX RESTRICT - // this event is emitted whenever a user joins a channel. - // if that channel is restricted then we should forcefully disconnect them. - // we won't know that it's restricted until we load its metadata. - // as long as metadata is in memory as long as anyone is sending messages to a channel - // then we won't broadcast messages to unauthorized users - - Server.send(userId, [ - 0, - Env.id, - 'JOIN', - channelName - ]); + var proceed = function () { + Server.send(userId, [ + 0, + Env.id, + 'JOIN', + channelName + ]); + }; + + // only conventional channels can be restricted + if ((channelName || "").length !== 32) { // XXX use contants + return proceed(); + } + + var next = wait(); + + // gets and caches the metadata... + // XXX make sure it doesn't get stuck in cache... + HK.getMetadata(Env, channelName, function (err, metadata) { + if (err) { + console.log("> METADATA ERR", err); + throw new Error(err); // XXX + } + + if (!metadata || (metadata && !metadata.restricted)) { + // the channel doesn't have metadata, or it does and it's not restricted + // either way, let them join. + proceed(); + return void next(); + } + + // this channel is restricted. verify that the user in question is in the allow list + + // construct a definitive list (owners + allowed) + var allowed = HK.listAllowedUsers(metadata); + // and get the list of keys for which this user has already authenticated + var session = HK.getNetfluxSession(Env, userId); + + // iterate over their keys. If any of them are in the allow list, let them join + if (session) { + for (var unsafeKey in session) { + if (allowed.indexOf(unsafeKey) !== -1) { + proceed(); + return void next(); + } + } + } + + // otherwise they're not allowed. + // respond with a special error that includes the list of keys + // which would be allowed... + // XXX bonus points if you hash the keys to limit data exposure + next(["ERESTRICTED"].concat(allowed)); + }); + }, + sessionClose: function (userId, reason) { + HK.closeNetfluxSession(Env, userId); + + // XXX RESTRICT drop user session data + if (['BAD_MESSAGE', 'SOCKET_ERROR', 'SEND_MESSAGE_FAIL_2'].indexOf(reason) !== -1) { + if (reason && reason.code === 'ECONNRESET') { return; } + return void Log.error('SESSION_CLOSE_WITH_ERROR', { + userId: userId, + reason: reason, + }); + } + + if (reason && reason === 'SOCKET_CLOSED') { return; } + Log.verbose('SESSION_CLOSE_ROUTINE', { + userId: userId, + reason: reason, + }); }, directMessage: function (Server, seq, userId, json) { // netflux-server allows you to register an id with a handler diff --git a/lib/hk-util.js b/lib/hk-util.js index c94a67bd5..eac886c79 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -4,7 +4,7 @@ var HK = module.exports; const nThen = require('nthen'); const Util = require("./common-util"); -const Meta = require("./metadata"); +const MetaRPC = require("./commands/metadata"); const Nacl = require('tweetnacl/nacl-fast'); const now = function () { return (new Date()).getTime(); }; @@ -80,6 +80,23 @@ const isChannelRestricted = function (metadata) { // XXX RESTRICT return false; }; +HK.listAllowedUsers = function (metadata) { + return (metadata.owners || []).concat((metadata.allowed || [])); +}; + +HK.getNetfluxSession = function (Env, netfluxId) { + return Env.netfluxUsers[netfluxId]; +}; + +HK.authenticateNetfluxSession = function (Env, netfluxId, unsafeKey) { + var user = Env.netfluxUsers[netfluxId] = Env.netfluxUsers[netfluxId] || {}; + user[unsafeKey] = +new Date(); +}; + +HK.closeNetfluxSession = function (Env, netfluxId) { + delete Env.netfluxUsers[netfluxId]; +}; + const isUserAllowed = function (metadata, userId) { // XXX RESTRICT /* @@ -174,6 +191,23 @@ const checkExpired = function (Env, Server, channel) { return true; }; +const getMetadata = HK.getMetadata = function (Env, channelName, cb) { + var metadata = Env.metadata_cache[channelName]; + if (metadata && typeof(metadata) === 'object') { + return void Util.mkAsync(cb)(undefined, metadata); + } + + MetaRPC.getMetadata(Env, channelName, function (err, metadata) { + if (err) { + console.error(err); + return void cb(err); + } + // cache it + Env.metadata_cache[channelName] = metadata; + cb(undefined, metadata); + }); +}; + /* computeIndex can call back with an error or a computed index which includes: * cpIndex: @@ -203,13 +237,18 @@ const computeIndex = function (Env, channelName, cb) { let metadata; let i = 0; - const ref = {}; - const CB = Util.once(cb); const offsetByHash = {}; let size = 0; nThen(function (w) { + getMetadata(Env, channelName, w(function (err, _metadata) { + if (err) { + throw new Error(err); + } + metadata = _metadata; + })); + }).nThen(function (w) { // iterate over all messages in the channel log // old channels can contain metadata as the first message of the log // remember metadata the first time you encounter it @@ -218,14 +257,15 @@ const computeIndex = function (Env, channelName, cb) { let msg; // keep an eye out for the metadata line if you haven't already seen it // but only check for metadata on the first line - if (!i && !metadata && msgObj.buff.indexOf('{') === 0) { + if (!i && msgObj.buff.indexOf('{') === 0) { // XXX RESTRICT metadata... i++; // always increment the message counter msg = tryParse(Env, msgObj.buff.toString('utf8')); if (typeof msg === "undefined") { return readMore(); } // validate that the current line really is metadata before storing it as such - if (isMetadataMessage(msg)) { - metadata = msg; + if (isMetadataMessage(msg)) { // XXX RESTRICT + //metadata = msg; // XXX RESTRICT + // skip this, as you already have metadata... return readMore(); } } @@ -268,26 +308,8 @@ const computeIndex = function (Env, channelName, cb) { size = msgObj.offset + msgObj.buff.length + 1; }); })); - }).nThen(function (w) { - // create a function which will iterate over amendments to the metadata - const handler = Meta.createLineHandler(ref, Log.error); - - // initialize the accumulator in case there was a foundational metadata line in the log content - if (metadata) { handler(void 0, metadata); } - - // iterate over the dedicated metadata log (if it exists) - // proceed even in the event of a stream error on the metadata log - store.readDedicatedMetadata(channelName, handler, w(function (err) { - if (err) { - return void Log.error("DEDICATED_METADATA_ERROR", err); - } - })); }).nThen(function () { - // when all is done, cache the metadata in memory - if (ref.index) { // but don't bother if no metadata was found... - metadata = Env.metadata_cache[channelName] = ref.meta; - } - // and return the computed index + // return the computed index CB(null, { // Only keep the checkpoints included in the last 100 messages cpIndex: sliceCpIndex(cpIndex, i), @@ -316,9 +338,7 @@ const getIndex = (Env, channelName, cb) => { // if there is a channel in memory and it has an index cached, return it if (chan && chan.index) { // enforce async behaviour - return void setTimeout(function () { - cb(undefined, chan.index); - }); + return void Util.mkAsync(cb)(undefined, chan.index); } Env.batchIndexReads(channelName, cb, function (done) { @@ -592,7 +612,7 @@ const handleRPC = function (Env, Server, seq, userId, parsed) { Server.send(userId, [seq, 'ACK']); try { // slice off the sequence number and pass in the rest of the message - Env.rpc(Server, rpc_call, function (err, output) { + Env.rpc(Server, userId, rpc_call, function (err, output) { if (err) { Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]); return; diff --git a/lib/metadata.js b/lib/metadata.js index a8ed86fd3..46d261e3d 100644 --- a/lib/metadata.js +++ b/lib/metadata.js @@ -224,6 +224,9 @@ commands.RM_OWNERS = function (meta, args) { changed = true; }); + // XXX RESTRICT only owned channels can be restricted + // drop the restricted flag if there are no owners + return changed; }; @@ -302,6 +305,9 @@ commands.RESET_OWNERS = function (meta, args) { }); } + // XXX RESTRICT only owned channels can be restricted + // drop the restricted flag if there are no owners + return true; }; diff --git a/lib/rpc.js b/lib/rpc.js index 0d9c605f7..ad6ccde5c 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -9,6 +9,7 @@ const Block = require("./commands/block"); const Metadata = require("./commands/metadata"); const Channel = require("./commands/channel"); const Upload = require("./commands/upload"); +const HK = require("./hk-util"); var RPC = module.exports; @@ -117,7 +118,7 @@ var handleAuthenticatedMessage = function (Env, unsafeKey, msg, respond, Server) return void Respond('UNSUPPORTED_RPC_CALL', msg); }; -var rpc = function (Env, Server, data, respond) { +var rpc = function (Env, Server, userId, data, respond) { if (!Array.isArray(data)) { Env.Log.debug('INVALID_ARG_FORMET', data); return void respond('INVALID_ARG_FORMAT'); @@ -143,8 +144,9 @@ var rpc = function (Env, Server, data, respond) { var publicKey = msg.shift(); // make sure a user object is initialized in the cookie jar + var session; if (publicKey) { - Core.getSession(Env.Sessions, publicKey); + session = Core.getSession(Env.Sessions, publicKey); } else { Env.Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey); } @@ -174,6 +176,7 @@ var rpc = function (Env, Server, data, respond) { // check the signature on the message // refuse the command if it doesn't validate if (Core.checkSignature(Env, serialized, signature, publicKey) === true) { + HK.authenticateNetfluxSession(Env, userId, publicKey); return void handleAuthenticatedMessage(Env, publicKey, msg, respond, Server); } return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY"); @@ -202,9 +205,9 @@ RPC.create = function (Env, cb) { Core.expireSessions(Sessions); }, Core.SESSION_EXPIRATION_TIME); - cb(void 0, function (Server, data, respond) { + cb(void 0, function (Server, userId, data, respond) { try { - return rpc(Env, Server, data, respond); + return rpc(Env, Server, userId, data, respond); } catch (e) { console.log("Error from RPC with data " + JSON.stringify(data)); console.log(e.stack);