From d386e223e4babf1909c728944b94ca05af117949 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 19 Mar 2020 17:33:22 -0400 Subject: [PATCH] simplify open/close of writeStreams --- lib/storage/file.js | 139 ++++++++++++++++---------------------------- 1 file changed, 49 insertions(+), 90 deletions(-) diff --git a/lib/storage/file.js b/lib/storage/file.js index 4f7cf54a6..c868d46a6 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -11,6 +11,7 @@ var Meta = require("../metadata"); var Extras = require("../hk-util"); const readFileBin = require("../stream-file").readFileBin; +const BatchRead = require("../batch-read"); const Schedule = require("../schedule"); const isValidChannelId = function (id) { @@ -59,9 +60,9 @@ var channelExists = function (filepath, cb) { }; const destroyStream = function (stream) { - stream.close(); + try { stream.close(); } catch (err) { console.error(err); } setTimeout(function () { - try { stream.destroy(); } catch (err) { console.log(err); } + try { stream.destroy(); } catch (err) { console.error(err); } }, 5000); }; @@ -141,7 +142,6 @@ var closeChannel = function (env, channelName, cb) { destroyStream(stream, channelName); } delete env.channels[channelName]; - env.openFiles--; cb(); } catch (err) { cb(err); @@ -290,7 +290,7 @@ var writeMetadata = function (env, channelId, data, cb) { // check if a file exists at $path -var checkPath = function (path, callback) { +var checkPath = function (path, callback) { // callback's second arg is never used... Fs.stat(path, function (err) { if (!err) { callback(undefined, true); @@ -632,7 +632,7 @@ var unarchiveChannel = function (env, channelName, cb) { })); }); }; - +/* var flushUnusedChannels = function (env, cb, frame) { var currentTime = +new Date(); @@ -654,6 +654,7 @@ var flushUnusedChannels = function (env, cb, frame) { }); cb(); }; +*/ /* channelBytes calls back with an error or the size (in bytes) of a channel and its metadata @@ -686,106 +687,63 @@ var channelBytes = function (env, chanName, cb) { }); }; -/*:: -export type ChainPadServer_ChannelInternal_t = { - atime: number, - writeStream: typeof(process.stdout), - whenLoaded: ?Array<(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void>, - onError: Array<(?Error)=>void>, - path: string -}; -*/ -var getChannel = function ( // XXX BatchRead - env, - id, - _callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/ -) { - //console.log("getting channel [%s]", id); - var callback = Util.once(Util.mkAsync(_callback)); +var getChannel = function (env, id, _callback) { + var cb = Util.once(Util.mkAsync(_callback)); + + // if the channel is in memory if (env.channels[id]) { var chan = env.channels[id]; - chan.atime = +new Date(); - if (chan.whenLoaded) { - chan.whenLoaded.push(callback); - } else { - callback(undefined, chan); - } - return; + // delay its pending close a little longer + chan.delayClose(); + // and return its writeStream + return void cb(void 0, chan); } - if (env.openFiles >= env.openFileLimit) { - // FIXME warn if this is the case? - // alternatively use graceful-fs to handle lots of concurrent reads - // if you're running out of open files, asynchronously clean up expired files - // do it on a shorter timeframe, though (half of normal) - setTimeout(function () { - //console.log("FLUSHING UNUSED CHANNELS"); - flushUnusedChannels(env, function () { - if (env.verbose) { - console.log("Approaching open file descriptor limit. Cleaning up"); + // otherwise you need to open it or wait until its pending open completes + return void env.batchGetChannel(id, cb, function (done) { + var path = mkPath(env, id); + var channel = { + onError: [], + }; + nThen(function (w) { + // create the path to the file if it doesn't exist + checkPath(path, w(function (err) { + if (err) { + w.abort(); + return void done(err); } - }, env.channelExpirationMs / 2); - }); - } - var path = mkPath(env, id); - var channel /*:ChainPadServer_ChannelInternal_t*/ = env.channels[id] = { - atime: +new Date(), - writeStream: (undefined /*:any*/), - whenLoaded: [ callback ], - onError: [ ], - path: path - }; - var complete = function (err) { - var whenLoaded = channel.whenLoaded; - // no guarantee stream.on('error') will not cause this to be called multiple times - if (!whenLoaded) { return; } - channel.whenLoaded = undefined; - if (err) { - delete env.channels[id]; - } - if (!channel.writeStream) { - throw new Error("getChannel() complete called without channel writeStream"); // XXX - } - whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); - }; - var fileExists; - nThen(function (waitFor) { - checkPath(path, waitFor(function (err, exists) { - if (err) { - waitFor.abort(); - return void complete(err); - } - fileExists = exists; - })); - }).nThen(function (waitFor) { - var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); // XXX - env.openFiles++; - stream.on('open', waitFor()); - stream.on('error', function (err /*:?Error*/) { - env.openFiles--; - // this might be called after this nThen block closes. - if (channel.whenLoaded) { - complete(err); - } else { + })); + }).nThen(function (w) { + var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); + stream.on('open', w()); + stream.on('error', function (err) { + w.abort(); + // this might be called after this nThen block closes. channel.onError.forEach(function (handler) { handler(err); }); - } + }); + }).nThen(function () { + channel.delayClose = Util.throttle(function () { + delete env.channels[id]; + destroyStream(channel.writeStream, path); + //console.log("closing writestream"); + }, 30000); + channel.delayClose(); + env.channels[id] = channel; + done(void 0, channel); }); - }).nThen(function () { - complete(); }); }; // write a message to the disk as raw bytes const messageBin = (env, chanName, msgBin, cb) => { var complete = Util.once(cb); - getChannel(env, chanName, function (err, chan) { // XXX + getChannel(env, chanName, function (err, chan) { if (!chan) { return void complete(err); } chan.onError.push(complete); chan.writeStream.write(msgBin, function () { chan.onError.splice(chan.onError.indexOf(complete), 1); - chan.atime = +new Date(); // XXX we should just throttle closing, much simpler and less error prone complete(); }); }); @@ -999,8 +957,7 @@ module.exports.create = function (conf, cb) { channels: { }, channelExpirationMs: conf.channelExpirationMs || 30000, verbose: conf.verbose, - openFiles: 0, - openFileLimit: conf.openFileLimit || 2048, + batchGetChannel: BatchRead('store_batch_channel'), }; var it; @@ -1232,7 +1189,8 @@ module.exports.create = function (conf, cb) { }, // iterate over open channels and close any that are not active flushUnusedChannels: function (cb) { - flushUnusedChannels(env, cb); + cb("DEPRECATED"); + //flushUnusedChannels(env, cb); }, // write to a log file log: function (channelName, content, cb) { @@ -1247,7 +1205,8 @@ module.exports.create = function (conf, cb) { } }); }); + /* it = setInterval(function () { flushUnusedChannels(env, function () { }); - }, 5000); + }, 5000);*/ };