From 466072d03b9dee88a1c1b642f9173248ed7b6ca6 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 26 Nov 2020 12:15:02 +0530 Subject: [PATCH] read files starting from the oldest known point of relevance when computing indices --- lib/storage/file.js | 64 +++++++++++++++++++++- lib/stream-file.js | 9 +-- lib/workers/db-worker.js | 115 ++++++++++++++++++++++++++++++++++----- 3 files changed, 167 insertions(+), 21 deletions(-) diff --git a/lib/storage/file.js b/lib/storage/file.js index b1ccfde0f..8cdfa747a 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -1,6 +1,6 @@ /*@flow*/ /* jshint esversion: 6 */ -/* global Buffer */ +/* globals Buffer */ var Fs = require("fs"); var Fse = require("fs-extra"); var Path = require("path"); @@ -66,6 +66,10 @@ var mkTempPath = function (env, channelId) { return mkPath(env, channelId) + '.temp'; }; +var mkOffsetPath = function (env, channelId) { + return mkPath(env, channelId) + '.offset'; +}; + // pass in the path so we can reuse the same function for archived files var channelExists = function (filepath, cb) { Fs.stat(filepath, function (err, stat) { @@ -131,7 +135,9 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => { const collector = createIdleStreamCollector(stream); const handleMessageAndKeepStreamAlive = Util.both(msgHandler, collector.keepAlive); const done = Util.both(cb, collector); - return void readFileBin(stream, handleMessageAndKeepStreamAlive, done); + return void readFileBin(stream, handleMessageAndKeepStreamAlive, done, { + offset: start, + }); }; // reads classic metadata from a channel log and aborts @@ -190,6 +196,37 @@ var closeChannel = function (env, channelName, cb) { } }; +var clearOffset = function (env, channelId, cb) { + var path = mkOffsetPath(env, channelId); + // we should always be able to recover from invalid offsets, so failure to delete them + // is not catastrophic. Anything calling this function can optionally ignore errors it might report + Fs.unlink(path, cb); +}; + +var writeOffset = function (env, channelId, data, cb) { + var path = mkOffsetPath(env, channelId); + var s_data; + try { + s_data = JSON.stringify(data); + } catch (err) { + return void cb(err); + } + Fs.writeFile(path, s_data, cb); +}; + +var getOffset = function (env, channelId, cb) { + var path = mkOffsetPath(env, channelId); + Fs.readFile(path, function (err, content) { + if (err) { return void cb(err); } + try { + var json = JSON.parse(content); + cb(void 0, json); + } catch (err2) { + cb(err2); + } + }); +}; + // truncates a file to the end of its metadata line // TODO write the metadata in a dedicated file var clearChannel = function (env, channelId, _cb) { @@ -213,6 +250,7 @@ var clearChannel = function (env, channelId, _cb) { cb(); }); }); + clearOffset(env, channelId, function () {}); }); }; @@ -389,6 +427,7 @@ var removeChannel = function (env, channelName, cb) { CB(labelError("E_METADATA_REMOVAL", err)); } })); + clearOffset(env, channelName, w()); }).nThen(function () { if (errors === 2) { return void CB(labelError('E_REMOVE_CHANNEL', new Error("ENOENT"))); @@ -604,6 +643,8 @@ var archiveChannel = function (env, channelName, cb) { return void cb(err); } })); + }).nThen(function (w) { + clearOffset(env, channelName, w()); }).nThen(function (w) { // archive the dedicated metadata channel var metadataPath = mkMetadataPath(env, channelName); @@ -861,6 +902,7 @@ var trimChannel = function (env, channelName, hash, _cb) { } })); }).nThen(function (w) { + clearOffset(env, channelName, w()); cleanUp(w(function (err) { if (err) { w.abort(); @@ -1177,6 +1219,24 @@ module.exports.create = function (conf, _cb) { }); }, + // OFFSETS +// these exist strictly as an optimization +// you can always remove them without data loss + clearOffset: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + clearOffset(env, channelName, cb); + }, + writeOffset: function (channelName, data, _cb) { + var cb = Util.mkAsync(_cb); + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + writeOffset(env, channelName, data, cb); + }, + getOffset: function (channelName, _cb) { + var cb = Util.mkAsync(_cb); + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + getOffset(env, channelName, cb); + }, + // METADATA METHODS // fetch the metadata for a channel getChannelMetadata: function (channelName, cb) { diff --git a/lib/stream-file.js b/lib/stream-file.js index c3130365b..a164ceffc 100644 --- a/lib/stream-file.js +++ b/lib/stream-file.js @@ -44,8 +44,8 @@ const mkBufferSplit = () => { // return a streaming function which transforms buffers into objects // containing the buffer and the offset from the start of the stream -const mkOffsetCounter = () => { - let offset = 0; +const mkOffsetCounter = (offset) => { + offset = offset || 0; return Pull.map((buff) => { const out = { offset: offset, buff: buff }; // +1 for the eaten newline @@ -59,13 +59,14 @@ const mkOffsetCounter = () => { // that this function has a lower memory profile than our classic method // of reading logs line by line. // it also allows the handler to abort reading at any time -Stream.readFileBin = (stream, msgHandler, cb) => { +Stream.readFileBin = (stream, msgHandler, cb, opt) => { + opt = opt || {}; //const stream = Fs.createReadStream(path, { start: start }); let keepReading = true; Pull( ToPull.read(stream), mkBufferSplit(), - mkOffsetCounter(), + mkOffsetCounter(opt.offset), Pull.asyncMap((data, moreCb) => { msgHandler(data, moreCb, () => { try { diff --git a/lib/workers/db-worker.js b/lib/workers/db-worker.js index 9d5abf386..d871bcb97 100644 --- a/lib/workers/db-worker.js +++ b/lib/workers/db-worker.js @@ -1,5 +1,5 @@ /* jshint esversion: 6 */ -/* global process */ +/* globals process, Buffer */ const HK = require("../hk-util"); const Store = require("../storage/file"); @@ -114,14 +114,15 @@ const init = function (config, _cb) { * including the initial metadata line, if it exists */ -const computeIndex = function (data, cb) { - if (!data || !data.channel) { - return void cb('E_NO_CHANNEL'); - } - const channelName = data.channel; +const OPEN_CURLY_BRACE = Buffer.from('{'); +const CHECKPOINT_PREFIX = Buffer.from('cp|'); +const isValidOffsetNumber = function (n) { + return typeof(n) === 'number' && n >= 0; +}; - const cpIndex = []; +const computeIndexFromOffset = function (channelName, offset, cb) { + let cpIndex = []; let messageBuf = []; let i = 0; @@ -129,27 +130,42 @@ const computeIndex = function (data, cb) { const offsetByHash = {}; let offsetCount = 0; - let size = 0; + let size = offset || 0; + var start = offset || 0; + let unconventional = false; + nThen(function (w) { // iterate over all messages in the channel log // old channels can contain metadata as the first message of the log // skip over metadata as that is handled elsewhere // otherwise index important messages in the log - store.readMessagesBin(channelName, 0, (msgObj, readMore) => { + store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => { let msg; // keep an eye out for the metadata line if you haven't already seen it // but only check for metadata on the first line - if (!i && msgObj.buff.indexOf('{') === 0) { - i++; // always increment the message counter + if (i) { + // fall through intentionally because the following blocks are invalid + // for all but the first message + } else if (msgObj.buff.includes(OPEN_CURLY_BRACE)) { msg = HK.tryParse(Env, msgObj.buff.toString('utf8')); - if (typeof msg === "undefined") { return readMore(); } + if (typeof msg === "undefined") { + i++; // always increment the message counter + return readMore(); + } // validate that the current line really is metadata before storing it as such // skip this, as you already have metadata... - if (HK.isMetadataMessage(msg)) { return readMore(); } + if (HK.isMetadataMessage(msg)) { + i++; // always increment the message counter + return readMore(); + } + } else if (!(msg = HK.tryParse(Env, msgObj.buff.toString('utf8')))) { + w.abort(); + abort(); + return CB("OFFSET_ERROR"); } i++; - if (msgObj.buff.indexOf('cp|') > -1) { + if (msgObj.buff.includes(CHECKPOINT_PREFIX)) { msg = msg || HK.tryParse(Env, msgObj.buff.toString('utf8')); if (typeof msg === "undefined") { return readMore(); } // cache the offsets of checkpoints if they can be parsed @@ -164,6 +180,7 @@ const computeIndex = function (data, cb) { } } else if (messageBuf.length > 100 && cpIndex.length === 0) { // take the last 50 messages + unconventional = true; messageBuf = messageBuf.slice(-50); } // if it's not metadata or a checkpoint then it should be a regular message @@ -192,11 +209,38 @@ const computeIndex = function (data, cb) { size = msgObj.offset + msgObj.buff.length + 1; }); })); + }).nThen(function (w) { + cpIndex = HK.sliceCpIndex(cpIndex, i); + + var new_start; + if (cpIndex.length) { + new_start = cpIndex[0].offset; + } else if (unconventional && messageBuf.length && isValidOffsetNumber(messageBuf[0].offset)) { + new_start = messageBuf[0].offset; + } + + if (new_start === start) { return; } + if (!isValidOffsetNumber(new_start)) { return; } + + // store the offset of the earliest relevant line so that you can start from there next time... + store.writeOffset(channelName, { + start: new_start, + created: +new Date(), + }, w(function () { + var diff = new_start - start; + Env.Log.info('WORKER_OFFSET_UPDATE', { + channel: channelName, + old_start: start, + new_start: new_start, + diff: diff, + diffMB: diff / 1024 / 1024, + }); + })); }).nThen(function () { // return the computed index CB(null, { // Only keep the checkpoints included in the last 100 messages - cpIndex: HK.sliceCpIndex(cpIndex, i), + cpIndex: cpIndex, offsetByHash: offsetByHash, offsets: offsetCount, size: size, @@ -206,6 +250,47 @@ const computeIndex = function (data, cb) { }); }; +const computeIndex = function (data, cb) { + if (!data || !data.channel) { + return void cb('E_NO_CHANNEL'); + } + + const channelName = data.channel; + const CB = Util.once(cb); + + var start = 0; + nThen(function (w) { + store.getOffset(channelName, w(function (err, obj) { + if (err) { return; } + if (obj && typeof(obj.start) === 'number' && obj.start > 0) { + start = obj.start; + Env.Log.verbose('WORKER_OFFSET_RECOVERY', { + channel: channelName, + start: start, + startMB: start / 1024 / 1024, + }); + } + })); + }).nThen(function (w) { + computeIndexFromOffset(channelName, start, w(function (err, index) { + if (err === 'OFFSET_ERROR') { + return Env.Log.error("WORKER_OFFSET_ERROR", { + channel: channelName, + }); + } + w.abort(); + CB(err, index); + })); + }).nThen(function (w) { + // if you're here there was an OFFSET_ERROR.. + // first remove the offset that caused the problem to begin with + store.clearOffset(channelName, w()); + }).nThen(function () { + // now get the history as though it were the first time + computeIndexFromOffset(channelName, 0, CB); + }); +}; + const computeMetadata = function (data, cb) { const ref = {}; const lineHandler = Meta.createLineHandler(ref, Env.Log.error);