diff --git a/lib/storage/file.js b/lib/storage/file.js index 00222c3f6..db7871ef0 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -27,7 +27,7 @@ const CHANNEL_WRITE_WINDOW = 300000; them. The tradeoff with this timeout is that some functions, the stream, and and the timeout itself are stored in memory. A longer timeout uses more memory and running out of memory will also kill the server. */ -const STREAM_CLOSE_TIMEOUT = 300000; +const STREAM_CLOSE_TIMEOUT = 120000; /* The above timeout closes the stream, but apparently that doesn't always work. We set yet another timeout to allow the runtime to gracefully close the stream @@ -83,15 +83,31 @@ var channelExists = function (filepath, cb) { const destroyStream = function (stream) { if (!stream) { return; } - try { stream.close(); } catch (err) { console.error(err); } + try { + stream.close(); + if (stream.closed && stream.fd === null) { return; } + } catch (err) { + console.error(err); + } setTimeout(function () { try { stream.destroy(); } catch (err) { console.error(err); } }, STREAM_DESTROY_TIMEOUT); }; +/* + accept a stream, an id (used as a label) and an optional number of milliseconds + + return a function which ignores all arguments + and first tries to gracefully close a stream + then destroys it after a period if the close was not successful + if the function is not called within the specified number of milliseconds + then it will be called implicitly with an error to indicate + that it was run because it timed out + +*/ const ensureStreamCloses = function (stream, id, ms) { return Util.bake(Util.mkTimeout(Util.once(function (err) { - destroyStream(stream, id); + destroyStream(stream); if (err) { // this can only be a timeout error... console.log("stream close error:", err, id); @@ -106,7 +122,7 @@ const ensureStreamCloses = function (stream, id, ms) { // it also allows the handler to abort reading at any time const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); - const finish = ensureStreamCloses(stream, id); + const finish = ensureStreamCloses(stream, '[readMessagesBin:' + id + ']'); return void readFileBin(stream, msgHandler, function (err) { cb(err); finish(); @@ -117,7 +133,7 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => { // returns undefined if the first message was not an object (not an array) var getMetadataAtPath = function (Env, path, _cb) { const stream = Fs.createReadStream(path, { start: 0 }); - const finish = ensureStreamCloses(stream, path); + const finish = ensureStreamCloses(stream, '[getMetadataAtPath:' + path + ']'); var cb = Util.once(Util.mkAsync(Util.both(_cb, finish)), function () { throw new Error("Multiple Callbacks"); }); @@ -203,7 +219,7 @@ var clearChannel = function (env, channelId, _cb) { */ var readMessages = function (path, msgHandler, _cb) { var stream = Fs.createReadStream(path, { start: 0}); - const finish = ensureStreamCloses(stream, path); + const finish = ensureStreamCloses(stream, '[readMessages:' + path + ']'); var cb = Util.once(Util.mkAsync(Util.both(finish, _cb))); return readFileBin(stream, function (msgObj, readMore) { @@ -231,7 +247,7 @@ var getDedicatedMetadata = function (env, channelId, handler, _cb) { var metadataPath = mkMetadataPath(env, channelId); var stream = Fs.createReadStream(metadataPath, {start: 0}); - const finish = ensureStreamCloses(stream, metadataPath); + const finish = ensureStreamCloses(stream, '[getDedicatedMetadata:' + metadataPath + ']'); var cb = Util.both(finish, _cb); readFileBin(stream, function (msgObj, readMore) {