From 9f1f01f3b4ce3e576dbf3101e2f57a29b6798ee7 Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 5 May 2020 18:06:28 -0400 Subject: [PATCH] disable worker task queueing and address a probable memory leak --- lib/workers/index.js | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/workers/index.js b/lib/workers/index.js index 43d8eb291..a77de5437 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -8,7 +8,7 @@ const Workers = module.exports; const PID = process.pid; const DB_PATH = 'lib/workers/db-worker'; -const MAX_JOBS = 8; +const MAX_JOBS = 16; Workers.initialize = function (Env, config, _cb) { var cb = Util.once(Util.mkAsync(_cb)); @@ -35,6 +35,11 @@ Workers.initialize = function (Env, config, _cb) { return response.expected(id)? guid(): id; }; + const countWorkerTasks = function (/* index */) { + return 0; // XXX this disables all queueing until it can be proven correct + //return Object.keys(workers[index].tasks || {}).length; + }; + var workerOffset = -1; var queue = []; var getAvailableWorkerIndex = function () { @@ -67,7 +72,8 @@ Workers.initialize = function (Env, config, _cb) { but this is a relatively easy way to make sure it's always up to date. We'll see how it performs in practice before optimizing. */ - if (workers[temp] && Object.keys(workers[temp].tasks || {}).length < MAX_JOBS) { + + if (workers[temp] && countWorkerTasks(temp) <= MAX_JOBS) { return temp; } } @@ -96,8 +102,6 @@ Workers.initialize = function (Env, config, _cb) { return; } - var cb = Util.once(Util.mkAsync(_cb)); - const txid = guid(); msg.txid = txid; msg.pid = PID; @@ -105,6 +109,16 @@ Workers.initialize = function (Env, config, _cb) { // track which worker is doing which jobs state.tasks[txid] = msg; + var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) { + if (err !== 'TIMEOUT') { return; } + // in the event of a timeout the user will receive an error + // but the state used to resend a query in the event of a worker crash + // won't be cleared. This also leaks a slot that could be used to keep + // an upper bound on the amount of parallelism for any given worker. + // if you run out of slots then the worker locks up. + delete state.tasks[txid]; + }))); + response.expect(txid, cb, 180000); state.worker.send(msg); };