guard against several serverside typeErrors

and warn in cases where they would have occurred
pull/1/head
ansuz 4 years ago
parent e8428a2a73
commit 100b417646

@ -82,6 +82,13 @@ Workers.initialize = function (Env, config, _cb) {
var drained = true;
var sendCommand = function (msg, _cb, opt) {
if (!_cb) {
return void Log.error('WORKER_COMMAND_MISSING_CB', {
msg: msg,
opt: opt,
});
}
opt = opt || {};
var index = getAvailableWorkerIndex();
@ -95,7 +102,7 @@ Workers.initialize = function (Env, config, _cb) {
});
if (drained) {
drained = false;
Log.debug('WORKER_QUEUE_BACKLOG', {
Log.error('WORKER_QUEUE_BACKLOG', {
workers: workers.length,
});
}
@ -103,13 +110,6 @@ Workers.initialize = function (Env, config, _cb) {
return;
}
const txid = guid();
msg.txid = txid;
msg.pid = PID;
// track which worker is doing which jobs
state.tasks[txid] = msg;
var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) {
if (err !== 'TIMEOUT') { return; }
// in the event of a timeout the user will receive an error
@ -120,6 +120,17 @@ Workers.initialize = function (Env, config, _cb) {
delete state.tasks[txid];
})));
if (!msg) {
return void cb('ESERVERERR');
}
const txid = guid();
msg.txid = txid;
msg.pid = PID;
// track which worker is doing which jobs
state.tasks[txid] = msg;
// default to timing out affter 180s if no explicit timeout is passed
var timeout = typeof(opt.timeout) !== 'undefined'? opt.timeout: 180000;
response.expect(txid, cb, timeout);
@ -153,6 +164,13 @@ Workers.initialize = function (Env, config, _cb) {
}
var nextMsg = queue.shift();
if (!nextMsg || !nextMsg.msg) {
return void Log.error('WORKER_QUEUE_EMPTY_MESSAGE', {
item: nextMsg,
});
}
/* `nextMsg` was at the top of the queue.
We know that a job just finished and all of this code
is synchronous, so calling `sendCommand` should take the worker
@ -200,7 +218,7 @@ Workers.initialize = function (Env, config, _cb) {
const cb = response.expectation(txid);
if (typeof(cb) !== 'function') { return; }
const task = state.tasks[txid];
if (!task && task.msg) { return; }
if (!(task && task.msg)) { return; }
response.clear(txid);
Log.info('DB_WORKER_RESEND', task.msg);
sendCommand(task.msg, cb);

Loading…
Cancel
Save