|
|
|
@ -94,25 +94,31 @@ const destroyStream = function (stream) {
|
|
|
|
|
}, STREAM_DESTROY_TIMEOUT);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
accept a stream, an id (used as a label) and an optional number of milliseconds
|
|
|
|
|
/* createIdleStreamCollector
|
|
|
|
|
|
|
|
|
|
Takes a stream and returns a function to asynchronously close that stream.
|
|
|
|
|
Successive calls to the function will be ignored.
|
|
|
|
|
|
|
|
|
|
If the function is not called for a period of STREAM_CLOSE_TIMEOUT it will
|
|
|
|
|
be called automatically unless its `keepAlive` method has been invoked
|
|
|
|
|
in the meantime. Used to prevent file descriptor leaks in the case of
|
|
|
|
|
abandoned streams while closing streams which are being read very very
|
|
|
|
|
slowly.
|
|
|
|
|
|
|
|
|
|
return a function which ignores all arguments
|
|
|
|
|
and first tries to gracefully close a stream
|
|
|
|
|
then destroys it after a period if the close was not successful
|
|
|
|
|
if the function is not called within the specified number of milliseconds
|
|
|
|
|
then it will be called implicitly with an error to indicate
|
|
|
|
|
that it was run because it timed out
|
|
|
|
|
XXX inform the stream consumer when it has been closed prematurely
|
|
|
|
|
by calling back with a TIMEOUT error or something
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
const ensureStreamCloses = function (stream, id, ms) {
|
|
|
|
|
return Util.bake(Util.mkTimeout(Util.once(function (err) {
|
|
|
|
|
destroyStream(stream);
|
|
|
|
|
if (err) {
|
|
|
|
|
// this can only be a timeout error...
|
|
|
|
|
console.log("stream close error:", err, id);
|
|
|
|
|
}
|
|
|
|
|
}), ms || STREAM_CLOSE_TIMEOUT), []);
|
|
|
|
|
const createIdleStreamCollector = function (stream) {
|
|
|
|
|
// create a function to close the stream which takes no arguments
|
|
|
|
|
// and will do nothing after being called the first time
|
|
|
|
|
var collector = Util.once(Util.mkAsync(Util.bake(destroyStream, [stream])));
|
|
|
|
|
|
|
|
|
|
// create a second function which will execute the first function after a delay
|
|
|
|
|
// calling this function will reset the delay and thus keep the stream 'alive'
|
|
|
|
|
collector.keepAlive = Util.throttle(collector, STREAM_CLOSE_TIMEOUT);
|
|
|
|
|
collector.keepAlive();
|
|
|
|
|
return collector;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// readMessagesBin asynchronously iterates over the messages in a channel log
|
|
|
|
@ -122,25 +128,22 @@ const ensureStreamCloses = function (stream, id, ms) {
|
|
|
|
|
// it also allows the handler to abort reading at any time
|
|
|
|
|
const readMessagesBin = (env, id, start, msgHandler, cb) => {
|
|
|
|
|
const stream = Fs.createReadStream(mkPath(env, id), { start: start });
|
|
|
|
|
const finish = ensureStreamCloses(stream, '[readMessagesBin:' + id + ']');
|
|
|
|
|
return void readFileBin(stream, msgHandler, function (err) {
|
|
|
|
|
cb(err);
|
|
|
|
|
finish();
|
|
|
|
|
});
|
|
|
|
|
const collector = createIdleStreamCollector(stream);
|
|
|
|
|
const handleMessageAndKeepStreamAlive = Util.both(msgHandler, collector.keepAlive);
|
|
|
|
|
const done = Util.both(cb, collector);
|
|
|
|
|
return void readFileBin(stream, handleMessageAndKeepStreamAlive, done);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// reads classic metadata from a channel log and aborts
|
|
|
|
|
// returns undefined if the first message was not an object (not an array)
|
|
|
|
|
var getMetadataAtPath = function (Env, path, _cb) {
|
|
|
|
|
const stream = Fs.createReadStream(path, { start: 0 });
|
|
|
|
|
const finish = ensureStreamCloses(stream, '[getMetadataAtPath:' + path + ']');
|
|
|
|
|
var cb = Util.once(Util.mkAsync(Util.both(_cb, finish)), function () {
|
|
|
|
|
throw new Error("Multiple Callbacks");
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
const collector = createIdleStreamCollector(stream);
|
|
|
|
|
var cb = Util.once(Util.mkAsync(Util.both(_cb, collector)));
|
|
|
|
|
var i = 0;
|
|
|
|
|
|
|
|
|
|
return readFileBin(stream, function (msgObj, readMore, abort) {
|
|
|
|
|
collector.keepAlive();
|
|
|
|
|
const line = msgObj.buff.toString('utf8');
|
|
|
|
|
|
|
|
|
|
if (!line) {
|
|
|
|
@ -149,7 +152,7 @@ var getMetadataAtPath = function (Env, path, _cb) {
|
|
|
|
|
|
|
|
|
|
// metadata should always be on the first line or not exist in the channel at all
|
|
|
|
|
if (i++ > 0) {
|
|
|
|
|
console.log("aborting");
|
|
|
|
|
//console.log("aborting");
|
|
|
|
|
abort();
|
|
|
|
|
return void cb();
|
|
|
|
|
}
|
|
|
|
@ -219,10 +222,11 @@ var clearChannel = function (env, channelId, _cb) {
|
|
|
|
|
*/
|
|
|
|
|
var readMessages = function (path, msgHandler, _cb) {
|
|
|
|
|
var stream = Fs.createReadStream(path, { start: 0});
|
|
|
|
|
const finish = ensureStreamCloses(stream, '[readMessages:' + path + ']');
|
|
|
|
|
var cb = Util.once(Util.mkAsync(Util.both(finish, _cb)));
|
|
|
|
|
var collector = createIdleStreamCollector(stream);
|
|
|
|
|
var cb = Util.once(Util.mkAsync(Util.both(_cb, collector)));
|
|
|
|
|
|
|
|
|
|
return readFileBin(stream, function (msgObj, readMore) {
|
|
|
|
|
collector.keepAlive();
|
|
|
|
|
msgHandler(msgObj.buff.toString('utf8'));
|
|
|
|
|
readMore();
|
|
|
|
|
}, function (err) {
|
|
|
|
@ -247,10 +251,11 @@ var getDedicatedMetadata = function (env, channelId, handler, _cb) {
|
|
|
|
|
var metadataPath = mkMetadataPath(env, channelId);
|
|
|
|
|
var stream = Fs.createReadStream(metadataPath, {start: 0});
|
|
|
|
|
|
|
|
|
|
const finish = ensureStreamCloses(stream, '[getDedicatedMetadata:' + metadataPath + ']');
|
|
|
|
|
var cb = Util.both(finish, _cb);
|
|
|
|
|
const collector = createIdleStreamCollector(stream);
|
|
|
|
|
var cb = Util.both(_cb, collector);
|
|
|
|
|
|
|
|
|
|
readFileBin(stream, function (msgObj, readMore) {
|
|
|
|
|
collector.keepAlive();
|
|
|
|
|
var line = msgObj.buff.toString('utf8');
|
|
|
|
|
try {
|
|
|
|
|
var parsed = JSON.parse(line);
|
|
|
|
@ -758,11 +763,11 @@ var getChannel = function (env, id, _callback) {
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
channel.delayClose = Util.throttle(function () {
|
|
|
|
|
channel.delayClose = Util.throttle(Util.once(function () {
|
|
|
|
|
delete env.channels[id];
|
|
|
|
|
destroyStream(channel.writeStream, path);
|
|
|
|
|
//console.log("closing writestream");
|
|
|
|
|
}, CHANNEL_WRITE_WINDOW);
|
|
|
|
|
}), CHANNEL_WRITE_WINDOW);
|
|
|
|
|
channel.delayClose();
|
|
|
|
|
env.channels[id] = channel;
|
|
|
|
|
done(void 0, channel);
|
|
|
|
|