diff --git a/historyKeeper.js b/historyKeeper.js index 6de09a900..3243d0f15 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -66,6 +66,16 @@ const isMetadataMessage = function (parsed) { return Boolean(parsed && parsed.channel); }; +// validateKeyStrings supplied by clients must decode to 32-byte Uint8Arrays +const isValidValidateKeyString = function (key) { + try { + return typeof(key) === 'string' && + Nacl.util.decodeBase64(key).length === Nacl.sign.publicKeyLength; + } catch (e) { + return false; + } +}; + module.exports.create = function (cfg) { const rpc = cfg.rpc; const tasks = cfg.tasks; @@ -95,7 +105,6 @@ module.exports.create = function (cfg) { * offsetByHash: * a map containing message offsets by their hash * this is for every message in history, so it could be very large... - * XXX OFFSET * except we remove offsets from the map if they occur before the oldest relevant checkpoint * size: in bytes * metadata: @@ -219,18 +228,52 @@ module.exports.create = function (cfg) { as an added bonus: if the channel exists but its index does not then it caches the index */ + const indexQueues = {}; const getIndex = (ctx, channelName, cb) => { const chan = ctx.channels[channelName]; + // if there is a channel in memory and it has an index cached, return it if (chan && chan.index) { // enforce async behaviour return void setTimeout(function () { cb(undefined, chan.index); }); } + + // if a call to computeIndex is already in progress for this channel + // then add the callback for the latest invocation to the queue + // and wait for it to complete + if (Array.isArray(indexQueues[channelName])) { + indexQueues[channelName].push(cb); + return; + } + + // otherwise, make a queue for any 'getIndex' calls made before the following 'computeIndex' call completes + var queue = indexQueues[channelName] = (indexQueues[channelName] || [cb]); + computeIndex(channelName, (err, ret) => { - if (err) { return void cb(err); } + if (!Array.isArray(queue)) { + // something is very 
wrong if there's no callback array + return void Log.error("E_INDEX_NO_CALLBACK", channelName); + } + + + // clean up the queue that you're about to handle, but keep a local copy + delete indexQueues[channelName]; + + // this is most likely an unrecoverable filesystem error + if (err) { + // call back every pending function with the error + return void queue.forEach(function (_cb) { + _cb(err); + }); + } + // cache the computed result if possible if (chan) { chan.index = ret; } - cb(undefined, ret); + + // call back every pending function with the result + queue.forEach(function (_cb) { + _cb(void 0, ret); + }); }); }; @@ -258,33 +301,54 @@ module.exports.create = function (cfg) { * because the two actions were performed like ABba... * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes - - TODO rename maybeMsgHash to optionalMsgHash */ - const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) { - const msgBin = new Buffer(msg + '\n', 'utf8'); + const storageQueues = {}; + + const storeQueuedMessage = function (ctx, queue, id) { + if (queue.length === 0) { + delete storageQueues[id]; + return; + } + + const first = queue.shift(); + + const msgBin = first.msg; + const optionalMessageHash = first.hash; + const isCp = first.isCp; + // Store the message first, and update the index only once it's stored. 
// store.messageBin can be async so updating the index first may // result in a wrong cpIndex nThen((waitFor) => { - store.messageBin(channel.id, msgBin, waitFor(function (err) { + store.messageBin(id, msgBin, waitFor(function (err) { if (err) { waitFor.abort(); - return void Log.error("HK_STORE_MESSAGE_ERROR", err.message); + Log.error("HK_STORE_MESSAGE_ERROR", err.message); + + // this error is critical, but there's not much we can do at the moment + // proceed with more messages, but they'll probably fail too + // at least you won't have a memory leak + + // TODO make it possible to respond to clients with errors so they know + // their message wasn't stored + storeQueuedMessage(ctx, queue, id); + return; } })); }).nThen((waitFor) => { - getIndex(ctx, channel.id, waitFor((err, index) => { + getIndex(ctx, id, waitFor((err, index) => { if (err) { Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); // non-critical, we'll be able to get the channel index later + + // proceed to the next message in the queue + storeQueuedMessage(ctx, queue, id); return; } if (typeof (index.line) === "number") { index.line++; } if (isCp) { index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); for (let k in index.offsetByHash) { - // XXX OFFSET if (index.offsetByHash[k] < index.cpIndex[0]) { delete index.offsetByHash[k]; } @@ -294,12 +358,34 @@ module.exports.create = function (cfg) { line: ((index.line || 0) + 1) } /*:cp_index_item*/)); } - if (maybeMsgHash) { index.offsetByHash[maybeMsgHash] = index.size; } + if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } index.size += msgBin.length; + + // handle the next element in the queue + storeQueuedMessage(ctx, queue, id); })); }); }; + const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { + const id = channel.id; + + const msgBin = new Buffer(msg + '\n', 'utf8'); + if (Array.isArray(storageQueues[id])) { + return void storageQueues[id].push({ + msg: msgBin, + hash: 
optionalMessageHash, + isCp: isCp, + }); + } + + const queue = storageQueues[id] = (storageQueues[id] || [{ + msg: msgBin, + hash: optionalMessageHash, isCp: isCp, + }]); + storeQueuedMessage(ctx, queue, id); + }; + var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; /* onChannelMessage @@ -312,55 +398,96 @@ * caches the id of the last saved checkpoint * adds timestamps to incoming messages * writes messages to the store - - */ const onChannelMessage = function (ctx, channel, msgStruct) { // don't store messages if the channel id indicates that it's an ephemeral message if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; } const isCp = /^cp\|/.test(msgStruct[4]); - if (metadata_cache[channel.id] && metadata_cache[channel.id].expire && - metadata_cache[channel.id].expire < +new Date()) { - return; // Don't store messages on expired channel - // TODO if a channel expired a long time ago but it's still here, remove it - } let id; if (isCp) { - /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/ + // id becomes either null or an array of results... id = CHECKPOINT_PATTERN.exec(msgStruct[4]); if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) { // Reject duplicate checkpoints return; } } - var metadata = metadata_cache[channel.id]; - if (metadata && metadata.validateKey) { - /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/ - let signedMsg = (isCp) ? 
msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; - signedMsg = Nacl.util.decodeBase64(signedMsg); - // FIXME PERFORMANCE: cache the decoded key instead of decoding it every time - // CPU/Memory tradeoff - const validateKey = Nacl.util.decodeBase64(metadata.validateKey); - const validated = Nacl.sign.open(signedMsg, validateKey); - if (!validated) { - Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); - return; - } - } - if (isCp) { - // WARNING: the fact that we only check the most recent checkpoints - // is a potential source of bugs if one editor has high latency and - // pushes a duplicate of an earlier checkpoint than the latest which - // has been pushed by editors with low latency - // FIXME - if (Array.isArray(id) && id[2]) { - // Store new checkpoint hash - channel.lastSavedCp = id[2]; + + let metadata; + nThen(function (w) { + // getIndex (and therefore the latest metadata) + getIndex(ctx, channel.id, w(function (err, index) { + if (err) { + w.abort(); + return void Log.error('CHANNEL_MESSAGE_ERROR', err); + } + + if (!index.metadata) { + // if there's no channel metadata then it can't be an expiring channel + // nor can we possibly validate it + return; + } + + metadata = index.metadata; + + if (metadata.expire && metadata.expire < +new Date()) { + // don't store message sent to expired channels + w.abort(); + return; + // TODO if a channel expired a long time ago but it's still here, remove it + } + + // if there's no validateKey present skip to the next block + if (!metadata.validateKey) { return; } + + // trim the checkpoint indicator off the message if it's present + let signedMsg = (isCp) ? 
msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; + // convert the message from a base64 string into a Uint8Array + + // FIXME this can fail and the client won't notice + signedMsg = Nacl.util.decodeBase64(signedMsg); + + // FIXME this can blow up + // TODO check that that won't cause any problems other than not being able to append... + const validateKey = Nacl.util.decodeBase64(metadata.validateKey); + // validate the message + const validated = Nacl.sign.open(signedMsg, validateKey); + if (!validated) { + // don't go any further if the message fails validation + w.abort(); + Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); + return; + } + })); + }).nThen(function () { + // do checkpoint stuff... + + // 1. get the checkpoint id + // 2. reject duplicate checkpoints + + if (isCp) { + // if the message is a checkpoint we will have already validated + // that it isn't a duplicate. remember its id so that we can + // repeat this process for the next incoming checkpoint + + // WARNING: the fact that we only check the most recent checkpoints + // is a potential source of bugs if one editor has high latency and + // pushes a duplicate of an earlier checkpoint than the latest which + // has been pushed by editors with low latency + // FIXME + if (Array.isArray(id) && id[2]) { + // Store new checkpoint hash + channel.lastSavedCp = id[2]; + } } - } - msgStruct.push(now()); - storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4])); + + // add the time to the message + msgStruct.push(now()); + + // storeMessage + storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4])); + }); }; /* dropChannel @@ -423,7 +550,6 @@ module.exports.create = function (cfg) { // QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory? 
if (lastKnownHash && typeof(lkh) !== "number") { waitFor.abort(); - // XXX this smells bad return void cb(new Error('EINVAL')); } @@ -452,7 +578,7 @@ module.exports.create = function (cfg) { if (offset !== -1) { return; } // do a lookup from the index - // XXX maybe we don't need this anymore? + // FIXME maybe we don't need this anymore? // otherwise we have a non-negative offset and we can start to read from there store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { // tryParse return a parsed message or undefined @@ -588,7 +714,6 @@ module.exports.create = function (cfg) { // If it is, remove it from memory and broadcast a message to its members const onChannelMetadataChanged = function (ctx, channel) { - // XXX lint compliance channel = channel; }; @@ -673,6 +798,14 @@ module.exports.create = function (cfg) { } metadata.channel = channelName; + // if the user sends us an invalid key, we won't be able to validate their messages + // so they'll never get written to the log anyway. Let's just drop their message + // on the floor instead of doing a bunch of extra work + // TODO send them an error message so they know something is wrong + if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) { + return void Log.error('HK_INVALID_KEY', metadata.validateKey); + } + nThen(function (waitFor) { var w = waitFor(); @@ -734,8 +867,11 @@ module.exports.create = function (cfg) { // otherwise maybe we need to check that the metadata log is empty as well store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { if (err) { - // XXX tell the user that there was a channel error? - return void Log.error('HK_WRITE_METADATA'); + // FIXME tell the user that there was a channel error? 
+ return void Log.error('HK_WRITE_METADATA', { + channel: channelName, + error: err, + }); } }); @@ -810,8 +946,8 @@ module.exports.create = function (cfg) { // parsed[3] is the last known hash (optionnal) sendMsg(ctx, user, [seq, 'ACK']); - // XXX should we send metadata here too? - // my gut says yes + // FIXME should we send metadata here too? + // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22) getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => { if (!msg) { return; } sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); diff --git a/storage/file.js b/storage/file.js index ab25b0963..739f079ec 100644 --- a/storage/file.js +++ b/storage/file.js @@ -236,9 +236,13 @@ How to proceed // writeMetadata appends to the dedicated log of metadata amendments var writeMetadata = function (env, channelId, data, cb) { var path = mkMetadataPath(env, channelId); - // XXX appendFile isn't great - // but this is a simple way to get things working - Fs.appendFile(path, data + '\n', cb); + + Fse.mkdirp(Path.dirname(path), PERMISSIVE, function (err) { + if (err && err.code !== 'EEXIST') { return void cb(err); } + + // TODO see if we can make this any faster by using something other than appendFile + Fs.appendFile(path, data + '\n', cb); + }); }; @@ -290,7 +294,11 @@ const mkOffsetCounter = () => { }); }; -// XXX write some docs for this magic +// readMessagesBin asynchronously iterates over the messages in a channel log +// the handler for each message must call back to read more, which should mean +// that this function has a lower memory profile than our classic method +// of reading logs line by line. 
+// it also allows the handler to abort reading at any time const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); let keepReading = true; @@ -341,23 +349,33 @@ var removeChannel = function (env, channelName, cb) { var CB = Once(cb); + var errors = 0; nThen(function (w) { Fs.unlink(channelPath, w(function (err) { if (err) { - // XXX handle ENOENT and only return an error - // if both channel and metadata did not exist... + if (err.code === 'ENOENT') { + errors++; + return; + } w.abort(); CB(labelError("E_CHANNEL_REMOVAL", err)); } })); Fs.unlink(metadataPath, w(function (err) { if (err) { - if (err.code === 'ENOENT') { return; } // proceed if there's no metadata to delete + if (err.code === 'ENOENT') { + errors++; + return; + } // proceed if there's no metadata to delete w.abort(); CB(labelError("E_METADATA_REMOVAL", err)); } })); }).nThen(function () { + if (errors === 2) { + return void CB(labelError('E_REMOVE_CHANNEL', new Error("ENOENT"))); + } + CB(); }); }; @@ -421,12 +439,23 @@ var listChannels = function (root, handler, cb) { if (err) { return void handler(err); } // Is this correct? 
list.forEach(function (item) { - // ignore things that don't match the naming pattern - // XXX don't ignore metadata files if there is no corresponding channel - // since you probably want to clean those up - if (/^\./.test(item) || !/[0-9a-fA-F]{32}\.ndjson$/.test(item)) { return; } + // ignore hidden files + if (/^\./.test(item)) { return; } + // ignore anything that isn't channel or metadata + if (!/^[0-9a-fA-F]{32}(\.metadata)?\.ndjson$/.test(item)) { + return; + } + if (!/^[0-9a-fA-F]{32}\.ndjson$/.test(item)) { + // this will catch metadata, which we want to ignore if + // the corresponding channel is present + if (list.indexOf(item.replace(/\.metadata/, '')) !== -1) { return; } + // otherwise fall through + } var filepath = Path.join(nestedDirPath, item); - var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, ''); + var channel = filepath + .replace(/\.ndjson$/, '') + .replace(/\.metadata/, '') + .replace(/.*\//, ''); if ([32, 34].indexOf(channel.length) === -1) { return; } // otherwise throw it on the pile