From 06e9c818790f3a676f7606de95f0645ded3bc305 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 21 Oct 2020 14:48:47 +0530 Subject: [PATCH 1/6] correct a typo --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 746ad365e..30661d523 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,7 @@ To update from 3.23.0 to 3.23.1: 0. Read the 3.23.0 release notes carefully and apply all configuration changes if you haven't already done so. 1. Stop your server -2. Get the latest code with `git checkout 3.20.1` +2. Get the latest code with `git checkout 3.23.1` 3. Install the latest dependencies with `bower update` and `npm i` 4. Restart your server From e8428a2a732fe9754acd8f23653c7f1971d8ab83 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 21 Oct 2020 18:13:10 +0530 Subject: [PATCH 2/6] prevent a case of multiple callbacks --- lib/storage/blob.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/storage/blob.js b/lib/storage/blob.js index 21f38a73a..dfbc802b4 100644 --- a/lib/storage/blob.js +++ b/lib/storage/blob.js @@ -378,7 +378,10 @@ var makeWalker = function (n, handleChild, done) { nThen(function (w) { // check if the path is a directory... Fs.stat(path, w(function (err, stats) { - if (err) { return next(); } + if (err) { + w.abort(); + return next(); + } if (!stats.isDirectory()) { w.abort(); return void handleChild(void 0, path, next); From 100b4176462fe93db8daf109b88fe8195c45bac2 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 21 Oct 2020 18:23:59 +0530 Subject: [PATCH 3/6] guard against several serverside typeErrors and warn in cases where they would have occurred --- lib/workers/index.js | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/lib/workers/index.js b/lib/workers/index.js index d0a9a66f5..4ca1996d0 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -82,6 +82,13 @@ Workers.initialize = function (Env, config, _cb) { var drained = true; var sendCommand = function (msg, _cb, opt) { + if (!_cb) { + return void Log.error('WORKER_COMMAND_MISSING_CB', { + msg: msg, + opt: opt, + }); + } + opt = opt || {}; var index = getAvailableWorkerIndex(); @@ -95,7 +102,7 @@ Workers.initialize = function (Env, config, _cb) { }); if (drained) { drained = false; - Log.debug('WORKER_QUEUE_BACKLOG', { + Log.error('WORKER_QUEUE_BACKLOG', { workers: workers.length, }); } @@ -103,13 +110,6 @@ Workers.initialize = function (Env, config, _cb) { return; } - const txid = guid(); - msg.txid = txid; - msg.pid = PID; - - // track which worker is doing which jobs - state.tasks[txid] = msg; - var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) { if (err !== 'TIMEOUT') { return; } // in the event of a timeout the user will receive an error @@ -120,6 +120,17 @@ Workers.initialize = function (Env, config, _cb) { delete state.tasks[txid]; }))); + if (!msg) { + return void cb('ESERVERERR'); + } + + const txid = guid(); + msg.txid = txid; + msg.pid = PID; + + // track which worker is doing which jobs + state.tasks[txid] = msg; + // default to timing out affter 180s if no explicit timeout is passed var timeout = typeof(opt.timeout) !== 'undefined'? 
opt.timeout: 180000; response.expect(txid, cb, timeout); @@ -153,6 +164,13 @@ Workers.initialize = function (Env, config, _cb) { } var nextMsg = queue.shift(); + + if (!nextMsg || !nextMsg.msg) { + return void Log.error('WORKER_QUEUE_EMPTY_MESSAGE', { + item: nextMsg, + }); + } + /* `nextMsg` was at the top of the queue. We know that a job just finished and all of this code is synchronous, so calling `sendCommand` should take the worker @@ -200,7 +218,7 @@ Workers.initialize = function (Env, config, _cb) { const cb = response.expectation(txid); if (typeof(cb) !== 'function') { return; } const task = state.tasks[txid]; - if (!task && task.msg) { return; } + if (!(task && task.msg)) { return; } response.clear(txid); Log.info('DB_WORKER_RESEND', task.msg); sendCommand(task.msg, cb); From fbfb25bf290783b074a971711a78390400246780 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 21 Oct 2020 21:29:52 +0530 Subject: [PATCH 4/6] lint compliance --- lib/workers/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workers/index.js b/lib/workers/index.js index 4ca1996d0..6e9f57e88 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -110,6 +110,7 @@ Workers.initialize = function (Env, config, _cb) { return; } + const txid = guid(); var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) { if (err !== 'TIMEOUT') { return; } // in the event of a timeout the user will receive an error @@ -124,7 +125,6 @@ Workers.initialize = function (Env, config, _cb) { return void cb('ESERVERERR'); } - const txid = guid(); msg.txid = txid; msg.pid = PID; From 67430de7ffb1749cc3f5f9561c892fbe3e59fab3 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Oct 2020 11:17:03 +0530 Subject: [PATCH 5/6] Make efforts to avoid closing streams mid-read 1. Close streams when you're done with them 2. Close streams if they seem to have been abandoned --- lib/storage/file.js | 68 +++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/lib/storage/file.js b/lib/storage/file.js index d9d3f7c89..5f3d4ae46 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -94,25 +94,28 @@ const destroyStream = function (stream) { }, STREAM_DESTROY_TIMEOUT); }; -/* - accept a stream, an id (used as a label) and an optional number of milliseconds +/* createIdleStreamCollector + +Takes a stream and returns a function to asynchronously close that stream. +Successive calls to the function will be ignored. - return a function which ignores all arguments - and first tries to gracefully close a stream - then destroys it after a period if the close was not successful - if the function is not called within the specified number of milliseconds - then it will be called implicitly with an error to indicate - that it was run because it timed out +If the function is not called for a period of STREAM_CLOSE_TIMEOUT it will +be called automatically unless its `keepAlive` method has been invoked +in the meantime. Used to prevent file descriptor leaks in the case of +abandoned streams while closing streams which are being read very very +slowly. */ -const ensureStreamCloses = function (stream, id, ms) { - return Util.bake(Util.mkTimeout(Util.once(function (err) { - destroyStream(stream); - if (err) { - // this can only be a timeout error... 
- console.log("stream close error:", err, id); - } - }), ms || STREAM_CLOSE_TIMEOUT), []); +const createIdleStreamCollector = function (stream) { + // create a function to close the stream which takes no arguments + // and will do nothing after being called the first time + var collector = Util.once(Util.mkAsync(Util.bake(destroyStream, [stream]))); + + // create a second function which will execute the first function after a delay + // calling this function will reset the delay and thus keep the stream 'alive' + collector.keepAlive = Util.throttle(collector, STREAM_CLOSE_TIMEOUT); + collector.keepAlive(); + return collector; }; // readMessagesBin asynchronously iterates over the messages in a channel log @@ -122,25 +125,22 @@ const ensureStreamCloses = function (stream, id, ms) { // it also allows the handler to abort reading at any time const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); - const finish = ensureStreamCloses(stream, '[readMessagesBin:' + id + ']'); - return void readFileBin(stream, msgHandler, function (err) { - cb(err); - finish(); - }); + const collector = createIdleStreamCollector(stream); + const handleMessageAndKeepStreamAlive = Util.both(msgHandler, collector.keepAlive); + const done = Util.both(cb, collector); + return void readFileBin(stream, handleMessageAndKeepStreamAlive, done); }; // reads classic metadata from a channel log and aborts // returns undefined if the first message was not an object (not an array) var getMetadataAtPath = function (Env, path, _cb) { const stream = Fs.createReadStream(path, { start: 0 }); - const finish = ensureStreamCloses(stream, '[getMetadataAtPath:' + path + ']'); - var cb = Util.once(Util.mkAsync(Util.both(_cb, finish)), function () { - throw new Error("Multiple Callbacks"); - }); - + const collector = createIdleStreamCollector(stream); + var cb = Util.once(Util.mkAsync(Util.both(_cb, collector))); var i = 0; return readFileBin(stream, function (msgObj, readMore, abort) { + collector.keepAlive(); const line = msgObj.buff.toString('utf8'); if (!line) { @@ -149,7 +149,7 @@ var getMetadataAtPath = function (Env, path, _cb) { // metadata should always be on the first line or not exist in the channel at all if (i++ > 0) { - console.log("aborting"); + //console.log("aborting"); abort(); return void cb(); } @@ -219,10 +219,11 @@ var clearChannel = function (env, channelId, _cb) { */ var readMessages = function (path, msgHandler, _cb) { var stream = Fs.createReadStream(path, { start: 0}); - const finish = ensureStreamCloses(stream, '[readMessages:' + path + ']'); - var cb = Util.once(Util.mkAsync(Util.both(finish, _cb))); + var collector = createIdleStreamCollector(stream); + var cb = Util.once(Util.mkAsync(Util.both(_cb, collector))); return readFileBin(stream, function (msgObj, readMore) { + collector.keepAlive(); msgHandler(msgObj.buff.toString('utf8')); readMore(); }, function (err) { @@ -247,10 +248,11 @@ var getDedicatedMetadata = function (env, channelId, handler, _cb) { var metadataPath = mkMetadataPath(env, channelId); var stream = Fs.createReadStream(metadataPath, {start: 0}); - const finish = ensureStreamCloses(stream, '[getDedicatedMetadata:' + metadataPath + ']'); - var cb = Util.both(finish, _cb); + const collector = createIdleStreamCollector(stream); + var cb = Util.both(_cb, collector); readFileBin(stream, function (msgObj, readMore) { + collector.keepAlive(); var line = msgObj.buff.toString('utf8'); try { var parsed = JSON.parse(line); @@ 
-758,11 +760,11 @@ var getChannel = function (env, id, _callback) { }); }); }).nThen(function () { - channel.delayClose = Util.throttle(function () { + channel.delayClose = Util.throttle(Util.once(function () { delete env.channels[id]; destroyStream(channel.writeStream, path); //console.log("closing writestream"); - }, CHANNEL_WRITE_WINDOW); + }), CHANNEL_WRITE_WINDOW); channel.delayClose(); env.channels[id] = channel; done(void 0, channel); From d72e053560f7dfeab8fd244f47ec76b173219324 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 22 Oct 2020 11:24:58 +0530 Subject: [PATCH 6/6] make a note to improve stream timeout error handling --- lib/storage/file.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/storage/file.js b/lib/storage/file.js index 5f3d4ae46..b1ccfde0f 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -105,6 +105,9 @@ in the meantime. Used to prevent file descriptor leaks in the case of abandoned streams while closing streams which are being read very very slowly. +XXX inform the stream consumer when it has been closed prematurely +by calling back with a TIMEOUT error or something + */ const createIdleStreamCollector = function (stream) { // create a function to close the stream which takes no arguments
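
A note on the pattern introduced in PATCH 5/6: `createIdleStreamCollector` replaces `ensureStreamCloses`, so a stream is no longer force-closed after a fixed deadline (which could kill a slow but healthy read mid-stream). Instead, the collector reclaims the stream only after a period of inactivity, and every message handled resets that idle timer via `keepAlive`. The standalone sketch below illustrates the same idea in plain Node.js; it is not CryptPad's implementation. `once` and the inline timer logic are simplified stand-ins for `Util.once`, `Util.throttle`, and `Util.mkAsync`, and the timeout value and file name are assumptions chosen for illustration only.

// idle-collector.js -- minimal sketch of the idle-stream-collector pattern
// (assumed names and values; the real code lives in lib/storage/file.js)
const Fs = require('fs');

// assumed value for illustration; the real constant is STREAM_CLOSE_TIMEOUT
const STREAM_CLOSE_TIMEOUT = 120 * 1000;

// run f at most once, no matter how often the wrapper is called
// (simplified stand-in for Util.once)
const once = function (f) {
    let called = false;
    return function (...args) {
        if (called) { return; }
        called = true;
        f(...args);
    };
};

const createIdleStreamCollector = function (stream) {
    let timer;
    // close the stream exactly once; destroy() releases its file descriptor
    // (the real code also defers destruction with Util.mkAsync)
    const collector = once(function () {
        clearTimeout(timer);
        stream.destroy();
    });
    // resetting the idle timer keeps the stream alive; if the consumer goes
    // quiet for STREAM_CLOSE_TIMEOUT the stream is reclaimed automatically
    collector.keepAlive = function () {
        clearTimeout(timer);
        timer = setTimeout(collector, STREAM_CLOSE_TIMEOUT);
    };
    collector.keepAlive();
    return collector;
};

// usage: keep the stream alive while data flows, collect it when finished
const stream = Fs.createReadStream('./some-channel.ndjson'); // hypothetical path
const collector = createIdleStreamCollector(stream);
stream.on('data', function () { collector.keepAlive(); });
stream.on('end', collector);
stream.on('error', collector);

The design difference from the old `ensureStreamCloses` is that the timeout measures idle time rather than total read time: very slow readers keep their file descriptors as long as they keep making progress, while genuinely abandoned streams are still reclaimed. The XXX note added in PATCH 6/6 records that consumers are not yet informed when such a reclamation happens.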