From c53675c9d5e4803850806b943679e19a276666ce Mon Sep 17 00:00:00 2001
From: ansuz
Date: Mon, 6 Apr 2020 10:28:26 -0400
Subject: [PATCH 1/2] WIP worker rpc call queue

---
 lib/workers/index.js | 121 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 97 insertions(+), 24 deletions(-)

diff --git a/lib/workers/index.js b/lib/workers/index.js
index 7d8c1dcc7..847081104 100644
--- a/lib/workers/index.js
+++ b/lib/workers/index.js
@@ -124,18 +124,58 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         return response.expected(id)? guid(): id;
     };
 
-    var workerIndex = 0;
-    var sendCommand = function (msg, _cb) {
-        var cb = Util.once(Util.mkAsync(_cb));
-
-        workerIndex = (workerIndex + 1) % workers.length;
-        if (!isWorker(workers[workerIndex])) {
-            return void cb("NO_WORKERS");
+    var workerOffset = -1;
+    var getAvailableWorkerIndex = function () {
+        var L = workers.length;
+        if (L === 0) {
+            console.log("no workers available");
+            return -1;
         }
-        var state = workers[workerIndex];
 
-        // XXX insert a queue here to prevent timeouts
+        // cycle through the workers once
+        // start from a different offset each time
+        // return -1 if none are available
+
+        workerOffset = (workerOffset + 1) % L;
+
+        var temp;
+        for (let i = 0; i < L; i++) {
+            temp = (workerOffset + i) % L;
+            if (workers[temp] && workers[temp].count > 0) {
+                return temp;
+            }
+        }
+        return -1;
+    };
+
+    var queue = [];
+    var MAX_JOBS = 32; //1; //8;
+
+    var sendCommand = function (msg, _cb) {
+        var index = getAvailableWorkerIndex();
+
+        var state = workers[index];
+        // if there is no worker available:
+        if (!isWorker(state)) {
+            console.log("queueing for later");
+            // queue the message for when one becomes available
+            queue.push({
+                msg: msg,
+                cb: _cb,
+            });
+            return;
+            //return void cb("NO_WORKERS");
+        } else {
+            console.log("worker #%s handling %s messages currently", index, MAX_JOBS + 1 - state.count);
+
+        }
+
+        console.log("%s queued messages", queue.length);
+
+        console.log("[%s]\n", msg.command);
+        //console.log(msg);
+
+        var cb = Util.once(Util.mkAsync(_cb));
 
         const txid = guid();
         msg.txid = txid;
@@ -143,20 +183,67 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
 
         // track which worker is doing which jobs
         state.tasks[txid] = msg;
+        state.count--;
+
+        if (state.count < 0) {
+            console.log(state);
+            throw new Error("too many jobs"); // XXX
+        }
+
         response.expect(txid, function (err, value) {
             // clean up when you get a response
             delete state[txid];
+            state.count++;
             cb(err, value);
         }, 60000);
 
         state.worker.send(msg);
     };
 
+    var backlogged;
+    var handleResponse = function (res) {
+        if (!res) { return; }
+        // handle log messages before checking if it was addressed to your PID
+        // it might still be useful to know what happened inside an orphaned worker
+        if (res.log) {
+            return void handleLog(res.log, res.label, res.info);
+        }
+        // but don't bother handling things addressed to other processes
+        // since it's basically guaranteed not to work
+        if (res.pid !== PID) {
+            return void Log.error("WRONG_PID", res);
+        }
+
+        setTimeout(function () {
+            response.handle(res.txid, [res.error, res.value]);
+
+            if (!queue.length) {
+                if (backlogged) {
+                    backlogged = false;
+                    console.log("queue has been drained");
+                }
+                return;
+            } else {
+                backlogged = true;
+                console.log(queue, queue.length);
+            }
+
+            console.log("taking queued message");
+
+            // XXX take a request from the queue
+            var nextMsg = queue.shift();
+            sendCommand(nextMsg.msg, nextMsg.cb); // XXX doesn't feel right
+            console.log("%s queued messages remaining", queue.length);
+
+        }, (Math.floor(Math.random() * 150) * 10));
+    };
+
     const initWorker = function (worker, cb) {
         const txid = guid();
         const state = {
             worker: worker,
             tasks: {},
+            count: MAX_JOBS, //1, // XXX
         };
 
         response.expect(txid, function (err) {
@@ -171,21 +258,7 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
             config: config,
         });
 
-        worker.on('message', function (res) {
-            if (!res) { return; }
-            // handle log messages before checking if it was addressed to your PID
-            // it might still be useful to know what happened inside an orphaned worker
-            if (res.log) {
-                return void handleLog(res.log, res.label, res.info);
-            }
-            // but don't bother handling things addressed to other processes
-            // since it's basically guaranteed not to work
-            if (res.pid !== PID) {
-                return void Log.error("WRONG_PID", res);
-            }
-
-            response.handle(res.txid, [res.error, res.value]);
-        });
+        worker.on('message', handleResponse);
 
         var substituteWorker = Util.once(function () {
             Env.Log.info("SUBSTITUTE_DB_WORKER", '');

From e8b1fcf710c69d701d0bf73f49d5f7f0ebe06169 Mon Sep 17 00:00:00 2001
From: ansuz
Date: Wed, 15 Apr 2020 13:59:54 -0400
Subject: [PATCH 2/2] solve some cases where crashing workers could result in
 an invalid state for the queue

---
 lib/workers/index.js | 81 +++++++++++++++++---------------------------
 1 file changed, 31 insertions(+), 50 deletions(-)

diff --git a/lib/workers/index.js b/lib/workers/index.js
index 847081104..d2944225c 100644
--- a/lib/workers/index.js
+++ b/lib/workers/index.js
@@ -124,11 +124,12 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         return response.expected(id)? guid(): id;
     };
 
+    const MAX_JOBS = 32;
     var workerOffset = -1;
     var getAvailableWorkerIndex = function () {
         var L = workers.length;
         if (L === 0) {
-            console.log("no workers available");
+            console.log("no workers available"); // XXX
            return -1;
         }
 
@@ -141,7 +142,15 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         var temp;
         for (let i = 0; i < L; i++) {
             temp = (workerOffset + i) % L;
-            if (workers[temp] && workers[temp].count > 0) {
/* I'd like for this condition to be more efficient
    (`Object.keys` is sub-optimal) but I found some bugs in my initial
    implementation stemming from a task counter variable going out-of-sync
    with reality when a worker crashed and its tasks were re-assigned to
    its substitute. I'm sure it can be done correctly and efficiently,
    but this is a relatively easy way to make sure it's always up to date.
    We'll see how it performs in practice before optimizing.
+*/ + if (workers[temp] && Object.keys(workers[temp]).length < MAX_JOBS) { return temp; } } @@ -149,7 +158,6 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { }; var queue = []; - var MAX_JOBS = 32; //1; //8; var sendCommand = function (msg, _cb) { var index = getAvailableWorkerIndex(); @@ -157,24 +165,14 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { var state = workers[index]; // if there is no worker available: if (!isWorker(state)) { - console.log("queueing for later"); // queue the message for when one becomes available queue.push({ msg: msg, cb: _cb, }); return; - //return void cb("NO_WORKERS"); - } else { - console.log("worker #%s handling %s messages currently", index, MAX_JOBS + 1 - state.count); - } - console.log("%s queued messages", queue.length); - - console.log("[%s]\n", msg.command); - //console.log(msg); - var cb = Util.once(Util.mkAsync(_cb)); const txid = guid(); @@ -183,24 +181,12 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { // track which worker is doing which jobs state.tasks[txid] = msg; - state.count--; - if (state.count < 0) { - console.log(state); - throw new Error("too many jobs"); // XXX - } - - response.expect(txid, function (err, value) { - // clean up when you get a response - delete state[txid]; - state.count++; - cb(err, value); - }, 60000); + response.expect(txid, cb, 60000); state.worker.send(msg); }; - var backlogged; - var handleResponse = function (res) { + var handleResponse = function (state, res) { if (!res) { return; } // handle log messages before checking if it was addressed to your PID // it might still be useful to know what happened inside an orphaned worker @@ -213,28 +199,22 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { return void Log.error("WRONG_PID", res); } - setTimeout(function () { - response.handle(res.txid, [res.error, res.value]); + if (!res.txid) { return; } + response.handle(res.txid, [res.error, res.value]); + delete state.tasks[res.txid]; + if (!queue.length) { return; } - if (!queue.length) { - if (backlogged) { - backlogged = false; - console.log("queue has been drained"); - } - return; - } else { - backlogged = true; - console.log(queue, queue.length); - } - - console.log("taking queued message"); - - // XXX take a request from the queue - var nextMsg = queue.shift(); - sendCommand(nextMsg.msg, nextMsg.cb); // XXX doesn't feel right - console.log("%s queued messages remaining", queue.length); - - }, (Math.floor(Math.random() * 150) * 10)); + var nextMsg = queue.shift(); +/* `nextMsg` was at the top of the queue. + We know that a job just finished and all of this code + is synchronous, so calling `sendCommand` should take the worker + which was just freed up. This is somewhat fragile though, so + be careful if you want to modify this block. The risk is that + we take something that was at the top of the queue and push it + to the back because the following msg took its place. OR, in an + even worse scenario, we cycle through the queue but don't run anything. 
+*/ + sendCommand(nextMsg.msg, nextMsg.cb); }; const initWorker = function (worker, cb) { @@ -243,7 +223,6 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { const state = { worker: worker, tasks: {}, - count: MAX_JOBS, //1, // XXX }; response.expect(txid, function (err) { @@ -258,7 +237,9 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { config: config, }); - worker.on('message', handleResponse); + worker.on('message', function (res) { + handleResponse(state, res); + }); var substituteWorker = Util.once(function () { Env.Log.info("SUBSTITUTE_DB_WORKER", '');