From 67430de7ffb1749cc3f5f9561c892fbe3e59fab3 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Oct 2020 11:17:03 +0530 Subject: [PATCH] Make efforts to avoid closing streams mid-read 1. Close streams when you're done with them 2. Close streams if they seem to have been abandoned --- lib/storage/file.js | 68 +++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/lib/storage/file.js b/lib/storage/file.js index d9d3f7c89..5f3d4ae46 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -94,25 +94,28 @@ const destroyStream = function (stream) { }, STREAM_DESTROY_TIMEOUT); }; -/* - accept a stream, an id (used as a label) and an optional number of milliseconds +/* createIdleStreamCollector + +Takes a stream and returns a function to asynchronously close that stream. +Successive calls to the function will be ignored. - return a function which ignores all arguments - and first tries to gracefully close a stream - then destroys it after a period if the close was not successful - if the function is not called within the specified number of milliseconds - then it will be called implicitly with an error to indicate - that it was run because it timed out +If the function is not called for a period of STREAM_CLOSE_TIMEOUT it will +be called automatically unless its `keepAlive` method has been invoked +in the meantime. Used to prevent file descriptor leaks in the case of +abandoned streams while closing streams which are being read very very +slowly. */ -const ensureStreamCloses = function (stream, id, ms) { - return Util.bake(Util.mkTimeout(Util.once(function (err) { - destroyStream(stream); - if (err) { - // this can only be a timeout error... - console.log("stream close error:", err, id); - } - }), ms || STREAM_CLOSE_TIMEOUT), []); +const createIdleStreamCollector = function (stream) { + // create a function to close the stream which takes no arguments + // and will do nothing after being called the first time + var collector = Util.once(Util.mkAsync(Util.bake(destroyStream, [stream]))); + + // create a second function which will execute the first function after a delay + // calling this function will reset the delay and thus keep the stream 'alive' + collector.keepAlive = Util.throttle(collector, STREAM_CLOSE_TIMEOUT); + collector.keepAlive(); + return collector; }; // readMessagesBin asynchronously iterates over the messages in a channel log @@ -122,25 +125,22 @@ const ensureStreamCloses = function (stream, id, ms) { // it also allows the handler to abort reading at any time const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); - const finish = ensureStreamCloses(stream, '[readMessagesBin:' + id + ']'); - return void readFileBin(stream, msgHandler, function (err) { - cb(err); - finish(); - }); + const collector = createIdleStreamCollector(stream); + const handleMessageAndKeepStreamAlive = Util.both(msgHandler, collector.keepAlive); + const done = Util.both(cb, collector); + return void readFileBin(stream, handleMessageAndKeepStreamAlive, done); }; // reads classic metadata from a channel log and aborts // returns undefined if the first message was not an object (not an array) var getMetadataAtPath = function (Env, path, _cb) { const stream = Fs.createReadStream(path, { start: 0 }); - const finish = ensureStreamCloses(stream, '[getMetadataAtPath:' + path + ']'); - var cb = Util.once(Util.mkAsync(Util.both(_cb, finish)), function () { - throw new Error("Multiple Callbacks"); - }); - + const collector = createIdleStreamCollector(stream); + var cb = Util.once(Util.mkAsync(Util.both(_cb, collector))); var i = 0; return readFileBin(stream, function (msgObj, readMore, abort) { + collector.keepAlive(); const line = msgObj.buff.toString('utf8'); if (!line) { @@ -149,7 +149,7 @@ var getMetadataAtPath = function (Env, path, _cb) { // metadata should always be on the first line or not exist in the channel at all if (i++ > 0) { - console.log("aborting"); + //console.log("aborting"); abort(); return void cb(); } @@ -219,10 +219,11 @@ var clearChannel = function (env, channelId, _cb) { */ var readMessages = function (path, msgHandler, _cb) { var stream = Fs.createReadStream(path, { start: 0}); - const finish = ensureStreamCloses(stream, '[readMessages:' + path + ']'); - var cb = Util.once(Util.mkAsync(Util.both(finish, _cb))); + var collector = createIdleStreamCollector(stream); + var cb = Util.once(Util.mkAsync(Util.both(_cb, collector))); return readFileBin(stream, function (msgObj, readMore) { + collector.keepAlive(); msgHandler(msgObj.buff.toString('utf8')); readMore(); }, function (err) { @@ -247,10 +248,11 @@ var getDedicatedMetadata = function (env, channelId, handler, _cb) { var metadataPath = mkMetadataPath(env, channelId); var stream = Fs.createReadStream(metadataPath, {start: 0}); - const finish = ensureStreamCloses(stream, '[getDedicatedMetadata:' + metadataPath + ']'); - var cb = Util.both(finish, _cb); + const collector = createIdleStreamCollector(stream); + var cb = Util.both(_cb, collector); readFileBin(stream, function (msgObj, readMore) { + collector.keepAlive(); var line = msgObj.buff.toString('utf8'); try { var parsed = JSON.parse(line); @@ -758,11 +760,11 @@ var getChannel = function (env, id, _callback) { }); }); }).nThen(function () { - channel.delayClose = Util.throttle(function () { + channel.delayClose = Util.throttle(Util.once(function () { delete env.channels[id]; destroyStream(channel.writeStream, path); //console.log("closing writestream"); - }, CHANNEL_WRITE_WINDOW); + }), CHANNEL_WRITE_WINDOW); channel.delayClose(); env.channels[id] = channel; done(void 0, channel);