diff --git a/lib/storage/file.js b/lib/storage/file.js index b1ac4de3d..e98b17f94 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -10,11 +10,9 @@ var Util = require("../common-util"); var Meta = require("../metadata"); var Extras = require("../hk-util"); -const Schedule = require("../schedule"); -const Readline = require("readline"); -const ToPull = require('stream-to-pull-stream'); -const Pull = require('pull-stream'); +const readFileBin = require("../stream-file").readFileBin; +const Schedule = require("../schedule"); const isValidChannelId = function (id) { return typeof(id) === 'string' && id.length >= 32 && id.length < 50 && @@ -60,13 +58,24 @@ var channelExists = function (filepath, cb) { }); }; +// readMessagesBin asynchronously iterates over the messages in a channel log +// the handler for each message must call back to read more, which should mean +// that this function has a lower memory profile than our classic method +// of reading logs line by line. +// it also allows the handler to abort reading at any time +const readMessagesBin = (env, id, start, msgHandler, cb) => { + const stream = Fs.createReadStream(mkPath(env, id), { start: start }); + return void readFileBin(env, stream, msgHandler, cb); +}; + // reads classic metadata from a channel log and aborts // returns undefined if the first message was not an object (not an array) var getMetadataAtPath = function (Env, path, _cb) { - var stream; + const stream = Fs.createReadStream(path, { start: 0 }); // cb implicitly destroys the stream, if it exists // and calls back asynchronously no more than once + /* var cb = Util.once(Util.both(function () { try { stream.destroy(); @@ -74,20 +83,26 @@ var getMetadataAtPath = function (Env, path, _cb) { return err; } }, Util.mkAsync(_cb))); + */ - // stream creation emit errors... probably ENOENT - stream = Fs.createReadStream(path, { encoding: 'utf8' }).on('error', cb); - - // stream lines - const rl = Readline.createInterface({ - input: stream, + var cb = Util.once(Util.mkAsync(_cb), function () { + throw new Error("Multiple Callbacks"); }); var i = 0; - rl - .on('line', function (line) { + return readFileBin(Env, stream, function (msgObj, readMore, abort) { + const line = msgObj.buff.toString('utf8'); + + if (!line) { + return readMore(); + } + // metadata should always be on the first line or not exist in the channel at all - if (i++ > 0) { return void cb(); } + if (i++ > 0) { + console.log("aborting"); + abort(); + return void cb(); + } var metadata; try { metadata = JSON.parse(line); @@ -102,9 +117,10 @@ var getMetadataAtPath = function (Env, path, _cb) { // if you can't parse, that's bad return void cb("INVALID_METADATA"); } - }) - .on('close', cb) - .on('error', cb); + readMore(); + }, function (err) { + cb(err); + }); }; var closeChannel = function (env, channelName, cb) { @@ -150,6 +166,7 @@ var clearChannel = function (env, channelId, _cb) { /* readMessages is our classic method of reading messages from the disk notably doesn't provide a means of aborting if you finish early */ +// XXX replicate current API on top of readMessagesBin var readMessages = function (path, msgHandler, cb) { var remainder = ''; var stream = Fs.createReadStream(path, { encoding: 'utf8' }); @@ -186,6 +203,7 @@ var getChannelMetadata = function (Env, channelId, cb) { // low level method for getting just the dedicated metadata channel var getDedicatedMetadata = function (env, channelId, handler, cb) { var metadataPath = mkMetadataPath(env, channelId); + // XXX use readFileBin readMessages(metadataPath, function (line) { if (!line) { return; } try { @@ -266,75 +284,6 @@ var writeMetadata = function (env, channelId, data, cb) { }; -// transform a stream of arbitrarily divided data -// into a stream of buffers divided by newlines in the source stream -// TODO see if we could improve performance by using libnewline -const NEWLINE_CHR = ('\n').charCodeAt(0); -const mkBufferSplit = () => { - let remainder = null; - return Pull((read) => { - return (abort, cb) => { - read(abort, function (end, data) { - if (end) { - if (data) { console.log("mkBufferSplit() Data at the end"); } - cb(end, remainder ? [remainder, data] : [data]); - remainder = null; - return; - } - const queue = []; - for (;;) { - const offset = data.indexOf(NEWLINE_CHR); - if (offset < 0) { - remainder = remainder ? Buffer.concat([remainder, data]) : data; - break; - } - let subArray = data.slice(0, offset); - if (remainder) { - subArray = Buffer.concat([remainder, subArray]); - remainder = null; - } - queue.push(subArray); - data = data.slice(offset + 1); - } - cb(end, queue); - }); - }; - }, Pull.flatten()); -}; - -// return a streaming function which transforms buffers into objects -// containing the buffer and the offset from the start of the stream -const mkOffsetCounter = () => { - let offset = 0; - return Pull.map((buff) => { - const out = { offset: offset, buff: buff }; - // +1 for the eaten newline - offset += buff.length + 1; - return out; - }); -}; - -// readMessagesBin asynchronously iterates over the messages in a channel log -// the handler for each message must call back to read more, which should mean -// that this function has a lower memory profile than our classic method -// of reading logs line by line. -// it also allows the handler to abort reading at any time -const readMessagesBin = (env, id, start, msgHandler, cb) => { - const stream = Fs.createReadStream(mkPath(env, id), { start: start }); - let keepReading = true; - Pull( - ToPull.read(stream), - mkBufferSplit(), - mkOffsetCounter(), - Pull.asyncMap((data, moreCb) => { - msgHandler(data, moreCb, () => { keepReading = false; moreCb(); }); - }), - Pull.drain(() => (keepReading), (err) => { - cb((keepReading) ? err : undefined); - }) - ); -}; - // check if a file exists at $path var checkPath = function (path, callback) { Fs.stat(path, function (err) { @@ -428,6 +377,7 @@ var removeArchivedChannel = function (env, channelName, cb) { }); }; +// XXX use ../plan.js var listChannels = function (root, handler, cb) { // do twenty things at a time var sema = Semaphore.create(20); diff --git a/lib/stream-file.js b/lib/stream-file.js new file mode 100644 index 000000000..12322d868 --- /dev/null +++ b/lib/stream-file.js @@ -0,0 +1,76 @@ +/* jshint esversion: 6 */ +/* global Buffer */ + +const ToPull = require('stream-to-pull-stream'); +const Pull = require('pull-stream'); + +const Stream = module.exports; + +// transform a stream of arbitrarily divided data +// into a stream of buffers divided by newlines in the source stream +// TODO see if we could improve performance by using libnewline +const NEWLINE_CHR = ('\n').charCodeAt(0); +const mkBufferSplit = () => { + let remainder = null; + return Pull((read) => { + return (abort, cb) => { + read(abort, function (end, data) { + if (end) { + if (data) { console.log("mkBufferSplit() Data at the end"); } + cb(end, remainder ? [remainder, data] : [data]); + remainder = null; + return; + } + const queue = []; + for (;;) { + const offset = data.indexOf(NEWLINE_CHR); + if (offset < 0) { + remainder = remainder ? Buffer.concat([remainder, data]) : data; + break; + } + let subArray = data.slice(0, offset); + if (remainder) { + subArray = Buffer.concat([remainder, subArray]); + remainder = null; + } + queue.push(subArray); + data = data.slice(offset + 1); + } + cb(end, queue); + }); + }; + }, Pull.flatten()); +}; + +// return a streaming function which transforms buffers into objects +// containing the buffer and the offset from the start of the stream +const mkOffsetCounter = () => { + let offset = 0; + return Pull.map((buff) => { + const out = { offset: offset, buff: buff }; + // +1 for the eaten newline + offset += buff.length + 1; + return out; + }); +}; + +// readMessagesBin asynchronously iterates over the messages in a channel log +// the handler for each message must call back to read more, which should mean +// that this function has a lower memory profile than our classic method +// of reading logs line by line. +// it also allows the handler to abort reading at any time +Stream.readFileBin = (env, stream, msgHandler, cb) => { + //const stream = Fs.createReadStream(path, { start: start }); + let keepReading = true; + Pull( + ToPull.read(stream), + mkBufferSplit(), + mkOffsetCounter(), + Pull.asyncMap((data, moreCb) => { + msgHandler(data, moreCb, () => { keepReading = false; moreCb(); }); + }), + Pull.drain(() => (keepReading), (err) => { + cb((keepReading) ? err : undefined); + }) + ); +};