From 92dda92a0035ed0795c56b66f952217b69550c95 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 4 Jul 2019 11:04:28 +0200 Subject: [PATCH] very WIP update to serve accumulated metadata updates --- historyKeeper.js | 101 ++++++++++++++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 36 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index dac280ffb..37d647360 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -5,6 +5,8 @@ const nThen = require('nthen'); const Nacl = require('tweetnacl'); const Crypto = require('crypto'); +const Once = require("./lib/once"); +const Meta = require("./lib/metadata"); let Log; const now = function () { return (new Date()).getTime(); }; @@ -61,51 +63,78 @@ module.exports.create = function (cfg) { const cpIndex = []; let messageBuf = []; let validateKey; - let metadata; // FIXME METADATA + let metadata; // FIXME METADATA READ let i = 0; - store.readMessagesBin(channelName, 0, (msgObj, rmcb) => { - let msg; - i++; - if (!validateKey && msgObj.buff.indexOf('validateKey') > -1) { - metadata = msg = tryParse(msgObj.buff.toString('utf8')); // FIXME METADATA - if (typeof msg === "undefined") { return rmcb(); } - if (msg.validateKey) { - validateKey = historyKeeperKeys[channelName] = msg; - return rmcb(); + + const ref = {}; + + const CB = Once(cb); + + const offsetByHash = {}; + let size = 0; + nThen(function (w) { + store.readMessagesBin(channelName, 0, (msgObj, rmcb) => { + let msg; + i++; + if (!validateKey && msgObj.buff.indexOf('validateKey') > -1) { + metadata = msg = tryParse(msgObj.buff.toString('utf8')); // FIXME METADATA READ + if (typeof msg === "undefined") { return rmcb(); } + if (msg.validateKey) { + validateKey = historyKeeperKeys[channelName] = msg; + return rmcb(); + } } - } - if (msgObj.buff.indexOf('cp|') > -1) { - msg = msg || tryParse(msgObj.buff.toString('utf8')); - if (typeof msg === "undefined") { return rmcb(); } - if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) { - cpIndex.push({ - offset: msgObj.offset, - line: i - }); - messageBuf = []; + if (msgObj.buff.indexOf('cp|') > -1) { + msg = msg || tryParse(msgObj.buff.toString('utf8')); + if (typeof msg === "undefined") { return rmcb(); } + if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) { + cpIndex.push({ + offset: msgObj.offset, + line: i + }); + messageBuf = []; + } + } + messageBuf.push(msgObj); + return rmcb(); + }, w((err) => { + if (err && err.code !== 'ENOENT') { + w.abort(); + return void CB(err); } + messageBuf.forEach((msgObj) => { + const msg = tryParse(msgObj.buff.toString('utf8')); + if (typeof msg === "undefined") { return; } + if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') { + offsetByHash[getHash(msg[4])] = msgObj.offset; + } + // There is a trailing \n at the end of the file + size = msgObj.offset + msgObj.buff.length + 1; + }); + })); + }).nThen(function (w) { + // get amended metadata + const handler = Meta.createLineHandler(ref, Log.error); + + if (metadata) { + handler(void 0, metadata); } - messageBuf.push(msgObj); - return rmcb(); - }, (err) => { - if (err && err.code !== 'ENOENT') { return void cb(err); } - const offsetByHash = {}; - let size = 0; - messageBuf.forEach((msgObj) => { - const msg = tryParse(msgObj.buff.toString('utf8')); - if (typeof msg === "undefined") { return; } - if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') { - offsetByHash[getHash(msg[4])] = msgObj.offset; + + store.readDedicatedMetadata(channelName, handler, w(function (err) { + if (err) { + return void Log.error("DEDICATED_METADATA_ERROR", err); } - // There is a trailing \n at the end of the file - size = msgObj.offset + msgObj.buff.length + 1; - }); - cb(null, { + metadata = ref.meta; + })); + }).nThen(function () { + // FIXME METADATA READ + + CB(null, { // Only keep the checkpoints included in the last 100 messages cpIndex: sliceCpIndex(cpIndex, i), offsetByHash: offsetByHash, size: size, - metadata: metadata, // FIXME METADATA + metadata: metadata, // FIXME METADATA STORE line: i }); });