pull file streaming out into its own file, leave a few notes

commit 35eca2c5d2
parent ccd6e1d6df
author ansuz, 5 years ago
branch pull/1/head

@@ -10,11 +10,9 @@ var Util = require("../common-util");
 var Meta = require("../metadata");
 var Extras = require("../hk-util");
+const Schedule = require("../schedule");
 
-const Readline = require("readline");
-const ToPull = require('stream-to-pull-stream');
-const Pull = require('pull-stream');
-const Schedule = require("../schedule");
+const readFileBin = require("../stream-file").readFileBin;
 
 const isValidChannelId = function (id) {
     return typeof(id) === 'string' &&
         id.length >= 32 && id.length < 50 &&
@@ -60,13 +58,24 @@ var channelExists = function (filepath, cb) {
     });
 };
 
+// readMessagesBin asynchronously iterates over the messages in a channel log
+// the handler for each message must call back to read more, which should mean
+// that this function has a lower memory profile than our classic method
+// of reading logs line by line.
+// it also allows the handler to abort reading at any time
+const readMessagesBin = (env, id, start, msgHandler, cb) => {
+    const stream = Fs.createReadStream(mkPath(env, id), { start: start });
+    return void readFileBin(env, stream, msgHandler, cb);
+};
+
 // reads classic metadata from a channel log and aborts
 // returns undefined if the first message was not an object (not an array)
 var getMetadataAtPath = function (Env, path, _cb) {
-    var stream;
+    const stream = Fs.createReadStream(path, { start: 0 });
 
     // cb implicitly destroys the stream, if it exists
     // and calls back asynchronously no more than once
+    /*
     var cb = Util.once(Util.both(function () {
         try {
             stream.destroy();
@@ -74,20 +83,26 @@ var getMetadataAtPath = function (Env, path, _cb) {
             return err;
         }
     }, Util.mkAsync(_cb)));
+    */
 
-    // stream creation emit errors... probably ENOENT
-    stream = Fs.createReadStream(path, { encoding: 'utf8' }).on('error', cb);
-
-    // stream lines
-    const rl = Readline.createInterface({
-        input: stream,
+    var cb = Util.once(Util.mkAsync(_cb), function () {
+        throw new Error("Multiple Callbacks");
     });
 
     var i = 0;
-    rl
-    .on('line', function (line) {
+    return readFileBin(Env, stream, function (msgObj, readMore, abort) {
+        const line = msgObj.buff.toString('utf8');
+
+        if (!line) {
+            return readMore();
+        }
+
         // metadata should always be on the first line or not exist in the channel at all
-        if (i++ > 0) { return void cb(); }
+        if (i++ > 0) {
+            console.log("aborting");
+            abort();
+            return void cb();
+        }
         var metadata;
         try {
             metadata = JSON.parse(line);
@@ -102,9 +117,10 @@ var getMetadataAtPath = function (Env, path, _cb) {
             // if you can't parse, that's bad
             return void cb("INVALID_METADATA");
         }
-    })
-    .on('close', cb)
-    .on('error', cb);
+        readMore();
+    }, function (err) {
+        cb(err);
+    });
 };
 
 var closeChannel = function (env, channelName, cb) {
@@ -150,6 +166,7 @@ var clearChannel = function (env, channelId, _cb) {
 /* readMessages is our classic method of reading messages from the disk
     notably doesn't provide a means of aborting if you finish early
 */
+// XXX replicate current API on top of readMessagesBin
 var readMessages = function (path, msgHandler, cb) {
     var remainder = '';
     var stream = Fs.createReadStream(path, { encoding: 'utf8' });
@@ -186,6 +203,7 @@ var getChannelMetadata = function (Env, channelId, cb) {
 // low level method for getting just the dedicated metadata channel
 var getDedicatedMetadata = function (env, channelId, handler, cb) {
     var metadataPath = mkMetadataPath(env, channelId);
+    // XXX use readFileBin
     readMessages(metadataPath, function (line) {
         if (!line) { return; }
         try {
@@ -266,75 +284,6 @@ var writeMetadata = function (env, channelId, data, cb) {
 };
 
-// transform a stream of arbitrarily divided data
-// into a stream of buffers divided by newlines in the source stream
-// TODO see if we could improve performance by using libnewline
-const NEWLINE_CHR = ('\n').charCodeAt(0);
-const mkBufferSplit = () => {
-    let remainder = null;
-    return Pull((read) => {
-        return (abort, cb) => {
-            read(abort, function (end, data) {
-                if (end) {
-                    if (data) { console.log("mkBufferSplit() Data at the end"); }
-                    cb(end, remainder ? [remainder, data] : [data]);
-                    remainder = null;
-                    return;
-                }
-                const queue = [];
-                for (;;) {
-                    const offset = data.indexOf(NEWLINE_CHR);
-                    if (offset < 0) {
-                        remainder = remainder ? Buffer.concat([remainder, data]) : data;
-                        break;
-                    }
-                    let subArray = data.slice(0, offset);
-                    if (remainder) {
-                        subArray = Buffer.concat([remainder, subArray]);
-                        remainder = null;
-                    }
-                    queue.push(subArray);
-                    data = data.slice(offset + 1);
-                }
-                cb(end, queue);
-            });
-        };
-    }, Pull.flatten());
-};
-
-// return a streaming function which transforms buffers into objects
-// containing the buffer and the offset from the start of the stream
-const mkOffsetCounter = () => {
-    let offset = 0;
-    return Pull.map((buff) => {
-        const out = { offset: offset, buff: buff };
-        // +1 for the eaten newline
-        offset += buff.length + 1;
-        return out;
-    });
-};
-
-// readMessagesBin asynchronously iterates over the messages in a channel log
-// the handler for each message must call back to read more, which should mean
-// that this function has a lower memory profile than our classic method
-// of reading logs line by line.
-// it also allows the handler to abort reading at any time
-const readMessagesBin = (env, id, start, msgHandler, cb) => {
-    const stream = Fs.createReadStream(mkPath(env, id), { start: start });
-    let keepReading = true;
-    Pull(
-        ToPull.read(stream),
-        mkBufferSplit(),
-        mkOffsetCounter(),
-        Pull.asyncMap((data, moreCb) => {
-            msgHandler(data, moreCb, () => { keepReading = false; moreCb(); });
-        }),
-        Pull.drain(() => (keepReading), (err) => {
-            cb((keepReading) ? err : undefined);
-        })
-    );
-};
-
 // check if a file exists at $path
 var checkPath = function (path, callback) {
     Fs.stat(path, function (err) {
@@ -428,6 +377,7 @@ var removeArchivedChannel = function (env, channelName, cb) {
     });
 };
 
+// XXX use ../plan.js
 var listChannels = function (root, handler, cb) {
     // do twenty things at a time
     var sema = Semaphore.create(20);

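The XXX note above suggests replicating the classic readMessages API on top of readMessagesBin. As a rough illustration (not part of this commit), a shim along those lines might look like the following; the name readMessagesCompat is hypothetical, and the signatures cannot match exactly, since readMessages takes a path while readMessagesBin takes env and a channel id:

// hypothetical shim, not part of this commit: the classic line-oriented
// handler rebuilt on top of readMessagesBin, as the XXX note suggests
var readMessagesCompat = function (env, id, msgHandler, cb) {
    readMessagesBin(env, id, 0, function (msgObj, readMore /*, abort */) {
        // decode each buffer back into the utf8 line the old API delivered
        msgHandler(msgObj.buff.toString('utf8'));
        readMore();
    }, cb);
};

The hunk below adds the new stream-file module itself, which the storage code requires above as ../stream-file.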
@@ -0,0 +1,76 @@
+/* jshint esversion: 6 */
+/* global Buffer */
+
+const ToPull = require('stream-to-pull-stream');
+const Pull = require('pull-stream');
+
+const Stream = module.exports;
+
+// transform a stream of arbitrarily divided data
+// into a stream of buffers divided by newlines in the source stream
+// TODO see if we could improve performance by using libnewline
+const NEWLINE_CHR = ('\n').charCodeAt(0);
+const mkBufferSplit = () => {
+    let remainder = null;
+    return Pull((read) => {
+        return (abort, cb) => {
+            read(abort, function (end, data) {
+                if (end) {
+                    if (data) { console.log("mkBufferSplit() Data at the end"); }
+                    cb(end, remainder ? [remainder, data] : [data]);
+                    remainder = null;
+                    return;
+                }
+                const queue = [];
+                for (;;) {
+                    const offset = data.indexOf(NEWLINE_CHR);
+                    if (offset < 0) {
+                        remainder = remainder ? Buffer.concat([remainder, data]) : data;
+                        break;
+                    }
+                    let subArray = data.slice(0, offset);
+                    if (remainder) {
+                        subArray = Buffer.concat([remainder, subArray]);
+                        remainder = null;
+                    }
+                    queue.push(subArray);
+                    data = data.slice(offset + 1);
+                }
+                cb(end, queue);
+            });
+        };
+    }, Pull.flatten());
+};
+
+// return a streaming function which transforms buffers into objects
+// containing the buffer and the offset from the start of the stream
+const mkOffsetCounter = () => {
+    let offset = 0;
+    return Pull.map((buff) => {
+        const out = { offset: offset, buff: buff };
+        // +1 for the eaten newline
+        offset += buff.length + 1;
+        return out;
+    });
+};
+
+// readMessagesBin asynchronously iterates over the messages in a channel log
+// the handler for each message must call back to read more, which should mean
+// that this function has a lower memory profile than our classic method
+// of reading logs line by line.
+// it also allows the handler to abort reading at any time
+Stream.readFileBin = (env, stream, msgHandler, cb) => {
+    //const stream = Fs.createReadStream(path, { start: start });
+    let keepReading = true;
+    Pull(
+        ToPull.read(stream),
+        mkBufferSplit(),
+        mkOffsetCounter(),
+        Pull.asyncMap((data, moreCb) => {
+            msgHandler(data, moreCb, () => { keepReading = false; moreCb(); });
+        }),
+        Pull.drain(() => (keepReading), (err) => {
+            cb((keepReading) ? err : undefined);
+        })
+    );
+};
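For reference, a minimal usage sketch of the exported readFileBin, mirroring how getMetadataAtPath consumes it above. The paths and the early-abort threshold are illustrative, and env is accepted but never referenced by the implementation:

const Fs = require("fs");
const readFileBin = require("./stream-file").readFileBin;

const stream = Fs.createReadStream("./channel.ndjson", { start: 0 });
readFileBin(void 0, stream, function (msgObj, readMore, abort) {
    // msgObj.buff is one newline-delimited message (split by mkBufferSplit);
    // msgObj.offset is its byte position in the file (from mkOffsetCounter),
    // so a later reader could resume with { start: msgObj.offset }
    console.log(msgObj.offset, msgObj.buff.toString('utf8'));
    if (msgObj.offset > 4096) { return void abort(); } // stop reading early
    readMore(); // pull the next message
}, function (err) {
    // called once, after the stream is drained or aborted
    if (err) { console.error(err); }
});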