From 30f17040ac8f871ec462e12bfeba0a13d68135b5 Mon Sep 17 00:00:00 2001 From: ansuz Date: Mon, 16 Mar 2020 16:13:38 -0400 Subject: [PATCH] close streams when possible, simplify some code, enforce asynchrony --- lib/storage/file.js | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/lib/storage/file.js b/lib/storage/file.js index 053ecc9c7..7c0a773cc 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -65,7 +65,10 @@ var channelExists = function (filepath, cb) { // it also allows the handler to abort reading at any time const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); - return void readFileBin(stream, msgHandler, cb); + return void readFileBin(stream, msgHandler, function (err) { + try { stream.close(); } catch (err2) { } + cb(err); + }); }; // reads classic metadata from a channel log and aborts @@ -101,9 +104,6 @@ var getMetadataAtPath = function (Env, path, _cb) { if (i++ > 0) { console.log("aborting"); abort(); - try { stream.close(); } catch (err) { - console.log("could not close stream"); - } return void cb(); } var metadata; @@ -684,6 +684,7 @@ var getChannel = function ( id, callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/ ) { + var callback = Util.once(Util.mkAsync(_callback)); if (env.channels[id]) { var chan = env.channels[id]; chan.atime = +new Date(); @@ -725,23 +726,20 @@ var getChannel = function ( delete env.channels[id]; } if (!channel.writeStream) { - throw new Error("getChannel() complete called without channel writeStream"); + throw new Error("getChannel() complete called without channel writeStream"); // XXX } whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); }; var fileExists; - var errorState; nThen(function (waitFor) { checkPath(path, waitFor(function (err, exists) { if (err) { - errorState = true; - complete(err); - return; + waitFor.abort(); + return void complete(err); } fileExists = exists; })); }).nThen(function (waitFor) { - if (errorState) { return; } var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); env.openFiles++; stream.on('open', waitFor()); @@ -757,7 +755,6 @@ var getChannel = function ( } }); }).nThen(function () { - if (errorState) { return; } complete(); }); };