solve some cases where crashing workers could result in an invalid state for the queue

pull/1/head
ansuz 5 years ago
parent 89262cd29e
commit e8b1fcf710

@ -124,11 +124,12 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
return response.expected(id)? guid(): id; return response.expected(id)? guid(): id;
}; };
const MAX_JOBS = 32;
var workerOffset = -1; var workerOffset = -1;
var getAvailableWorkerIndex = function () { var getAvailableWorkerIndex = function () {
var L = workers.length; var L = workers.length;
if (L === 0) { if (L === 0) {
console.log("no workers available"); console.log("no workers available"); // XXX
return -1; return -1;
} }
@ -141,7 +142,15 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
var temp; var temp;
for (let i = 0; i < L; i++) { for (let i = 0; i < L; i++) {
temp = (workerOffset + i) % L; temp = (workerOffset + i) % L;
if (workers[temp] && workers[temp].count > 0) { /* I'd like for this condition to be more efficient
(`Object.keys` is sub-optimal) but I found some bugs in my initial
implementation stemming from a task counter variable going out-of-sync
with reality when a worker crashed and its tasks were re-assigned to
its substitute. I'm sure it can be done correctly and efficiently,
but this is a relatively easy way to make sure it's always up to date.
We'll see how it performs in practice before optimizing.
*/
if (workers[temp] && Object.keys(workers[temp]).length < MAX_JOBS) {
return temp; return temp;
} }
} }
@ -149,7 +158,6 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
}; };
var queue = []; var queue = [];
var MAX_JOBS = 32; //1; //8;
var sendCommand = function (msg, _cb) { var sendCommand = function (msg, _cb) {
var index = getAvailableWorkerIndex(); var index = getAvailableWorkerIndex();
@ -157,24 +165,14 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
var state = workers[index]; var state = workers[index];
// if there is no worker available: // if there is no worker available:
if (!isWorker(state)) { if (!isWorker(state)) {
console.log("queueing for later");
// queue the message for when one becomes available // queue the message for when one becomes available
queue.push({ queue.push({
msg: msg, msg: msg,
cb: _cb, cb: _cb,
}); });
return; return;
//return void cb("NO_WORKERS");
} else {
console.log("worker #%s handling %s messages currently", index, MAX_JOBS + 1 - state.count);
} }
console.log("%s queued messages", queue.length);
console.log("[%s]\n", msg.command);
//console.log(msg);
var cb = Util.once(Util.mkAsync(_cb)); var cb = Util.once(Util.mkAsync(_cb));
const txid = guid(); const txid = guid();
@ -183,24 +181,12 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
// track which worker is doing which jobs // track which worker is doing which jobs
state.tasks[txid] = msg; state.tasks[txid] = msg;
state.count--;
if (state.count < 0) { response.expect(txid, cb, 60000);
console.log(state);
throw new Error("too many jobs"); // XXX
}
response.expect(txid, function (err, value) {
// clean up when you get a response
delete state[txid];
state.count++;
cb(err, value);
}, 60000);
state.worker.send(msg); state.worker.send(msg);
}; };
var backlogged; var handleResponse = function (state, res) {
var handleResponse = function (res) {
if (!res) { return; } if (!res) { return; }
// handle log messages before checking if it was addressed to your PID // handle log messages before checking if it was addressed to your PID
// it might still be useful to know what happened inside an orphaned worker // it might still be useful to know what happened inside an orphaned worker
@ -213,28 +199,22 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
return void Log.error("WRONG_PID", res); return void Log.error("WRONG_PID", res);
} }
setTimeout(function () { if (!res.txid) { return; }
response.handle(res.txid, [res.error, res.value]); response.handle(res.txid, [res.error, res.value]);
delete state.tasks[res.txid];
if (!queue.length) { if (!queue.length) { return; }
if (backlogged) {
backlogged = false; var nextMsg = queue.shift();
console.log("queue has been drained"); /* `nextMsg` was at the top of the queue.
} We know that a job just finished and all of this code
return; is synchronous, so calling `sendCommand` should take the worker
} else { which was just freed up. This is somewhat fragile though, so
backlogged = true; be careful if you want to modify this block. The risk is that
console.log(queue, queue.length); we take something that was at the top of the queue and push it
} to the back because the following msg took its place. OR, in an
even worse scenario, we cycle through the queue but don't run anything.
console.log("taking queued message"); */
sendCommand(nextMsg.msg, nextMsg.cb);
// XXX take a request from the queue
var nextMsg = queue.shift();
sendCommand(nextMsg.msg, nextMsg.cb); // XXX doesn't feel right
console.log("%s queued messages remaining", queue.length);
}, (Math.floor(Math.random() * 150) * 10));
}; };
const initWorker = function (worker, cb) { const initWorker = function (worker, cb) {
@ -243,7 +223,6 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
const state = { const state = {
worker: worker, worker: worker,
tasks: {}, tasks: {},
count: MAX_JOBS, //1, // XXX
}; };
response.expect(txid, function (err) { response.expect(txid, function (err) {
@ -258,7 +237,9 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
config: config, config: config,
}); });
worker.on('message', handleResponse); worker.on('message', function (res) {
handleResponse(state, res);
});
var substituteWorker = Util.once(function () { var substituteWorker = Util.once(function () {
Env.Log.info("SUBSTITUTE_DB_WORKER", ''); Env.Log.info("SUBSTITUTE_DB_WORKER", '');

Loading…
Cancel
Save