From 32d769447a0cba7e27d9ec83296baca0537be33a Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 4 Mar 2020 11:38:07 -0500 Subject: [PATCH] begin standardizing our method of streaming lines from files --- lib/commands/channel.js | 1 + lib/commands/pin-rpc.js | 1 + lib/storage/file.js | 59 ++++++++++++++++------------------------- lib/stream-file.js | 2 +- 4 files changed, 26 insertions(+), 37 deletions(-) diff --git a/lib/commands/channel.js b/lib/commands/channel.js index c24837df3..10131d9d8 100644 --- a/lib/commands/channel.js +++ b/lib/commands/channel.js @@ -204,6 +204,7 @@ Channel.isNewChannel = function (Env, channel, cb) { if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } if (channel.length !== 32) { return void cb('INVALID_CHAN'); } + // TODO replace with readMessagesBin var done = false; Env.msgStore.getMessages(channel, function (msg) { if (done) { return; } diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js index bd6852a67..2888f1e61 100644 --- a/lib/commands/pin-rpc.js +++ b/lib/commands/pin-rpc.js @@ -174,6 +174,7 @@ var loadUserPins = function (Env, safeKey, cb) { }); // if channels aren't in memory. load them from disk + // TODO replace with readMessagesBin Env.pinStore.getMessages(safeKey, lineHandler, function () { // no more messages diff --git a/lib/storage/file.js b/lib/storage/file.js index e98b17f94..6d2c672a5 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -65,7 +65,7 @@ var channelExists = function (filepath, cb) { // it also allows the handler to abort reading at any time const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); - return void readFileBin(env, stream, msgHandler, cb); + return void readFileBin(stream, msgHandler, cb); }; // reads classic metadata from a channel log and aborts @@ -90,7 +90,7 @@ var getMetadataAtPath = function (Env, path, _cb) { }); var i = 0; - return readFileBin(Env, stream, function (msgObj, readMore, abort) { + return readFileBin(stream, function (msgObj, readMore, abort) { const line = msgObj.buff.toString('utf8'); if (!line) { @@ -164,28 +164,16 @@ var clearChannel = function (env, channelId, _cb) { }; /* readMessages is our classic method of reading messages from the disk - notably doesn't provide a means of aborting if you finish early + notably doesn't provide a means of aborting if you finish early. + Internally it uses readFileBin: to avoid duplicating code and to use less memory */ -// XXX replicate current API on top of readMessagesBin -var readMessages = function (path, msgHandler, cb) { - var remainder = ''; - var stream = Fs.createReadStream(path, { encoding: 'utf8' }); - var complete = function (err) { - var _cb = cb; - cb = undefined; - if (_cb) { _cb(err); } - }; - stream.on('data', function (chunk) { - var lines = chunk.split('\n'); - lines[0] = remainder + lines[0]; - remainder = lines.pop(); - lines.forEach(msgHandler); - }); - stream.on('end', function () { - msgHandler(remainder); - complete(); - }); - stream.on('error', function (e) { complete(e); }); +var readMessages = function (path, msgHandler, _cb) { + var stream = Fs.createReadStream(path, { start: 0}); + var cb = Util.once(Util.mkAsync(_cb)); + return readFileBin(stream, function (msgObj, readMore) { + msgHandler(msgObj.buff.toString('utf8')); + readMore(); + }, cb); }; /* getChannelMetadata @@ -203,23 +191,21 @@ var getChannelMetadata = function (Env, channelId, cb) { // low level method for getting just the dedicated metadata channel var getDedicatedMetadata = function (env, channelId, handler, cb) { var metadataPath = mkMetadataPath(env, channelId); - // XXX use readFileBin - readMessages(metadataPath, function (line) { - if (!line) { return; } + var stream = Fs.createReadStream(metadataPath, {start: 0}); + readFileBin(stream, function (msgObj, readMore) { + var line = msgObj.buff.toString('utf8'); try { var parsed = JSON.parse(line); handler(null, parsed); - } catch (e) { - handler(e, line); + } catch (err) { + handler(err, line); } + readMore(); }, function (err) { - if (err) { - // ENOENT => there is no metadata log - if (err.code === 'ENOENT') { return void cb(); } - // otherwise stream errors? - return void cb(err); - } - cb(); + // ENOENT => there is no metadata log + if (!err || err.code === 'ENOENT') { return void cb(); } + // otherwise stream errors? + cb(err); }); }; @@ -377,7 +363,7 @@ var removeArchivedChannel = function (env, channelName, cb) { }); }; -// XXX use ../plan.js +// TODO use ../plan.js for a smaller memory footprint var listChannels = function (root, handler, cb) { // do twenty things at a time var sema = Semaphore.create(20); @@ -793,6 +779,7 @@ var message = function (env, chanName, msg, cb) { }; // stream messages from a channel log +// TODO replace getMessages with readFileBin var getMessages = function (env, chanName, handler, cb) { getChannel(env, chanName, function (err, chan) { if (!chan) { diff --git a/lib/stream-file.js b/lib/stream-file.js index 12322d868..dc44aaf50 100644 --- a/lib/stream-file.js +++ b/lib/stream-file.js @@ -59,7 +59,7 @@ const mkOffsetCounter = () => { // that this function has a lower memory profile than our classic method // of reading logs line by line. // it also allows the handler to abort reading at any time -Stream.readFileBin = (env, stream, msgHandler, cb) => { +Stream.readFileBin = (stream, msgHandler, cb) => { //const stream = Fs.createReadStream(path, { start: start }); let keepReading = true; Pull(