From abe654f56581b624d10474394fd6c94c297d8660 Mon Sep 17 00:00:00 2001
From: ansuz
Date: Tue, 23 Jul 2019 13:59:12 +0200
Subject: [PATCH] many more comments. cache metadata even in the event of a
 metadata stream error

---
 historyKeeper.js | 39 +++++++++++++++++++++------------------
 1 file changed, 21 insertions(+), 18 deletions(-)

diff --git a/historyKeeper.js b/historyKeeper.js
index 48c7f83ea..a2c67bc0e 100644
--- a/historyKeeper.js
+++ b/historyKeeper.js
@@ -105,7 +105,7 @@ module.exports.create = function (cfg) {
     const computeIndex = function (channelName, cb) {
         const cpIndex = [];
         let messageBuf = [];
-        let metadata; // FIXME METADATA READ
+        let metadata;
         let i = 0;
 
         const ref = {};
@@ -124,19 +124,13 @@ module.exports.create = function (cfg) {
                 // keep an eye out for the metadata line if you haven't already seen it
                 // but only check for metadata on the first line
                 if (!i && !metadata && msgObj.buff.indexOf('{') === 0) {
-                    i++; // always increment i
-                    msg = tryParse(msgObj.buff.toString('utf8')); // FIXME METADATA READ
+                    i++; // always increment the message counter
+                    msg = tryParse(msgObj.buff.toString('utf8'));
                     if (typeof msg === "undefined") { return rmcb(); }
-                    // XXX metadata should be truthey, an object, and not an array...
+                    // validate that the current line really is metadata before storing it as such
                     if (msg && typeof(msg) === 'object' && !Array.isArray(msg)) {
                         metadata = msg;
-
-                        // metadata can contain:
-                        // validateKey, owners, expiration...
-                        //if (msg.validateKey || msg.owners || msg.expire) {
-                            //metadata_cache[channelName] = msg;
-                        //}
                         return rmcb();
                     }
                 }
@@ -144,14 +138,19 @@ module.exports.create = function (cfg) {
                 if (msgObj.buff.indexOf('cp|') > -1) {
                     msg = msg || tryParse(msgObj.buff.toString('utf8'));
                     if (typeof msg === "undefined") { return rmcb(); }
+                    // cache the offsets of checkpoints if they can be parsed
                     if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
                         cpIndex.push({
                             offset: msgObj.offset,
                             line: i
                         });
+                        // we only want to store messages since the latest checkpoint
+                        // so clear the buffer every time you see a new one
                         messageBuf = [];
                     }
                 }
+                // if it's not metadata or a checkpoint then it should be a regular message
+                // store it in the buffer
                 messageBuf.push(msgObj);
                 return rmcb();
             }, w((err) => {
@@ -159,6 +158,9 @@ module.exports.create = function (cfg) {
                     w.abort();
                     return void CB(err);
                 }
+
+                // once indexing is complete you should have a buffer of messages since the latest checkpoint
+                // map the 'hash' of each message to its byte offset in the log, to be used for reconnecting clients
                 messageBuf.forEach((msgObj) => {
                     const msg = tryParse(msgObj.buff.toString('utf8'));
                     if (typeof msg === "undefined") { return; }
@@ -172,28 +174,29 @@ module.exports.create = function (cfg) {
                 });
             }));
         }).nThen(function (w) {
-            // get amended metadata
+            // create a function which will iterate over amendments to the metadata
            const handler = Meta.createLineHandler(ref, Log.error);
-            if (metadata) {
-                handler(void 0, metadata);
-            }
+            // initialize the accumulator in case there was a foundational metadata line in the log content
+            if (metadata) { handler(void 0, metadata); }
+            // iterate over the dedicated metadata log (if it exists)
+            // proceed even in the event of a stream error on the metadata log
            store.readDedicatedMetadata(channelName, handler, w(function (err) {
                if (err) {
                    return void Log.error("DEDICATED_METADATA_ERROR", err);
                }
-                metadata = metadata_cache[channelName] = ref.meta;
            }));
        }).nThen(function () {
-            // FIXME METADATA READ
-
+            // when all is done, cache the metadata in memory
+            metadata = metadata_cache[channelName] = ref.meta;
+            // and return the computed index
            CB(null, {
                // Only keep the checkpoints included in the last 100 messages
                cpIndex: sliceCpIndex(cpIndex, i),
                offsetByHash: offsetByHash,
                size: size,
-                metadata: metadata, // FIXME METADATA STORE
+                metadata: metadata,
                line: i
            });
        });
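
For review purposes, here is a minimal standalone sketch of the flow this patch documents. It is not CryptPad's actual implementation: the names (computeIndexSketch, metadataCache, a plain-callback readDedicatedMetadata, the shape of `lines`) are hypothetical assumptions. It only illustrates the behaviour described above: treat the first log line as metadata when it parses to a non-array object, remember where checkpoints occur, keep only the messages since the latest checkpoint, and cache the metadata even when the dedicated metadata stream reports an error.

// Hypothetical sketch only -- names and data shapes are assumptions,
// not the real historyKeeper.js API.
const metadataCache = {}; // in-memory cache, keyed by channel name

const tryParse = function (s) {
    try { return JSON.parse(s); } catch (err) { return undefined; }
};

// `lines` is assumed to be an array of raw strings from a channel's log.
// `readDedicatedMetadata(channelName, cb)` is assumed to call back with
// (err, amendments) where `amendments` is an array of partial metadata objects.
const computeIndexSketch = function (channelName, lines, readDedicatedMetadata, cb) {
    let metadata;
    const cpIndex = [];
    let messageBuf = [];

    lines.forEach(function (line, i) {
        const msg = tryParse(line);
        if (typeof msg === 'undefined') { return; }

        // only the first line may be metadata, and it must be a plain object
        if (i === 0 && msg && typeof msg === 'object' && !Array.isArray(msg)) {
            metadata = msg;
            return;
        }
        // remember where checkpoints occur, and keep only messages since the latest one
        if (msg[2] === 'MSG' && typeof msg[4] === 'string' && msg[4].indexOf('cp|') === 0) {
            cpIndex.push({ line: i });
            messageBuf = [];
        }
        messageBuf.push(msg);
    });

    readDedicatedMetadata(channelName, function (err, amendments) {
        if (!err && Array.isArray(amendments)) {
            amendments.forEach(function (patch) {
                metadata = Object.assign(metadata || {}, patch);
            });
        }
        // the point of the patch: cache whatever metadata was recovered,
        // even if the dedicated metadata stream errored
        metadataCache[channelName] = metadata;
        cb(null, { cpIndex: cpIndex, messages: messageBuf, metadata: metadata });
    });
};

A quick way to exercise the error path in this sketch is to pass a readDedicatedMetadata stub that immediately calls back with an error and confirm that the metadata recovered from the log head still lands in metadataCache.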