reassign db tasks if the responsible worker fails

pull/1/head
ansuz 5 years ago
parent 172823c954
commit 9058a59555

@ -115,14 +115,58 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
Log[level](label, info); Log[level](label, info);
}; };
var isWorker = function (value) {
return value && value.worker && typeof(value.worker.send) === 'function';
};
// pick ids that aren't already in use...
const guid = function () {
var id = Util.uid();
return response.expected(id)? guid(): id;
};
var workerIndex = 0;
var sendCommand = function (msg, _cb) {
console.log("SEND_COMMAND");
var cb = Util.once(Util.mkAsync(_cb));
workerIndex = (workerIndex + 1) % workers.length;
if (!isWorker(workers[workerIndex])) {
return void cb("NO_WORKERS");
}
var state = workers[workerIndex];
// XXX insert a queue here to prevent timeouts
const txid = guid();
msg.txid = txid;
msg.pid = PID;
// track which worker is doing which jobs
state.tasks[txid] = msg;
response.expect(txid, function (err, value) {
// clean up when you get a response
delete state[txid];
cb(err, value);
}, 60000);
state.worker.send(msg);
};
const initWorker = function (worker, cb) { const initWorker = function (worker, cb) {
//console.log("initializing index worker"); //console.log("initializing index worker");
const txid = Util.uid(); const txid = guid();
const state = {
worker: worker,
tasks: {},
};
response.expect(txid, function (err) { response.expect(txid, function (err) {
if (err) { return void cb(err); } if (err) { return void cb(err); }
//console.log("worker initialized"); //console.log("worker initialized");
workers.push(worker); workers.push(state);
cb(); cb(void 0, state);
}, 15000); }, 15000);
worker.send({ worker.send({
@ -148,18 +192,28 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
}); });
var substituteWorker = Util.once(function () { var substituteWorker = Util.once(function () {
// XXX reassign jobs delegated to failed workers Env.Log.info("SUBSTITUTE_DB_WORKER", '');
Env.Log.info("SUBSTITUTE_INDEX_WORKER", ''); var idx = workers.indexOf(state);
var idx = workers.indexOf(worker);
if (idx !== -1) { if (idx !== -1) {
workers.splice(idx, 1); workers.splice(idx, 1);
} }
Object.keys(state.tasks).forEach(function (txid) {
const cb = response.expectation(txid);
if (typeof(cb) !== 'function') { return; }
const task = state.tasks[txid];
if (!task && task.msg) { return; }
response.clear(txid);
Log.info('DB_WORKER_RESEND', task.msg);
sendCommand(task.msg, cb);
});
var w = fork(DB_PATH); var w = fork(DB_PATH);
initWorker(w, function (err) { initWorker(w, function (err, state) {
if (err) { if (err) {
throw new Error(err); throw new Error(err);
} }
workers.push(w); workers.push(state);
}); });
}); });
@ -167,32 +221,12 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
worker.on('close', substituteWorker); worker.on('close', substituteWorker);
worker.on('error', function (err) { worker.on('error', function (err) {
substituteWorker(); substituteWorker();
Env.log.error("INDEX_WORKER_ERROR", { Env.Log.error("DB_WORKER_ERROR", {
error: err, error: err,
}); });
}); });
}; };
var workerIndex = 0;
var sendCommand = function (msg, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
workerIndex = (workerIndex + 1) % workers.length;
if (workers.length === 0 ||
typeof(workers[workerIndex].send) !== 'function') {
return void cb("NO_WORKERS");
}
// XXX insert a queue here to prevent timeouts
// XXX track which worker is doing which jobs
const txid = Util.uid();
msg.txid = txid;
msg.pid = PID;
response.expect(txid, cb, 60000);
workers[workerIndex].send(msg);
};
nThen(function (w) { nThen(function (w) {
OS.cpus().forEach(function () { OS.cpus().forEach(function () {
initWorker(fork(DB_PATH), w(function (err) { initWorker(fork(DB_PATH), w(function (err) {
@ -299,13 +333,10 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
}, cb); }, cb);
}; };
//console.log("index workers ready");
cb(void 0); cb(void 0);
}); });
}; };
// XXX task expiration...
Workers.initialize = function (Env, config, cb) { Workers.initialize = function (Env, config, cb) {
Workers.initializeValidationWorkers(Env); Workers.initializeValidationWorkers(Env);
Workers.initializeIndexWorkers(Env, config, cb); Workers.initializeIndexWorkers(Env, config, cb);

@ -134,6 +134,9 @@
expected: function (id) { expected: function (id) {
return Boolean(pending[id]); return Boolean(pending[id]);
}, },
expectation: function (id) {
return pending[id];
},
expect: expect, expect: expect,
handle: handle, handle: handle,
}; };

Loading…
Cancel
Save