From 689886b9bf28a4410631f517d71a2f34ccfa2cd2 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 23 Aug 2019 13:12:58 +0200 Subject: [PATCH] always store messages according to the order in which they were received --- historyKeeper.js | 58 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index 3572db7c0..76b3bc2dc 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -105,7 +105,6 @@ module.exports.create = function (cfg) { * offsetByHash: * a map containing message offsets by their hash * this is for every message in history, so it could be very large... - * FIXME OFFSET * except we remove offsets from the map if they occur before the oldest relevant checkpoint * size: in bytes * metadata: @@ -303,32 +302,53 @@ module.exports.create = function (cfg) { * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes */ - const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { - // TODO implement a queue so that we know messages are written in order + const storageQueues = {}; + + const storeQueuedMessage = function (ctx, queue, id) { + if (queue.length === 0) { + delete storageQueues[id]; + return; + } + + const first = queue.shift(); + + const msgBin = first.msg; + const optionalMessageHash = first.hash; + const isCp = first.isCp; - const msgBin = new Buffer(msg + '\n', 'utf8'); // Store the message first, and update the index only once it's stored. // store.messageBin can be async so updating the index first may // result in a wrong cpIndex nThen((waitFor) => { - store.messageBin(channel.id, msgBin, waitFor(function (err) { + store.messageBin(id, msgBin, waitFor(function (err) { if (err) { waitFor.abort(); - return void Log.error("HK_STORE_MESSAGE_ERROR", err.message); + Log.error("HK_STORE_MESSAGE_ERROR", err.message); + + // this error is critical, but there's not much we can do at the moment + // proceed with more messages, but they'll probably fail too + // at least you won't have a memory leak + + // TODO make it possible to respond to clients with errors so they know + // their message wasn't stored + storeQueuedMessage(ctx, queue, id); + return; } })); }).nThen((waitFor) => { - getIndex(ctx, channel.id, waitFor((err, index) => { + getIndex(ctx, id, waitFor((err, index) => { if (err) { Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); // non-critical, we'll be able to get the channel index later + + // proceed to the next message in the queue + storeQueuedMessage(ctx, queue, id); return; } if (typeof (index.line) === "number") { index.line++; } if (isCp) { index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); for (let k in index.offsetByHash) { - // FIXME OFFSET if (index.offsetByHash[k] < index.cpIndex[0]) { delete index.offsetByHash[k]; } @@ -340,10 +360,32 @@ module.exports.create = function (cfg) { } if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } index.size += msgBin.length; + + // handle the next element in the queue + storeQueuedMessage(ctx, queue, id); })); }); }; + const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { + const id = channel.id; + + const msgBin = new Buffer(msg + '\n', 'utf8'); + if (Array.isArray(storageQueues[id])) { + return void storageQueues[id].push({ + msg: msgBin, + hash: optionalMessageHash, + isCp: isCp, + }); + } + + const queue = storageQueues[id] = (storageQueues[id] || [{ + msg: msgBin, + hash: optionalMessageHash, + }]); + storeQueuedMessage(ctx, queue, id); + }; + var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; /* onChannelMessage