From 9058a59555416c0bead041b4e501941e8fa0faa7 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 27 Mar 2020 17:17:42 -0400 Subject: [PATCH] reassign db tasks if the responsible worker fails --- lib/workers/index.js | 95 ++++++++++++++++++++++++++------------- www/common/common-util.js | 3 ++ 2 files changed, 66 insertions(+), 32 deletions(-) diff --git a/lib/workers/index.js b/lib/workers/index.js index 16282b202..5eeb5e285 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -115,14 +115,58 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { Log[level](label, info); }; + var isWorker = function (value) { + return value && value.worker && typeof(value.worker.send) === 'function'; + }; + + // pick ids that aren't already in use... + const guid = function () { + var id = Util.uid(); + return response.expected(id)? guid(): id; + }; + + var workerIndex = 0; + var sendCommand = function (msg, _cb) { + console.log("SEND_COMMAND"); + var cb = Util.once(Util.mkAsync(_cb)); + + workerIndex = (workerIndex + 1) % workers.length; + if (!isWorker(workers[workerIndex])) { + return void cb("NO_WORKERS"); + } + + var state = workers[workerIndex]; + + // XXX insert a queue here to prevent timeouts + + const txid = guid(); + msg.txid = txid; + msg.pid = PID; + + // track which worker is doing which jobs + state.tasks[txid] = msg; + response.expect(txid, function (err, value) { + // clean up when you get a response + delete state[txid]; + cb(err, value); + }, 60000); + state.worker.send(msg); + }; + const initWorker = function (worker, cb) { //console.log("initializing index worker"); - const txid = Util.uid(); + const txid = guid(); + + const state = { + worker: worker, + tasks: {}, + }; + response.expect(txid, function (err) { if (err) { return void cb(err); } //console.log("worker initialized"); - workers.push(worker); - cb(); + workers.push(state); + cb(void 0, state); }, 15000); worker.send({ @@ -148,18 +192,28 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { }); var substituteWorker = Util.once(function () { - // XXX reassign jobs delegated to failed workers - Env.Log.info("SUBSTITUTE_INDEX_WORKER", ''); - var idx = workers.indexOf(worker); + Env.Log.info("SUBSTITUTE_DB_WORKER", ''); + var idx = workers.indexOf(state); if (idx !== -1) { workers.splice(idx, 1); } + + Object.keys(state.tasks).forEach(function (txid) { + const cb = response.expectation(txid); + if (typeof(cb) !== 'function') { return; } + const task = state.tasks[txid]; + if (!task && task.msg) { return; } + response.clear(txid); + Log.info('DB_WORKER_RESEND', task.msg); + sendCommand(task.msg, cb); + }); + var w = fork(DB_PATH); - initWorker(w, function (err) { + initWorker(w, function (err, state) { if (err) { throw new Error(err); } - workers.push(w); + workers.push(state); }); }); @@ -167,32 +221,12 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { worker.on('close', substituteWorker); worker.on('error', function (err) { substituteWorker(); - Env.log.error("INDEX_WORKER_ERROR", { + Env.Log.error("DB_WORKER_ERROR", { error: err, }); }); }; - var workerIndex = 0; - var sendCommand = function (msg, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - - workerIndex = (workerIndex + 1) % workers.length; - if (workers.length === 0 || - typeof(workers[workerIndex].send) !== 'function') { - return void cb("NO_WORKERS"); - } - - // XXX insert a queue here to prevent timeouts - // XXX track which worker is doing which jobs - - const txid = Util.uid(); - msg.txid = txid; - msg.pid = PID; - response.expect(txid, cb, 60000); - workers[workerIndex].send(msg); - }; - nThen(function (w) { OS.cpus().forEach(function () { initWorker(fork(DB_PATH), w(function (err) { @@ -299,13 +333,10 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { }, cb); }; - //console.log("index workers ready"); cb(void 0); }); }; -// XXX task expiration... - Workers.initialize = function (Env, config, cb) { Workers.initializeValidationWorkers(Env); Workers.initializeIndexWorkers(Env, config, cb); diff --git a/www/common/common-util.js b/www/common/common-util.js index a1c35307f..cf7d5d97e 100644 --- a/www/common/common-util.js +++ b/www/common/common-util.js @@ -134,6 +134,9 @@ expected: function (id) { return Boolean(pending[id]); }, + expectation: function (id) { + return pending[id]; + }, expect: expect, handle: handle, };