From 338f967149247dcb3c42332320133c1315a85f7d Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Aug 2019 11:21:10 +0200 Subject: [PATCH 01/13] restructure onChannelMessage --- historyKeeper.js | 118 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 84 insertions(+), 34 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index 85e5f1dfb..4b4e0ba58 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -220,6 +220,10 @@ module.exports.create = function (cfg) { if the channel exists but its index does not then it caches the index */ const getIndex = (ctx, channelName, cb) => { + // FIXME don't allow more than one index to be computed at a time + // if one is in progress, the callback to a queue + // whenever you completed, empty the queue in order + const chan = ctx.channels[channelName]; if (chan && chan.index) { // enforce async behaviour @@ -262,6 +266,8 @@ module.exports.create = function (cfg) { TODO rename maybeMsgHash to optionalMsgHash */ const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) { + // TODO implement a queue so that we know messages are written in order + const msgBin = new Buffer(msg + '\n', 'utf8'); // Store the message first, and update the index only once it's stored. 
// store.messageBin can be async so updating the index first may @@ -312,55 +318,96 @@ module.exports.create = function (cfg) { * caches the id of the last saved checkpoint * adds timestamps to incoming messages * writes messages to the store - - */ const onChannelMessage = function (ctx, channel, msgStruct) { // don't store messages if the channel id indicates that it's an ephemeral message if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; } const isCp = /^cp\|/.test(msgStruct[4]); - if (metadata_cache[channel.id] && metadata_cache[channel.id].expire && - metadata_cache[channel.id].expire < +new Date()) { - return; // Don't store messages on expired channel - // TODO if a channel expired a long time ago but it's still here, remove it - } let id; if (isCp) { - /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/ + // id becomes either null or an array of results... id = CHECKPOINT_PATTERN.exec(msgStruct[4]); if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) { // Reject duplicate checkpoints return; } } - var metadata = metadata_cache[channel.id]; - if (metadata && metadata.validateKey) { - /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/ - let signedMsg = (isCp) ? 
msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; - signedMsg = Nacl.util.decodeBase64(signedMsg); - // FIXME PERFORMANCE: cache the decoded key instead of decoding it every time - // CPU/Memory tradeoff - const validateKey = Nacl.util.decodeBase64(metadata.validateKey); - const validated = Nacl.sign.open(signedMsg, validateKey); - if (!validated) { - Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); - return; - } - } - if (isCp) { - // WARNING: the fact that we only check the most recent checkpoints - // is a potential source of bugs if one editor has high latency and - // pushes a duplicate of an earlier checkpoint than the latest which - // has been pushed by editors with low latency - // FIXME - if (Array.isArray(id) && id[2]) { - // Store new checkpoint hash - channel.lastSavedCp = id[2]; + + let metadata; + nThen(function (w) { + // getIndex (and therefore the latest metadata) + getIndex(ctx, channel.id, w(function (err, index) { + if (err) { + w.abort(); + return void Log.error('CHANNEL_MESSAGE_ERROR', err); + } + + if (!index.metadata) { + // if there's no channel metadata then it can't be an expiring channel + // nor can we possibly validate it + return; + } + + metadata = index.metadata; + + if (metadata.expire && metadata.expire < +new Date()) { + // don't store message sent to expired channels + w.abort(); + return; + // TODO if a channel expired a long time ago but it's still here, remove it + } + + // if there's no validateKey present skip to the next block + if (!metadata.validateKey) { return; } + + // trim the checkpoint indicator off the message if it's present + let signedMsg = (isCp) ? 
msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; + // convert the message from a base64 string into a Uint8Array + + // FIXME this can fail and the client won't notice + signedMsg = Nacl.util.decodeBase64(signedMsg); + + // FIXME this can blow up + // TODO check that that won't cause any problems other than not being able to append... + const validateKey = Nacl.util.decodeBase64(metadata.validateKey); + // validate the message + const validated = Nacl.sign.open(signedMsg, validateKey); + if (!validated) { + // don't go any further if the message fails validation + w.abort(); + Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); + return; + } + })); + }).nThen(function () { + // do checkpoint stuff... + + // 1. get the checkpoint id + // 2. reject duplicate checkpoints + + if (isCp) { + // if the message is a checkpoint we will have already validated + // that it isn't a duplicate. remember its id so that we can + // repeat this process for the next incoming checkpoint + + // WARNING: the fact that we only check the most recent checkpoints + // is a potential source of bugs if one editor has high latency and + // pushes a duplicate of an earlier checkpoint than the latest which + // has been pushed by editors with low latency + // FIXME + if (Array.isArray(id) && id[2]) { + // Store new checkpoint hash + channel.lastSavedCp = id[2]; + } } - } - msgStruct.push(now()); - storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4])); + + // add the time to the message + msgStruct.push(now()); + + // storeMessage + storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4])); + }); }; /* dropChannel @@ -673,6 +720,9 @@ module.exports.create = function (cfg) { } metadata.channel = channelName; + // XXX check that the validateKey is valid, otherwise send an error? 
+ // don't bother putting it into storage + nThen(function (waitFor) { var w = waitFor(); From 347b7a639bdb8a46febc58694b66fb0fe83c1f6c Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Aug 2019 14:40:01 +0200 Subject: [PATCH 02/13] update XXX comment --- historyKeeper.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index 4b4e0ba58..6ea0a2324 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -853,8 +853,8 @@ module.exports.create = function (cfg) { // parsed[3] is the last known hash (optionnal) sendMsg(ctx, user, [seq, 'ACK']); - // XXX should we send metadata here too? - // my gut says yes + // FIXME should we send metadata here too? + // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22) getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => { if (!msg) { return; } sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); From 53f04d096df605c875c92107473d8bdcacf872e3 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Aug 2019 14:52:11 +0200 Subject: [PATCH 03/13] update the server's cache whenever we write metadata --- historyKeeper.js | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/historyKeeper.js b/historyKeeper.js index 6ea0a2324..8a542fff5 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -771,6 +771,12 @@ module.exports.create = function (cfg) { if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) { metadata_cache[channelName] = metadata; + + // the index will have already been constructed and cached at this point + // but it will not have detected any metadata because it hasn't been written yet + // this means that the cache starts off as invalid, so we have to correct it + if (chan && chan.index) { chan.index.metadata = metadata; } + // new channels will always have their metadata written to a dedicated metadata log // but any 
lines after the first which are not amendments in a particular format will be ignored. // Thus we should be safe from race conditions here if just write metadata to the log as below... @@ -778,6 +784,7 @@ module.exports.create = function (cfg) { // otherwise maybe we need to check that the metadata log is empty as well store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { if (err) { + // XXX tell the user that there was a channel error? return void Log.error('HK_WRITE_METADATA'); } }); From 096c8eb2ba319d6278ce03bd60def5380aa1cc6e Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Aug 2019 16:59:46 +0200 Subject: [PATCH 04/13] check that the user-supplied validateKey is actually valid before doing any more work --- historyKeeper.js | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index 8a542fff5..a06fb1098 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -66,6 +66,17 @@ const isMetadataMessage = function (parsed) { return Boolean(parsed && parsed.channel); }; +const isValidValidateKey = function (key) { + if (typeof(key) !== 'string') { return false; } + let valid = false; + try { + if (Nacl.util.decodeBase64(key).length !== Nacl.sign.publicKeyLength) { return false; } + } catch (e) { + return valid; + } + return valid; +}; + module.exports.create = function (cfg) { const rpc = cfg.rpc; const tasks = cfg.tasks; @@ -720,8 +731,13 @@ module.exports.create = function (cfg) { } metadata.channel = channelName; - // XXX check that the validateKey is valid, otherwise send an error? - // don't bother putting it into storage + // if the user sends us an invalid key, we won't be able to validate their messages + // so they'll never get written to the log anyway. 
Let's just drop their message + // on the floor instead of doing a bunch of extra work + // TODO send them an error message so they know something is wrong + if (metadata.validateKey && !isValidValidateKey(metadata.validateKey)) { + return void Log.error('HK_INVALID_KEY', metadata.validateKey); + } nThen(function (waitFor) { var w = waitFor(); From c9f569b1096ae55bd33349774f0b14fbfb5ee3ef Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Aug 2019 17:00:51 +0200 Subject: [PATCH 05/13] optimize getIndex so identical indexes are never computed concurrently --- historyKeeper.js | 44 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index a06fb1098..fa753610e 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -230,11 +230,8 @@ module.exports.create = function (cfg) { as an added bonus: if the channel exists but its index does not then it caches the index */ + const indexQueue = {}; const getIndex = (ctx, channelName, cb) => { - // FIXME don't allow more than one index to be computed at a time - // if one is in progress, the callback to a queue - // whenever you completed, empty the queue in order - const chan = ctx.channels[channelName]; if (chan && chan.index) { // enforce async behaviour @@ -242,10 +239,45 @@ module.exports.create = function (cfg) { cb(undefined, chan.index); }); } + + // make a queue of callbacks in case getIndex is called before you + // compute and cache the index + + // if a call to computeIndex is already in progress for this channel + // then add the callback for the latest invocation to the queue + // and wait for it to complete + if (indexQueue[channelName]) { + indexQueue.channelName.push(cb); + return; + } + + // if computeIndex is not already in progress, make an array + // with the current call first in line, so additional calls + // can add their callbacks to the queue + indexQueue[channelName] = (indexQueue[channelName] || []).push(cb); + 
computeIndex(channelName, (err, ret) => { - if (err) { return void cb(err); } + // keep a local reference to this channel's queue + let queue = indexQueue[channelName]; + + // but remove it from the map of queues and not worry about cleaning up + // as long as computeIndex always calls back this won't be a memory leak + delete indexQueue[channelName]; + + if (err) { + // call back every pending function with the error + return void queue.forEach(function (_cb) { + _cb(err); + }); + } + // if there's a channel to use as a cache, store the result + // for future use if (chan) { chan.index = ret; } - cb(undefined, ret); + + // call back every pending function with the result + queue.forEach(function (_cb) { + _cb(void 0, ret); + }); }); }; From 204f6b3af114ecf07fff71e537987f7a7ba17f4b Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Aug 2019 17:01:05 +0200 Subject: [PATCH 06/13] remove some comments --- historyKeeper.js | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index fa753610e..fb1d4ab1f 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -106,7 +106,7 @@ module.exports.create = function (cfg) { * offsetByHash: * a map containing message offsets by their hash * this is for every message in history, so it could be very large... - * XXX OFFSET + * FIXME OFFSET * except we remove offsets from the map if they occur before the oldest relevant checkpoint * size: in bytes * metadata: @@ -333,7 +333,7 @@ module.exports.create = function (cfg) { if (isCp) { index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); for (let k in index.offsetByHash) { - // XXX OFFSET + // FIXME OFFSET if (index.offsetByHash[k] < index.cpIndex[0]) { delete index.offsetByHash[k]; } @@ -513,7 +513,6 @@ module.exports.create = function (cfg) { // QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory? 
if (lastKnownHash && typeof(lkh) !== "number") { waitFor.abort(); - // XXX this smells bad return void cb(new Error('EINVAL')); } @@ -542,7 +541,7 @@ module.exports.create = function (cfg) { if (offset !== -1) { return; } // do a lookup from the index - // XXX maybe we don't need this anymore? + // FIXME maybe we don't need this anymore? // otherwise we have a non-negative offset and we can start to read from there store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { // tryParse return a parsed message or undefined @@ -678,7 +677,6 @@ module.exports.create = function (cfg) { // If it is, remove it from memory and broadcast a message to its members const onChannelMetadataChanged = function (ctx, channel) { - // XXX lint compliance channel = channel; }; @@ -832,7 +830,7 @@ module.exports.create = function (cfg) { // otherwise maybe we need to check that the metadata log is empty as well store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { if (err) { - // XXX tell the user that there was a channel error? + // FIXME tell the user that there was a channel error? 
return void Log.error('HK_WRITE_METADATA'); } }); From 84e1057153d64362f0535b41e28fc134603e702b Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Aug 2019 17:48:28 +0200 Subject: [PATCH 07/13] address remaining notes in the storage api --- storage/file.js | 46 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/storage/file.js b/storage/file.js index def17e622..4c4867641 100644 --- a/storage/file.js +++ b/storage/file.js @@ -236,8 +236,7 @@ How to proceed // writeMetadata appends to the dedicated log of metadata amendments var writeMetadata = function (env, channelId, data, cb) { var path = mkMetadataPath(env, channelId); - // XXX appendFile isn't great - // but this is a simple way to get things working + // TODO see if we can make this any faster by using something other than appendFile Fs.appendFile(path, data + '\n', cb); }; @@ -290,7 +289,11 @@ const mkOffsetCounter = () => { }); }; -// XXX write some docs for this magic +// readMessagesBin asynchronously iterates over the messages in a channel log +// the handler for each message must call back to read more, which should mean +// that this function has a lower memory profile than our classic method +// of reading logs line by line. +// it also allows the handler to abort reading at any time const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); let keepReading = true; @@ -341,23 +344,33 @@ var removeChannel = function (env, channelName, cb) { var CB = Once(cb); + var errors = 0; nThen(function (w) { Fs.unlink(channelPath, w(function (err) { if (err) { - // XXX handle ENOENT and only return an error - // if both channel and metadata did not exist... 
+ if (err.code === 'ENOENT') { + errors++; + return; + } w.abort(); CB(labelError("E_CHANNEL_REMOVAL", err)); } })); Fs.unlink(metadataPath, w(function (err) { if (err) { - if (err.code === 'ENOENT') { return; } // proceed if there's no metadata to delete + if (err.code === 'ENOENT') { + errors++; + return; + } // proceed if there's no metadata to delete w.abort(); CB(labelError("E_METADATA_REMOVAL", err)); } })); }).nThen(function () { + if (errors === 2) { + return void CB(labelError('E_REMOVE_CHANNEL', new Error("ENOENT"))); + } + CB(); }); }; @@ -421,12 +434,23 @@ var listChannels = function (root, handler, cb) { if (err) { return void handler(err); } // Is this correct? list.forEach(function (item) { - // ignore things that don't match the naming pattern - // XXX don't ignore metadata files if there is no corresponding channel - // since you probably want to clean those up - if (/^\./.test(item) || !/[0-9a-fA-F]{32}\.ndjson$/.test(item)) { return; } + // ignore hidden files + if (/^\./.test(item)) { return; } + // ignore anything that isn't channel or metadata + if (!/^[0-9a-fA-F]{32}(\.metadata?)*\.ndjson$/.test(item)) { + return; + } + if (!/^[0-9a-fA-F]{32}\.ndjson$/.test(item)) { + // this will catch metadata, which we want to ignore if + // the corresponding channel is present + if (list.indexOf(item.replace(/\.metadata/, '')) !== -1) { return; } + // otherwise fall through + } var filepath = Path.join(nestedDirPath, item); - var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, ''); + var channel = filepath + .replace(/\.ndjson$/, '') + .replace(/\.metadata/, '') + .replace(/.*\//, ''); if ([32, 34].indexOf(channel.length) === -1) { return; } // otherwise throw it on the pile From 62c23a29a7cb0577967b0f601c09e00b713781ea Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 23 Aug 2019 12:28:17 +0200 Subject: [PATCH 08/13] simplify validateKey validation --- historyKeeper.js | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff 
--git a/historyKeeper.js b/historyKeeper.js index 52ae95b2c..b1bf49fd5 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -66,15 +66,14 @@ const isMetadataMessage = function (parsed) { return Boolean(parsed && parsed.channel); }; -const isValidValidateKey = function (key) { - if (typeof(key) !== 'string') { return false; } - let valid = false; +// validateKeyStrings supplied by clients must decode to 32-byte Uint8Arrays +const isValidValidateKeyString = function (key) { try { - if (Nacl.util.decodeBase64(key).length !== Nacl.sign.publicKeyLength) { return false; } + return typeof(key) === 'string' && + Nacl.util.decodeBase64(key).length === Nacl.sign.publicKeyLength; } catch (e) { - return valid; + return false; } - return valid; }; module.exports.create = function (cfg) { @@ -765,7 +764,7 @@ module.exports.create = function (cfg) { // so they'll never get written to the log anyway. Let's just drop their message // on the floor instead of doing a bunch of extra work // TODO send them an error message so they know something is wrong - if (metadata.validateKey && !isValidValidateKey(metadata.validateKey)) { + if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) { return void Log.error('HK_INVALID_KEY', metadata.validateKey); } From 7642ee9584751d9a6e268990e57cba4de2e8fad6 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 23 Aug 2019 12:43:05 +0200 Subject: [PATCH 09/13] log catastrophic errors in getIndex, clean up comments --- historyKeeper.js | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index b1bf49fd5..9add76ee6 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -229,9 +229,10 @@ module.exports.create = function (cfg) { as an added bonus: if the channel exists but its index does not then it caches the index */ - const indexQueue = {}; + const indexQueues = {}; const getIndex = (ctx, channelName, cb) => { const chan = 
ctx.channels[channelName]; + // if there is a channel in memory and it has an index cached, return it if (chan && chan.index) { // enforce async behaviour return void setTimeout(function () { @@ -239,38 +240,35 @@ module.exports.create = function (cfg) { }); } - // make a queue of callbacks in case getIndex is called before you - // compute and cache the index - // if a call to computeIndex is already in progress for this channel // then add the callback for the latest invocation to the queue // and wait for it to complete - if (indexQueue[channelName]) { - indexQueue.channelName.push(cb); + if (Array.isArray(indexQueues[channelName])) { + indexQueues[channelName].push(cb); return; } - // if computeIndex is not already in progress, make an array - // with the current call first in line, so additional calls - // can add their callbacks to the queue - indexQueue[channelName] = (indexQueue[channelName] || []).push(cb); + // otherwise, make a queue for any 'getIndex' calls made before the following 'computeIndex' call completes + var queue = indexQueues[channelName] = (indexQueues[channelName] || [cb]); computeIndex(channelName, (err, ret) => { - // keep a local reference to this channel's queue - let queue = indexQueue[channelName]; + if (!Array.isArray(queue)) { + // something is very wrong if there's no callback array + return void Log.error("E_INDEX_NO_CALLBACK", channelName); + } + - // but remove it from the map of queues and not worry about cleaning up - // as long as computeIndex always calls back this won't be a memory leak - delete indexQueue[channelName]; + // clean up the queue that you're about to handle, but keep a local copy + delete indexQueues[channelName]; + // this is most likely an unrecoverable filesystem error if (err) { // call back every pending function with the error return void queue.forEach(function (_cb) { _cb(err); }); } - // if there's a channel to use as a cache, store the result - // for future use + // cache the computed result if 
possible if (chan) { chan.index = ret; } // call back every pending function with the result From a34abd748e3ee5009b634771e2674c41435feaa0 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 23 Aug 2019 12:43:42 +0200 Subject: [PATCH 10/13] rename unclear variable name --- historyKeeper.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index 9add76ee6..3572db7c0 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -302,10 +302,8 @@ module.exports.create = function (cfg) { * because the two actions were performed like ABba... * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes - - TODO rename maybeMsgHash to optionalMsgHash */ - const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) { + const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { // TODO implement a queue so that we know messages are written in order const msgBin = new Buffer(msg + '\n', 'utf8'); @@ -340,7 +338,7 @@ module.exports.create = function (cfg) { line: ((index.line || 0) + 1) } /*:cp_index_item*/)); } - if (maybeMsgHash) { index.offsetByHash[maybeMsgHash] = index.size; } + if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } index.size += msgBin.length; })); }); From 689886b9bf28a4410631f517d71a2f34ccfa2cd2 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 23 Aug 2019 13:12:58 +0200 Subject: [PATCH 11/13] always store messages according to the order in which they were received --- historyKeeper.js | 58 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index 3572db7c0..76b3bc2dc 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -105,7 +105,6 @@ module.exports.create = function (cfg) { * offsetByHash: * a map containing message offsets by their hash * this is for every message in history, so 
it could be very large... - * FIXME OFFSET * except we remove offsets from the map if they occur before the oldest relevant checkpoint * size: in bytes * metadata: @@ -303,32 +302,53 @@ module.exports.create = function (cfg) { * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes */ - const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { - // TODO implement a queue so that we know messages are written in order + const storageQueues = {}; + + const storeQueuedMessage = function (ctx, queue, id) { + if (queue.length === 0) { + delete storageQueues[id]; + return; + } + + const first = queue.shift(); + + const msgBin = first.msg; + const optionalMessageHash = first.hash; + const isCp = first.isCp; - const msgBin = new Buffer(msg + '\n', 'utf8'); // Store the message first, and update the index only once it's stored. // store.messageBin can be async so updating the index first may // result in a wrong cpIndex nThen((waitFor) => { - store.messageBin(channel.id, msgBin, waitFor(function (err) { + store.messageBin(id, msgBin, waitFor(function (err) { if (err) { waitFor.abort(); - return void Log.error("HK_STORE_MESSAGE_ERROR", err.message); + Log.error("HK_STORE_MESSAGE_ERROR", err.message); + + // this error is critical, but there's not much we can do at the moment + // proceed with more messages, but they'll probably fail too + // at least you won't have a memory leak + + // TODO make it possible to respond to clients with errors so they know + // their message wasn't stored + storeQueuedMessage(ctx, queue, id); + return; } })); }).nThen((waitFor) => { - getIndex(ctx, channel.id, waitFor((err, index) => { + getIndex(ctx, id, waitFor((err, index) => { if (err) { Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); // non-critical, we'll be able to get the channel index later + + // proceed to the next message in the queue + storeQueuedMessage(ctx, queue, id); return; } if 
(typeof (index.line) === "number") { index.line++; } if (isCp) { index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); for (let k in index.offsetByHash) { - // FIXME OFFSET if (index.offsetByHash[k] < index.cpIndex[0]) { delete index.offsetByHash[k]; } @@ -340,10 +360,32 @@ module.exports.create = function (cfg) { } if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } index.size += msgBin.length; + + // handle the next element in the queue + storeQueuedMessage(ctx, queue, id); })); }); }; + const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { + const id = channel.id; + + const msgBin = new Buffer(msg + '\n', 'utf8'); + if (Array.isArray(storageQueues[id])) { + return void storageQueues[id].push({ + msg: msgBin, + hash: optionalMessageHash, + isCp: isCp, + }); + } + + const queue = storageQueues[id] = (storageQueues[id] || [{ + msg: msgBin, + hash: optionalMessageHash, + }]); + storeQueuedMessage(ctx, queue, id); + }; + var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; /* onChannelMessage From 8d102ea762f94c172d3f00c130eb8ffe9f467c35 Mon Sep 17 00:00:00 2001 From: ansuz Date: Mon, 26 Aug 2019 17:40:14 +0200 Subject: [PATCH 12/13] log more --- historyKeeper.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/historyKeeper.js b/historyKeeper.js index 76b3bc2dc..3243d0f15 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -868,7 +868,10 @@ module.exports.create = function (cfg) { store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { if (err) { // FIXME tell the user that there was a channel error? 
- return void Log.error('HK_WRITE_METADATA'); + return void Log.error('HK_WRITE_METADATA', { + channel: channelName, + error: err, + }); } }); From d6fc35cab40b76bed91568701443480ed65e6e36 Mon Sep 17 00:00:00 2001 From: ansuz Date: Mon, 26 Aug 2019 17:44:21 +0200 Subject: [PATCH 13/13] mkdirp the path to metadata before writing --- storage/file.js | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/storage/file.js b/storage/file.js index cbb45138d..739f079ec 100644 --- a/storage/file.js +++ b/storage/file.js @@ -236,8 +236,13 @@ How to proceed // writeMetadata appends to the dedicated log of metadata amendments var writeMetadata = function (env, channelId, data, cb) { var path = mkMetadataPath(env, channelId); - // TODO see if we can make this any faster by using something other than appendFile - Fs.appendFile(path, data + '\n', cb); + + Fse.mkdirp(Path.dirname(path), PERMISSIVE, function (err) { + if (err && err.code !== 'EEXIST') { return void cb(err); } + + // TODO see if we can make this any faster by using something other than appendFile + Fs.appendFile(path, data + '\n', cb); + }); };