From 802034616c3144d167fe46d8abb63c0b1907fd72 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 7 Feb 2020 12:53:12 -0500 Subject: [PATCH] centralize historykeeper-rpc interaction in rpc methods --- lib/commands/channel.js | 87 +++++++++++++++++++++++++++++++++++----- lib/commands/metadata.js | 16 ++++++-- lib/commands/pin-rpc.js | 5 +-- lib/historyKeeper.js | 76 ++--------------------------------- lib/rpc.js | 2 +- 5 files changed, 96 insertions(+), 90 deletions(-) diff --git a/lib/commands/channel.js b/lib/commands/channel.js index bbb83b45a..88404a9b2 100644 --- a/lib/commands/channel.js +++ b/lib/commands/channel.js @@ -6,7 +6,7 @@ const nThen = require("nthen"); const Core = require("./core"); const Metadata = require("./metadata"); -Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb) { +Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb, Server) { if (typeof(channelId) !== 'string' || channelId.length !== 32) { return cb('INVALID_ARGUMENTS'); } @@ -20,19 +20,46 @@ Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb) { return void cb('INSUFFICIENT_PERMISSIONS'); } return void Env.msgStore.clearChannel(channelId, function (e) { - cb(e); + if (e) { return void cb(e); } + cb(); + + const channel_cache = Env.historyKeeper.channel_cache; + + const clear = function () { + // delete the channel cache because it will have been invalidated + delete channel_cache[channelId]; + }; + + nThen(function (w) { + Server.getChannelUserList(channelId).forEach(function (userId) { + Server.send(userId, [ + 0, + Env.historyKeeper.id, + 'MSG', + userId, + JSON.stringify({ + error: 'ECLEARED', + channel: channelId + }) + ], w()); + }); + }).nThen(function () { + clear(); + }).orTimeout(function () { + Env.Log.warn("ON_CHANNEL_CLEARED_TIMEOUT", channelId); + clear(); + }, 30000); }); }); }; -Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb) { +Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) { if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) { return cb('INVALID_ARGUMENTS'); } var unsafeKey = Util.unescapeKeyCharacters(safeKey); if (Env.blobStore.isFileId(channelId)) { - //var safeKey = Util.escapeKeyCharacters(unsafeKey); var blobId = channelId; return void nThen(function (w) { @@ -89,6 +116,45 @@ Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb) { return void cb(e); } cb(void 0, 'OK'); + + const channel_cache = Env.historyKeeper.channel_cache; + const metadata_cache = Env.historyKeeper.metadata_cache; + + const clear = function () { + delete channel_cache[channelId]; + Server.clearChannel(channelId); + delete metadata_cache[channelId]; + }; + + // an owner of a channel deleted it + nThen(function (w) { + // close the channel in the store + Env.msgStore.closeChannel(channelId, w()); + }).nThen(function (w) { + // Server.channelBroadcast would be better + // but we can't trust it to track even one callback, + // let alone many in parallel. + // so we simulate it on this side to avoid race conditions + Server.getChannelUserList(channelId).forEach(function (userId) { + Server.send(userId, [ + 0, + Env.historyKeeper.id, + "MSG", + userId, + JSON.stringify({ + error: 'EDELETED', + channel: channelId, + }) + ], w()); + }); + }).nThen(function () { + // clear the channel's data from memory + // once you've sent everyone a notice that the channel has been deleted + clear(); + }).orTimeout(function () { + Env.Log.warn('ON_CHANNEL_DELETED_TIMEOUT', channelId); + clear(); + }, 30000); }); }); }; @@ -121,6 +187,8 @@ Channel.trimHistory = function (Env, safeKey, data, cb) { // clear historyKeeper's cache for this channel Env.historyKeeper.channelClose(channelId); cb(void 0, 'OK'); + delete Env.historyKeeper.channel_cache[channelId]; + delete Env.historyKeeper.metadata_cache[channelId]; }); }); }; @@ -160,7 +228,7 @@ Channel.isNewChannel = function (Env, channel, cb) { Otherwise behaves the same as sending to a channel */ -Channel.writePrivateMessage = function (Env, args, cb, Server) { // XXX odd signature +Channel.writePrivateMessage = function (Env, args, cb, Server) { var channelId = args[0]; var msg = args[1]; @@ -197,11 +265,10 @@ Channel.writePrivateMessage = function (Env, args, cb, Server) { // XXX odd sign // if the message isn't valid it won't be stored. Env.historyKeeper.channelMessage(Server, channelStruct, fullMessage); - // call back with the message and the target channel. - // historyKeeper will take care of broadcasting it if anyone is in the channel - cb(void 0, { - channel: channelId, - message: fullMessage + Server.getChannelUserList(channelId).forEach(function (userId) { + Server.send(userId, fullMessage); }); + + cb(); }; diff --git a/lib/commands/metadata.js b/lib/commands/metadata.js index 09cf1f1d6..41aea9888 100644 --- a/lib/commands/metadata.js +++ b/lib/commands/metadata.js @@ -12,8 +12,7 @@ Data.getMetadata = function (Env, channel, cb/* , Server */) { if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); } - // XXX get metadata from the server cache if it is available - // Server isn't always passed, though... + // FIXME get metadata from the server cache if it is available batchMetadata(channel, cb, function (done) { var ref = {}; var lineHandler = Meta.createLineHandler(ref, Env.Log.error); @@ -37,7 +36,7 @@ Data.getMetadata = function (Env, channel, cb/* , Server */) { } */ var queueMetadata = WriteQueue(); -Data.setMetadata = function (Env, safeKey, data, cb) { +Data.setMetadata = function (Env, safeKey, data, cb, Server) { var unsafeKey = Util.unescapeKeyCharacters(safeKey); var channel = data.channel; @@ -108,8 +107,19 @@ Data.setMetadata = function (Env, safeKey, data, cb) { cb(e); return void next(); } + cb(void 0, metadata); next(); + + const metadata_cache = Env.historyKeeper.metadata_cache; + const channel_cache = Env.historyKeeper.channel_cache; + + metadata_cache[channel] = metadata; + + var index = Util.find(channel_cache, [channel, 'index']); + if (index && typeof(index) === 'object') { index.metadata = metadata; } + + Server.channelBroadcast(channel, JSON.stringify(metadata), Env.historyKeeper.id); }); }); }); diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js index 2478910a7..d4bb96b71 100644 --- a/lib/commands/pin-rpc.js +++ b/lib/commands/pin-rpc.js @@ -205,7 +205,6 @@ Pinning.removePins = function (Env, safeKey, cb) { }; Pinning.trimPins = function (Env, safeKey, cb) { - // XXX trim to latest pin checkpoint cb("NOT_IMPLEMENTED"); }; @@ -453,10 +452,10 @@ Pinning.loadChannelPins = function (Env) { Pinning.isChannelPinned = function (Env, channel, cb) { Env.evPinnedPadsReady.reg(() => { - if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) { + if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) { // FIXME 'Object.keys' here is overkill. We only need to know that it isn't empty cb(void 0, true); } else { - delete Env.pinnedPads[channel]; // XXX WAT + delete Env.pinnedPads[channel]; cb(void 0, false); } }); diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index ff7b531e9..9c859f1fb 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -644,48 +644,6 @@ module.exports.create = function (cfg, cb) { }); }; - - /* onChannelCleared - * broadcasts to all clients in a channel if that channel is deleted - */ - const onChannelCleared = function (Server, channel) { - Server.channelBroadcast(channel, { - error: 'ECLEARED', - channel: channel - }, HISTORY_KEEPER_ID); - }; - - // When a channel is removed from datastore, broadcast a message to all its connected users - const onChannelDeleted = function (Server, channel) { - store.closeChannel(channel, function () { - Server.channelBroadcast(channel, { - error: 'EDELETED', - channel: channel - }, HISTORY_KEEPER_ID); - }); - - delete channel_cache[channel]; - Server.clearChannel(channel); - delete metadata_cache[channel]; - }; - // Check if the selected channel is expired - // If it is, remove it from memory and broadcast a message to its members - - const onChannelMetadataChanged = function (Server, channel, metadata) { - if (!(channel && metadata_cache[channel] && typeof (metadata) === "object")) { return; } - Log.silly('SET_METADATA_CACHE', { - channel: channel, - metadata: JSON.stringify(metadata), - }); - - metadata_cache[channel] = metadata; - - if (channel_cache[channel] && channel_cache[channel].index) { - channel_cache[channel].index.metadata = metadata; - } - Server.channelBroadcast(channel, metadata, HISTORY_KEEPER_ID); - }; - const handleGetHistory = function (Server, seq, userId, parsed) { // parsed[1] is the channel id // parsed[2] is a validation key or an object containing metadata (optionnal) @@ -892,37 +850,6 @@ module.exports.create = function (cfg, cb) { Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]); return; } - var msg = rpc_call[0].slice(); - if (msg[3] === 'REMOVE_OWNED_CHANNEL') { - onChannelDeleted(Server, msg[4]); - } - if (msg[3] === 'CLEAR_OWNED_CHANNEL') { - onChannelCleared(Server, msg[4]); - } - - if (msg[3] === 'SET_METADATA') { // or whatever we call the RPC???? - // make sure we update our cache of metadata - // or at least invalidate it and force other mechanisms to recompute its state - // 'output' could be the new state as computed by rpc - onChannelMetadataChanged(Server, msg[4].channel, output[1]); - } - - // unauthenticated RPC calls have a different message format - if (msg[0] === "WRITE_PRIVATE_MESSAGE" && output && output.channel) { - // clients don't validate messages sent by the historyKeeper - // so this broadcast needs to come from a different id - // we pass 'null' to indicate that it's not coming from a real user - // to ensure that they know not to trust this message - Server.getChannelUserList(output.channel).forEach(function (userId) { - Server.send(userId, output.message); - }); - - // rpc and anonRpc expect their responses to be of a certain length - // and we've already used the output of the rpc call, so overwrite it - output = [null, null, null]; - } - - // finally, send a response to the client that sent the RPC Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0]].concat(output))]); }); } catch (e) { @@ -967,6 +894,9 @@ module.exports.create = function (cfg, cb) { }; cfg.historyKeeper = { + metadata_cache: metadata_cache, + channel_cache: channel_cache, + id: HISTORY_KEEPER_ID, channelMessage: function (Server, channel, msgStruct) { diff --git a/lib/rpc.js b/lib/rpc.js index 96b678856..60b186563 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -60,6 +60,7 @@ const AUTHENTICATED_USER_TARGETED = { WRITE_LOGIN_BLOCK: Block.writeLoginBlock, REMOVE_LOGIN_BLOCK: Block.removeLoginBlock, ADMIN: Admin.command, + SET_METADATA: Metadata.setMetadata, }; const AUTHENTICATED_USER_SCOPED = { @@ -70,7 +71,6 @@ const AUTHENTICATED_USER_SCOPED = { EXPIRE_SESSION: Core.expireSessionAsync, REMOVE_PINS: Pinning.removePins, TRIM_PINS: Pinning.trimPins, - SET_METADATA: Metadata.setMetadata, COOKIE: Core.haveACookie, };