From 338f967149247dcb3c42332320133c1315a85f7d Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Aug 2019 11:21:10 +0200 Subject: [PATCH] restructure onChannelMessage --- historyKeeper.js | 118 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 84 insertions(+), 34 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index 85e5f1dfb..4b4e0ba58 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -220,6 +220,10 @@ module.exports.create = function (cfg) { if the channel exists but its index does not then it caches the index */ const getIndex = (ctx, channelName, cb) => { + // FIXME don't allow more than one index to be computed at a time + // if one is in progress, the callback to a queue + // whenever you completed, empty the queue in order + const chan = ctx.channels[channelName]; if (chan && chan.index) { // enforce async behaviour @@ -262,6 +266,8 @@ module.exports.create = function (cfg) { TODO rename maybeMsgHash to optionalMsgHash */ const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) { + // TODO implement a queue so that we know messages are written in order + const msgBin = new Buffer(msg + '\n', 'utf8'); // Store the message first, and update the index only once it's stored. // store.messageBin can be async so updating the index first may @@ -312,55 +318,96 @@ module.exports.create = function (cfg) { * caches the id of the last saved checkpoint * adds timestamps to incoming messages * writes messages to the store - - */ const onChannelMessage = function (ctx, channel, msgStruct) { // don't store messages if the channel id indicates that it's an ephemeral message if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; } const isCp = /^cp\|/.test(msgStruct[4]); - if (metadata_cache[channel.id] && metadata_cache[channel.id].expire && - metadata_cache[channel.id].expire < +new Date()) { - return; // Don't store messages on expired channel - // TODO if a channel expired a long time ago but it's still here, remove it - } let id; if (isCp) { - /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/ + // id becomes either null or an array or results... id = CHECKPOINT_PATTERN.exec(msgStruct[4]); if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) { // Reject duplicate checkpoints return; } } - var metadata = metadata_cache[channel.id]; - if (metadata && metadata.validateKey) { - /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/ - let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; - signedMsg = Nacl.util.decodeBase64(signedMsg); - // FIXME PERFORMANCE: cache the decoded key instead of decoding it every time - // CPU/Memory tradeoff - const validateKey = Nacl.util.decodeBase64(metadata.validateKey); - const validated = Nacl.sign.open(signedMsg, validateKey); - if (!validated) { - Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); - return; - } - } - if (isCp) { - // WARNING: the fact that we only check the most recent checkpoints - // is a potential source of bugs if one editor has high latency and - // pushes a duplicate of an earlier checkpoint than the latest which - // has been pushed by editors with low latency - // FIXME - if (Array.isArray(id) && id[2]) { - // Store new checkpoint hash - channel.lastSavedCp = id[2]; + + let metadata; + nThen(function (w) { + // getIndex (and therefore the latest metadata) + getIndex(ctx, channel.id, w(function (err, index) { + if (err) { + w.abort(); + return void Log.error('CHANNEL_MESSAGE_ERROR', err); + } + + if (!index.metadata) { + // if there's no channel metadata then it can't be an expiring channel + // nor can we possibly validate it + return; + } + + metadata = index.metadata; + + if (metadata.expire && metadata.expire < +new Date()) { + // don't store message sent to expired channels + w.abort(); + return; + // TODO if a channel expired a long time ago but it's still here, remove it + } + + // if there's no validateKey present skip to the next block + if (!metadata.validateKey) { return; } + + // trim the checkpoint indicator off the message if it's present + let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; + // convert the message from a base64 string into a Uint8Array + + // FIXME this can fail and the client won't notice + signedMsg = Nacl.util.decodeBase64(signedMsg); + + // FIXME this can blow up + // TODO check that that won't cause any problems other than not being able to append... + const validateKey = Nacl.util.decodeBase64(metadata.validateKey); + // validate the message + const validated = Nacl.sign.open(signedMsg, validateKey); + if (!validated) { + // don't go any further if the message fails validation + w.abort(); + Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); + return; + } + })); + }).nThen(function () { + // do checkpoint stuff... + + // 1. get the checkpoint id + // 2. reject duplicate checkpoints + + if (isCp) { + // if the message is a checkpoint we will have already validated + // that it isn't a duplicate. remember its id so that we can + // repeat this process for the next incoming checkpoint + + // WARNING: the fact that we only check the most recent checkpoints + // is a potential source of bugs if one editor has high latency and + // pushes a duplicate of an earlier checkpoint than the latest which + // has been pushed by editors with low latency + // FIXME + if (Array.isArray(id) && id[2]) { + // Store new checkpoint hash + channel.lastSavedCp = id[2]; + } } - } - msgStruct.push(now()); - storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4])); + + // add the time to the message + msgStruct.push(now()); + + // storeMessage + storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4])); + }); }; /* dropChannel @@ -673,6 +720,9 @@ module.exports.create = function (cfg) { } metadata.channel = channelName; + // XXX check that the validateKey is valid, otherwise send an error? + // don't bother putting it into storage + nThen(function (waitFor) { var w = waitFor();