close streams when possible, simplify some code, enforce asynchrony

pull/1/head
ansuz 5 years ago
parent e2c748b6c7
commit 30f17040ac

@ -65,7 +65,10 @@ var channelExists = function (filepath, cb) {
// it also allows the handler to abort reading at any time // it also allows the handler to abort reading at any time
const readMessagesBin = (env, id, start, msgHandler, cb) => { const readMessagesBin = (env, id, start, msgHandler, cb) => {
const stream = Fs.createReadStream(mkPath(env, id), { start: start }); const stream = Fs.createReadStream(mkPath(env, id), { start: start });
return void readFileBin(stream, msgHandler, cb); return void readFileBin(stream, msgHandler, function (err) {
try { stream.close(); } catch (err2) { }
cb(err);
});
}; };
// reads classic metadata from a channel log and aborts // reads classic metadata from a channel log and aborts
@ -101,9 +104,6 @@ var getMetadataAtPath = function (Env, path, _cb) {
if (i++ > 0) { if (i++ > 0) {
console.log("aborting"); console.log("aborting");
abort(); abort();
try { stream.close(); } catch (err) {
console.log("could not close stream");
}
return void cb(); return void cb();
} }
var metadata; var metadata;
@ -684,6 +684,7 @@ var getChannel = function (
id, id,
callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/ callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/
) { ) {
var callback = Util.once(Util.mkAsync(_callback));
if (env.channels[id]) { if (env.channels[id]) {
var chan = env.channels[id]; var chan = env.channels[id];
chan.atime = +new Date(); chan.atime = +new Date();
@ -725,23 +726,20 @@ var getChannel = function (
delete env.channels[id]; delete env.channels[id];
} }
if (!channel.writeStream) { if (!channel.writeStream) {
throw new Error("getChannel() complete called without channel writeStream"); throw new Error("getChannel() complete called without channel writeStream"); // XXX
} }
whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); });
}; };
var fileExists; var fileExists;
var errorState;
nThen(function (waitFor) { nThen(function (waitFor) {
checkPath(path, waitFor(function (err, exists) { checkPath(path, waitFor(function (err, exists) {
if (err) { if (err) {
errorState = true; waitFor.abort();
complete(err); return void complete(err);
return;
} }
fileExists = exists; fileExists = exists;
})); }));
}).nThen(function (waitFor) { }).nThen(function (waitFor) {
if (errorState) { return; }
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
env.openFiles++; env.openFiles++;
stream.on('open', waitFor()); stream.on('open', waitFor());
@ -757,7 +755,6 @@ var getChannel = function (
} }
}); });
}).nThen(function () { }).nThen(function () {
if (errorState) { return; }
complete(); complete();
}); });
}; };

Loading…
Cancel
Save