Make efforts to avoid closing streams mid-read

1. Close streams when you're done with them
2. Close streams if they seem to have been abandoned
pull/1/head
ansuz 4 years ago
parent fbfb25bf29
commit 67430de7ff

@ -94,25 +94,28 @@ const destroyStream = function (stream) {
}, STREAM_DESTROY_TIMEOUT); }, STREAM_DESTROY_TIMEOUT);
}; };
/* /* createIdleStreamCollector
accept a stream, an id (used as a label) and an optional number of milliseconds
Takes a stream and returns a function to asynchronously close that stream.
Successive calls to the function will be ignored.
return a function which ignores all arguments If the function is not called for a period of STREAM_CLOSE_TIMEOUT it will
and first tries to gracefully close a stream be called automatically unless its `keepAlive` method has been invoked
then destroys it after a period if the close was not successful in the meantime. Used to prevent file descriptor leaks in the case of
if the function is not called within the specified number of milliseconds abandoned streams while closing streams which are being read very very
then it will be called implicitly with an error to indicate slowly.
that it was run because it timed out
*/ */
const ensureStreamCloses = function (stream, id, ms) { const createIdleStreamCollector = function (stream) {
return Util.bake(Util.mkTimeout(Util.once(function (err) { // create a function to close the stream which takes no arguments
destroyStream(stream); // and will do nothing after being called the first time
if (err) { var collector = Util.once(Util.mkAsync(Util.bake(destroyStream, [stream])));
// this can only be a timeout error...
console.log("stream close error:", err, id); // create a second function which will execute the first function after a delay
} // calling this function will reset the delay and thus keep the stream 'alive'
}), ms || STREAM_CLOSE_TIMEOUT), []); collector.keepAlive = Util.throttle(collector, STREAM_CLOSE_TIMEOUT);
collector.keepAlive();
return collector;
}; };
// readMessagesBin asynchronously iterates over the messages in a channel log // readMessagesBin asynchronously iterates over the messages in a channel log
@ -122,25 +125,22 @@ const ensureStreamCloses = function (stream, id, ms) {
// it also allows the handler to abort reading at any time // it also allows the handler to abort reading at any time
const readMessagesBin = (env, id, start, msgHandler, cb) => { const readMessagesBin = (env, id, start, msgHandler, cb) => {
const stream = Fs.createReadStream(mkPath(env, id), { start: start }); const stream = Fs.createReadStream(mkPath(env, id), { start: start });
const finish = ensureStreamCloses(stream, '[readMessagesBin:' + id + ']'); const collector = createIdleStreamCollector(stream);
return void readFileBin(stream, msgHandler, function (err) { const handleMessageAndKeepStreamAlive = Util.both(msgHandler, collector.keepAlive);
cb(err); const done = Util.both(cb, collector);
finish(); return void readFileBin(stream, handleMessageAndKeepStreamAlive, done);
});
}; };
// reads classic metadata from a channel log and aborts // reads classic metadata from a channel log and aborts
// returns undefined if the first message was not an object (not an array) // returns undefined if the first message was not an object (not an array)
var getMetadataAtPath = function (Env, path, _cb) { var getMetadataAtPath = function (Env, path, _cb) {
const stream = Fs.createReadStream(path, { start: 0 }); const stream = Fs.createReadStream(path, { start: 0 });
const finish = ensureStreamCloses(stream, '[getMetadataAtPath:' + path + ']'); const collector = createIdleStreamCollector(stream);
var cb = Util.once(Util.mkAsync(Util.both(_cb, finish)), function () { var cb = Util.once(Util.mkAsync(Util.both(_cb, collector)));
throw new Error("Multiple Callbacks");
});
var i = 0; var i = 0;
return readFileBin(stream, function (msgObj, readMore, abort) { return readFileBin(stream, function (msgObj, readMore, abort) {
collector.keepAlive();
const line = msgObj.buff.toString('utf8'); const line = msgObj.buff.toString('utf8');
if (!line) { if (!line) {
@ -149,7 +149,7 @@ var getMetadataAtPath = function (Env, path, _cb) {
// metadata should always be on the first line or not exist in the channel at all // metadata should always be on the first line or not exist in the channel at all
if (i++ > 0) { if (i++ > 0) {
console.log("aborting"); //console.log("aborting");
abort(); abort();
return void cb(); return void cb();
} }
@ -219,10 +219,11 @@ var clearChannel = function (env, channelId, _cb) {
*/ */
var readMessages = function (path, msgHandler, _cb) { var readMessages = function (path, msgHandler, _cb) {
var stream = Fs.createReadStream(path, { start: 0}); var stream = Fs.createReadStream(path, { start: 0});
const finish = ensureStreamCloses(stream, '[readMessages:' + path + ']'); var collector = createIdleStreamCollector(stream);
var cb = Util.once(Util.mkAsync(Util.both(finish, _cb))); var cb = Util.once(Util.mkAsync(Util.both(_cb, collector)));
return readFileBin(stream, function (msgObj, readMore) { return readFileBin(stream, function (msgObj, readMore) {
collector.keepAlive();
msgHandler(msgObj.buff.toString('utf8')); msgHandler(msgObj.buff.toString('utf8'));
readMore(); readMore();
}, function (err) { }, function (err) {
@ -247,10 +248,11 @@ var getDedicatedMetadata = function (env, channelId, handler, _cb) {
var metadataPath = mkMetadataPath(env, channelId); var metadataPath = mkMetadataPath(env, channelId);
var stream = Fs.createReadStream(metadataPath, {start: 0}); var stream = Fs.createReadStream(metadataPath, {start: 0});
const finish = ensureStreamCloses(stream, '[getDedicatedMetadata:' + metadataPath + ']'); const collector = createIdleStreamCollector(stream);
var cb = Util.both(finish, _cb); var cb = Util.both(_cb, collector);
readFileBin(stream, function (msgObj, readMore) { readFileBin(stream, function (msgObj, readMore) {
collector.keepAlive();
var line = msgObj.buff.toString('utf8'); var line = msgObj.buff.toString('utf8');
try { try {
var parsed = JSON.parse(line); var parsed = JSON.parse(line);
@ -758,11 +760,11 @@ var getChannel = function (env, id, _callback) {
}); });
}); });
}).nThen(function () { }).nThen(function () {
channel.delayClose = Util.throttle(function () { channel.delayClose = Util.throttle(Util.once(function () {
delete env.channels[id]; delete env.channels[id];
destroyStream(channel.writeStream, path); destroyStream(channel.writeStream, path);
//console.log("closing writestream"); //console.log("closing writestream");
}, CHANNEL_WRITE_WINDOW); }), CHANNEL_WRITE_WINDOW);
channel.delayClose(); channel.delayClose();
env.channels[id] = channel; env.channels[id] = channel;
done(void 0, channel); done(void 0, channel);

Loading…
Cancel
Save