simplify open/close of writeStreams

pull/1/head
ansuz 5 years ago
parent 32cd0f3c4d
commit d386e223e4

@@ -11,6 +11,7 @@ var Meta = require("../metadata");
var Extras = require("../hk-util");
const readFileBin = require("../stream-file").readFileBin;
const BatchRead = require("../batch-read");
const Schedule = require("../schedule");
const isValidChannelId = function (id) {
@@ -59,9 +60,9 @@ var channelExists = function (filepath, cb) {
};
const destroyStream = function (stream) {
stream.close();
try { stream.close(); } catch (err) { console.error(err); }
setTimeout(function () {
try { stream.destroy(); } catch (err) { console.log(err); }
try { stream.destroy(); } catch (err) { console.error(err); }
}, 5000);
};
@@ -141,7 +142,6 @@ var closeChannel = function (env, channelName, cb) {
destroyStream(stream, channelName);
}
delete env.channels[channelName];
env.openFiles--;
cb();
} catch (err) {
cb(err);
@@ -290,7 +290,7 @@ var writeMetadata = function (env, channelId, data, cb) {
// check if a file exists at $path
var checkPath = function (path, callback) {
var checkPath = function (path, callback) { // callback's second arg is never used...
Fs.stat(path, function (err) {
if (!err) {
callback(undefined, true);
@@ -632,7 +632,7 @@ var unarchiveChannel = function (env, channelName, cb) {
}));
});
};
/*
var flushUnusedChannels = function (env, cb, frame) {
var currentTime = +new Date();
@@ -654,6 +654,7 @@ var flushUnusedChannels = function (env, cb, frame) {
});
cb();
};
*/
/* channelBytes
calls back with an error or the size (in bytes) of a channel and its metadata
@@ -686,106 +687,63 @@ var channelBytes = function (env, chanName, cb) {
});
};
/*::
export type ChainPadServer_ChannelInternal_t = {
atime: number,
writeStream: typeof(process.stdout),
whenLoaded: ?Array<(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void>,
onError: Array<(?Error)=>void>,
path: string
};
*/
var getChannel = function ( // XXX BatchRead
env,
id,
_callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/
) {
//console.log("getting channel [%s]", id);
var callback = Util.once(Util.mkAsync(_callback));
var getChannel = function (env, id, _callback) {
var cb = Util.once(Util.mkAsync(_callback));
// if the channel is in memory
if (env.channels[id]) {
var chan = env.channels[id];
chan.atime = +new Date();
if (chan.whenLoaded) {
chan.whenLoaded.push(callback);
} else {
callback(undefined, chan);
}
return;
// delay its pending close a little longer
chan.delayClose();
// and return its writeStream
return void cb(void 0, chan);
}
if (env.openFiles >= env.openFileLimit) {
// FIXME warn if this is the case?
// alternatively use graceful-fs to handle lots of concurrent reads
// if you're running out of open files, asynchronously clean up expired files
// do it on a shorter timeframe, though (half of normal)
setTimeout(function () {
//console.log("FLUSHING UNUSED CHANNELS");
flushUnusedChannels(env, function () {
if (env.verbose) {
console.log("Approaching open file descriptor limit. Cleaning up");
// otherwise you need to open it or wait until its pending open completes
return void env.batchGetChannel(id, cb, function (done) {
var path = mkPath(env, id);
var channel = {
onError: [],
};
nThen(function (w) {
// create the path to the file if it doesn't exist
checkPath(path, w(function (err) {
if (err) {
w.abort();
return void done(err);
}
}, env.channelExpirationMs / 2);
});
}
var path = mkPath(env, id);
var channel /*:ChainPadServer_ChannelInternal_t*/ = env.channels[id] = {
atime: +new Date(),
writeStream: (undefined /*:any*/),
whenLoaded: [ callback ],
onError: [ ],
path: path
};
var complete = function (err) {
var whenLoaded = channel.whenLoaded;
// no guarantee stream.on('error') will not cause this to be called multiple times
if (!whenLoaded) { return; }
channel.whenLoaded = undefined;
if (err) {
delete env.channels[id];
}
if (!channel.writeStream) {
throw new Error("getChannel() complete called without channel writeStream"); // XXX
}
whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); });
};
var fileExists;
nThen(function (waitFor) {
checkPath(path, waitFor(function (err, exists) {
if (err) {
waitFor.abort();
return void complete(err);
}
fileExists = exists;
}));
}).nThen(function (waitFor) {
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); // XXX
env.openFiles++;
stream.on('open', waitFor());
stream.on('error', function (err /*:?Error*/) {
env.openFiles--;
// this might be called after this nThen block closes.
if (channel.whenLoaded) {
complete(err);
} else {
}));
}).nThen(function (w) {
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
stream.on('open', w());
stream.on('error', function (err) {
w.abort();
// this might be called after this nThen block closes.
channel.onError.forEach(function (handler) {
handler(err);
});
}
});
}).nThen(function () {
channel.delayClose = Util.throttle(function () {
delete env.channels[id];
destroyStream(channel.writeStream, path);
//console.log("closing writestream");
}, 30000);
channel.delayClose();
env.channels[id] = channel;
done(void 0, channel);
});
}).nThen(function () {
complete();
});
};
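The rewritten getChannel above leans on two primitives: env.batchGetChannel coalesces concurrent opens of the same channel, and delayClose, built with Util.throttle, re-arms a 30-second timer on every access so the writeStream is only destroyed once the channel has gone idle. A minimal sketch of that throttled-close idea, assuming Util.throttle(fn, ms) behaves like a debounce that re-arms its timer on each call (the helper below is illustrative, not the project's actual implementation):

// illustrative debounce-style throttle; re-arms a timer on every call
var throttle = function (fn, ms) {
    var timer;
    return function () {
        clearTimeout(timer);
        timer = setTimeout(fn, ms);
    };
};

// each access pushes the close another 30s into the future
var delayClose = throttle(function () {
    console.log("channel idle for 30s, destroying writeStream");
}, 30000);

delayClose();                   // arm the timer when the channel is opened
setTimeout(delayClose, 10000);  // a later read/write delays the close again

Compared with the old atime bookkeeping plus a periodic flushUnusedChannels sweep, keeping a channel open now costs a single timer per channel.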
// write a message to the disk as raw bytes
const messageBin = (env, chanName, msgBin, cb) => {
var complete = Util.once(cb);
getChannel(env, chanName, function (err, chan) { // XXX
getChannel(env, chanName, function (err, chan) {
if (!chan) { return void complete(err); }
chan.onError.push(complete);
chan.writeStream.write(msgBin, function () {
chan.onError.splice(chan.onError.indexOf(complete), 1);
chan.atime = +new Date(); // XXX we should just throttle closing, much simpler and less error prone
complete();
});
});
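With the throttled close in place, messageBin simply reuses whatever writeStream getChannel hands back: repeated writes to the same channel share one descriptor, and the stream is torn down roughly 30 seconds after the last access. A hedged usage sketch (the channel id and the surrounding env from module.exports.create are placeholders, not taken from this patch):

var id = "0123456789abcdef0123456789abcdef"; // illustrative channel id
messageBin(env, id, Buffer.from("first message\n"), function (err) {
    if (err) { return void console.error(err); }
    // the second write reuses the already-open writeStream;
    // no explicit close is needed, delayClose fires once the channel goes idle
    messageBin(env, id, Buffer.from("second message\n"), function (err) {
        if (err) { return void console.error(err); }
    });
});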
@@ -999,8 +957,7 @@ module.exports.create = function (conf, cb) {
channels: { },
channelExpirationMs: conf.channelExpirationMs || 30000,
verbose: conf.verbose,
openFiles: 0,
openFileLimit: conf.openFileLimit || 2048,
batchGetChannel: BatchRead('store_batch_channel'),
};
var it;
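The per-channel open-file accounting (openFiles, openFileLimit) is replaced by a single batcher. The patch calls it as env.batchGetChannel(id, cb, work), where work runs once for the first caller and done(err, channel) answers every callback queued under that id. A rough sketch of that coalescing contract, assuming only what the call sites above show (the real batch-read module's internals are not part of this diff):

// minimal "open once, answer everyone" batcher with the same call shape
var makeBatcher = function () {
    var pending = {}; // id -> callbacks waiting on the in-flight open
    return function (id, cb, work) {
        if (pending[id]) { return void pending[id].push(cb); }
        pending[id] = [cb];
        work(function (err, result) {
            var waiting = pending[id];
            delete pending[id];
            waiting.forEach(function (f) { f(err, result); });
        });
    };
};

var batchGetChannel = makeBatcher();
var openChannel = function (done) {
    // stand-in for the real open: create the path, open a writeStream, etc.
    setTimeout(function () { done(void 0, { path: "./data/chan1.ndjson" }); }, 10);
};

// two near-simultaneous requests for the same id trigger a single open
batchGetChannel("chan1", function (err, chan) { console.log("caller 1", chan.path); }, openChannel);
batchGetChannel("chan1", function (err, chan) { console.log("caller 2", chan.path); }, openChannel);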
@@ -1232,7 +1189,8 @@ module.exports.create = function (conf, cb) {
},
// iterate over open channels and close any that are not active
flushUnusedChannels: function (cb) {
flushUnusedChannels(env, cb);
cb("DEPRECATED");
//flushUnusedChannels(env, cb);
},
// write to a log file
log: function (channelName, content, cb) {
@@ -1247,7 +1205,8 @@ module.exports.create = function (conf, cb) {
}
});
});
/*
it = setInterval(function () {
flushUnusedChannels(env, function () { });
}, 5000);
}, 5000);*/
};
