Merge branch 'aggressive-stream-closing' into staging

pull/1/head
ansuz 5 years ago
commit cea9705bbe

@@ -11,6 +11,7 @@ var Meta = require("../metadata");
 var Extras = require("../hk-util");
 const readFileBin = require("../stream-file").readFileBin;
+const BatchRead = require("../batch-read");
 const Schedule = require("../schedule");
 const isValidChannelId = function (id) {
@ -59,9 +60,9 @@ var channelExists = function (filepath, cb) {
}; };
const destroyStream = function (stream) { const destroyStream = function (stream) {
stream.close(); try { stream.close(); } catch (err) { console.error(err); }
setTimeout(function () { setTimeout(function () {
try { stream.destroy(); } catch (err) { console.log(err); } try { stream.destroy(); } catch (err) { console.error(err); }
}, 5000); }, 5000);
}; };
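
Note: the hunk above tries a graceful close first and only force-destroys the stream five seconds later; close() can occasionally throw, which is why it is now wrapped in try/catch and logged with console.error. A minimal standalone sketch of the same close-then-destroy idea; the helper name and file path are illustrative, not part of this patch:

    var Fs = require('fs');

    // Illustrative sketch: close a write stream gently, then force-destroy it later.
    var closeThenDestroy = function (stream, delayMs) {
        try { stream.close(); } catch (err) { console.error(err); } // close() may throw in edge cases
        setTimeout(function () {
            try { stream.destroy(); } catch (err) { console.error(err); }
        }, delayMs);
    };

    var stream = Fs.createWriteStream('/tmp/example.ndjson', { flags: 'a' });
    stream.write('hello\n', function () {
        closeThenDestroy(stream, 5000);
    });
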
@@ -141,7 +142,6 @@ var closeChannel = function (env, channelName, cb) {
             destroyStream(stream, channelName);
         }
         delete env.channels[channelName];
-        env.openFiles--;
         cb();
     } catch (err) {
         cb(err);
@ -290,7 +290,7 @@ var writeMetadata = function (env, channelId, data, cb) {
// check if a file exists at $path // check if a file exists at $path
var checkPath = function (path, callback) { var checkPath = function (path, callback) { // callback's second arg is never used...
Fs.stat(path, function (err) { Fs.stat(path, function (err) {
if (!err) { if (!err) {
callback(undefined, true); callback(undefined, true);
@@ -632,7 +632,7 @@ var unarchiveChannel = function (env, channelName, cb) {
         }));
     });
 };
+/*
 var flushUnusedChannels = function (env, cb, frame) {
     var currentTime = +new Date();
@@ -654,6 +654,7 @@ var flushUnusedChannels = function (env, cb, frame) {
     });
     cb();
 };
+*/
 /* channelBytes
     calls back with an error or the size (in bytes) of a channel and its metadata
@@ -686,106 +687,63 @@ var channelBytes = function (env, chanName, cb) {
     });
 };
-/*::
-export type ChainPadServer_ChannelInternal_t = {
-    atime: number,
-    writeStream: typeof(process.stdout),
-    whenLoaded: ?Array<(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void>,
-    onError: Array<(?Error)=>void>,
-    path: string
-};
-*/
-var getChannel = function ( // XXX BatchRead
-    env,
-    id,
-    _callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/
-) {
-    //console.log("getting channel [%s]", id);
-    var callback = Util.once(Util.mkAsync(_callback));
-    if (env.channels[id]) {
-        var chan = env.channels[id];
-        chan.atime = +new Date();
-        if (chan.whenLoaded) {
-            chan.whenLoaded.push(callback);
-        } else {
-            callback(undefined, chan);
-        }
-        return;
-    }
-    if (env.openFiles >= env.openFileLimit) {
-        // FIXME warn if this is the case?
-        // alternatively use graceful-fs to handle lots of concurrent reads
-        // if you're running out of open files, asynchronously clean up expired files
-        // do it on a shorter timeframe, though (half of normal)
-        setTimeout(function () {
-            //console.log("FLUSHING UNUSED CHANNELS");
-            flushUnusedChannels(env, function () {
-                if (env.verbose) {
-                    console.log("Approaching open file descriptor limit. Cleaning up");
-                }
-            }, env.channelExpirationMs / 2);
-        });
-    }
-    var path = mkPath(env, id);
-    var channel /*:ChainPadServer_ChannelInternal_t*/ = env.channels[id] = {
-        atime: +new Date(),
-        writeStream: (undefined /*:any*/),
-        whenLoaded: [ callback ],
-        onError: [ ],
-        path: path
-    };
-    var complete = function (err) {
-        var whenLoaded = channel.whenLoaded;
-        // no guarantee stream.on('error') will not cause this to be called multiple times
-        if (!whenLoaded) { return; }
-        channel.whenLoaded = undefined;
-        if (err) {
-            delete env.channels[id];
-        }
-        if (!channel.writeStream) {
-            throw new Error("getChannel() complete called without channel writeStream"); // XXX
-        }
-        whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); });
-    };
-    var fileExists;
-    nThen(function (waitFor) {
-        checkPath(path, waitFor(function (err, exists) {
-            if (err) {
-                waitFor.abort();
-                return void complete(err);
-            }
-            fileExists = exists;
-        }));
-    }).nThen(function (waitFor) {
-        var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); // XXX
-        env.openFiles++;
-        stream.on('open', waitFor());
-        stream.on('error', function (err /*:?Error*/) {
-            env.openFiles--;
-            // this might be called after this nThen block closes.
-            if (channel.whenLoaded) {
-                complete(err);
-            } else {
-                channel.onError.forEach(function (handler) {
-                    handler(err);
-                });
-            }
-        });
-    }).nThen(function () {
-        complete();
-    });
-};
+var getChannel = function (env, id, _callback) {
+    var cb = Util.once(Util.mkAsync(_callback));
+
+    // if the channel is in memory
+    if (env.channels[id]) {
+        var chan = env.channels[id];
+        // delay its pending close a little longer
+        chan.delayClose();
+        // and return its writeStream
+        return void cb(void 0, chan);
+    }
+
+    // otherwise you need to open it or wait until its pending open completes
+    return void env.batchGetChannel(id, cb, function (done) {
+        var path = mkPath(env, id);
+        var channel = {
+            onError: [],
+        };
+
+        nThen(function (w) {
+            // create the path to the file if it doesn't exist
+            checkPath(path, w(function (err) {
+                if (err) {
+                    w.abort();
+                    return void done(err);
+                }
+            }));
+        }).nThen(function (w) {
+            var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
+            stream.on('open', w());
+            stream.on('error', function (err) {
+                w.abort();
+                // this might be called after this nThen block closes.
+                channel.onError.forEach(function (handler) {
+                    handler(err);
+                });
+            });
+        }).nThen(function () {
+            channel.delayClose = Util.throttle(function () {
+                delete env.channels[id];
+                destroyStream(channel.writeStream, path);
+                //console.log("closing writestream");
+            }, 30000);
+            channel.delayClose();
+            env.channels[id] = channel;
+            done(void 0, channel);
+        });
+    });
+};
 // write a message to the disk as raw bytes
 const messageBin = (env, chanName, msgBin, cb) => {
     var complete = Util.once(cb);
-    getChannel(env, chanName, function (err, chan) { // XXX
+    getChannel(env, chanName, function (err, chan) {
         if (!chan) { return void complete(err); }
         chan.onError.push(complete);
         chan.writeStream.write(msgBin, function () {
             chan.onError.splice(chan.onError.indexOf(complete), 1);
-            chan.atime = +new Date();
+            // XXX we should just throttle closing, much simpler and less error prone
             complete();
         });
     });
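
Note on the delayed close above: chan.delayClose() is built with Util.throttle(fn, 30000), so every access to a channel pushes the stream's close further into the future, and a channel is only torn down once it has been idle. Assuming Util.throttle behaves like a trailing-edge debounce (each call cancels the pending timer and re-arms it), a minimal sketch of that behaviour:

    // Minimal sketch, assuming Util.throttle re-arms a single timer on each call;
    // the real helper is provided by the project's Util module.
    var throttle = function (fn, ms) {
        var timer;
        return function () {
            clearTimeout(timer);
            timer = setTimeout(fn, ms);
        };
    };

    // Every access to a channel calls delayClose(), so the stream is only
    // destroyed after 30 seconds with no reads or writes.
    var delayClose = throttle(function () {
        console.log("closing writestream");
    }, 30000);

    delayClose(); // schedules a close
    delayClose(); // accessed again: the previous timer is cancelled and rescheduled
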
@@ -999,8 +957,7 @@ module.exports.create = function (conf, cb) {
         channels: { },
         channelExpirationMs: conf.channelExpirationMs || 30000,
         verbose: conf.verbose,
-        openFiles: 0,
-        openFileLimit: conf.openFileLimit || 2048,
+        batchGetChannel: BatchRead('store_batch_channel'),
     };
     var it;
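
The open-file accounting (openFiles / openFileLimit) is dropped in favour of batchGetChannel, an instance of the new ../batch-read module. Its usage in getChannel above, env.batchGetChannel(id, cb, function (done) { ... }), implies that concurrent requests for the same channel id wait behind a single open, and every queued callback receives the result once done(err, channel) fires. A rough sketch of a batcher with that call shape (illustrative only, not the actual BatchRead implementation):

    // Illustrative batcher: concurrent calls for one id share a single in-flight
    // execution of fn, and every queued callback gets the same (err, value) result.
    var makeBatcher = function () {
        var pending = {};
        return function (id, cb, fn) {
            if (pending[id]) { return void pending[id].push(cb); } // already opening: just wait
            pending[id] = [ cb ];
            fn(function (err, value) {
                var callbacks = pending[id];
                delete pending[id];
                callbacks.forEach(function (f) { f(err, value); });
            });
        };
    };

    // Two simultaneous requests for the same id trigger only one "open".
    var batchGetChannel = makeBatcher();
    batchGetChannel('abc', function (err, chan) { /* first caller */ }, function (done) {
        setTimeout(function () { done(void 0, { id: 'abc' }); }, 10);
    });
    batchGetChannel('abc', function (err, chan) { /* second caller, same result */ }, function () {
        // never invoked: the first open is still pending
    });
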
@@ -1232,7 +1189,8 @@ module.exports.create = function (conf, cb) {
         },
         // iterate over open channels and close any that are not active
         flushUnusedChannels: function (cb) {
-            flushUnusedChannels(env, cb);
+            cb("DEPRECATED");
+            //flushUnusedChannels(env, cb);
         },
         // write to a log file
         log: function (channelName, content, cb) {
@@ -1247,7 +1205,8 @@ module.exports.create = function (conf, cb) {
             }
         });
     });
+    /*
     it = setInterval(function () {
         flushUnusedChannels(env, function () { });
-    }, 5000);
+    }, 5000);*/
 };
