begin standardizing our method of streaming lines from files

pull/1/head
ansuz 5 years ago
parent 35eca2c5d2
commit 32d769447a

@ -204,6 +204,7 @@ Channel.isNewChannel = function (Env, channel, cb) {
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return void cb('INVALID_CHAN'); } if (channel.length !== 32) { return void cb('INVALID_CHAN'); }
// TODO replace with readMessagesBin
var done = false; var done = false;
Env.msgStore.getMessages(channel, function (msg) { Env.msgStore.getMessages(channel, function (msg) {
if (done) { return; } if (done) { return; }

@ -174,6 +174,7 @@ var loadUserPins = function (Env, safeKey, cb) {
}); });
// if channels aren't in memory. load them from disk // if channels aren't in memory. load them from disk
// TODO replace with readMessagesBin
Env.pinStore.getMessages(safeKey, lineHandler, function () { Env.pinStore.getMessages(safeKey, lineHandler, function () {
// no more messages // no more messages

@ -65,7 +65,7 @@ var channelExists = function (filepath, cb) {
// it also allows the handler to abort reading at any time // it also allows the handler to abort reading at any time
const readMessagesBin = (env, id, start, msgHandler, cb) => { const readMessagesBin = (env, id, start, msgHandler, cb) => {
const stream = Fs.createReadStream(mkPath(env, id), { start: start }); const stream = Fs.createReadStream(mkPath(env, id), { start: start });
return void readFileBin(env, stream, msgHandler, cb); return void readFileBin(stream, msgHandler, cb);
}; };
// reads classic metadata from a channel log and aborts // reads classic metadata from a channel log and aborts
@ -90,7 +90,7 @@ var getMetadataAtPath = function (Env, path, _cb) {
}); });
var i = 0; var i = 0;
return readFileBin(Env, stream, function (msgObj, readMore, abort) { return readFileBin(stream, function (msgObj, readMore, abort) {
const line = msgObj.buff.toString('utf8'); const line = msgObj.buff.toString('utf8');
if (!line) { if (!line) {
@ -164,28 +164,16 @@ var clearChannel = function (env, channelId, _cb) {
}; };
/* readMessages is our classic method of reading messages from the disk /* readMessages is our classic method of reading messages from the disk
notably doesn't provide a means of aborting if you finish early notably doesn't provide a means of aborting if you finish early.
Internally it uses readFileBin: to avoid duplicating code and to use less memory
*/ */
// XXX replicate current API on top of readMessagesBin var readMessages = function (path, msgHandler, _cb) {
var readMessages = function (path, msgHandler, cb) { var stream = Fs.createReadStream(path, { start: 0});
var remainder = ''; var cb = Util.once(Util.mkAsync(_cb));
var stream = Fs.createReadStream(path, { encoding: 'utf8' }); return readFileBin(stream, function (msgObj, readMore) {
var complete = function (err) { msgHandler(msgObj.buff.toString('utf8'));
var _cb = cb; readMore();
cb = undefined; }, cb);
if (_cb) { _cb(err); }
};
stream.on('data', function (chunk) {
var lines = chunk.split('\n');
lines[0] = remainder + lines[0];
remainder = lines.pop();
lines.forEach(msgHandler);
});
stream.on('end', function () {
msgHandler(remainder);
complete();
});
stream.on('error', function (e) { complete(e); });
}; };
/* getChannelMetadata /* getChannelMetadata
@ -203,23 +191,21 @@ var getChannelMetadata = function (Env, channelId, cb) {
// low level method for getting just the dedicated metadata channel // low level method for getting just the dedicated metadata channel
var getDedicatedMetadata = function (env, channelId, handler, cb) { var getDedicatedMetadata = function (env, channelId, handler, cb) {
var metadataPath = mkMetadataPath(env, channelId); var metadataPath = mkMetadataPath(env, channelId);
// XXX use readFileBin var stream = Fs.createReadStream(metadataPath, {start: 0});
readMessages(metadataPath, function (line) { readFileBin(stream, function (msgObj, readMore) {
if (!line) { return; } var line = msgObj.buff.toString('utf8');
try { try {
var parsed = JSON.parse(line); var parsed = JSON.parse(line);
handler(null, parsed); handler(null, parsed);
} catch (e) { } catch (err) {
handler(e, line); handler(err, line);
} }
readMore();
}, function (err) { }, function (err) {
if (err) { // ENOENT => there is no metadata log
// ENOENT => there is no metadata log if (!err || err.code === 'ENOENT') { return void cb(); }
if (err.code === 'ENOENT') { return void cb(); } // otherwise stream errors?
// otherwise stream errors? cb(err);
return void cb(err);
}
cb();
}); });
}; };
@ -377,7 +363,7 @@ var removeArchivedChannel = function (env, channelName, cb) {
}); });
}; };
// XXX use ../plan.js // TODO use ../plan.js for a smaller memory footprint
var listChannels = function (root, handler, cb) { var listChannels = function (root, handler, cb) {
// do twenty things at a time // do twenty things at a time
var sema = Semaphore.create(20); var sema = Semaphore.create(20);
@ -793,6 +779,7 @@ var message = function (env, chanName, msg, cb) {
}; };
// stream messages from a channel log // stream messages from a channel log
// TODO replace getMessages with readFileBin
var getMessages = function (env, chanName, handler, cb) { var getMessages = function (env, chanName, handler, cb) {
getChannel(env, chanName, function (err, chan) { getChannel(env, chanName, function (err, chan) {
if (!chan) { if (!chan) {

@ -59,7 +59,7 @@ const mkOffsetCounter = () => {
// that this function has a lower memory profile than our classic method // that this function has a lower memory profile than our classic method
// of reading logs line by line. // of reading logs line by line.
// it also allows the handler to abort reading at any time // it also allows the handler to abort reading at any time
Stream.readFileBin = (env, stream, msgHandler, cb) => { Stream.readFileBin = (stream, msgHandler, cb) => {
//const stream = Fs.createReadStream(path, { start: start }); //const stream = Fs.createReadStream(path, { start: start });
let keepReading = true; let keepReading = true;
Pull( Pull(

Loading…
Cancel
Save