diff --git a/lib/workers/index.js b/lib/workers/index.js index d0a9a66f5..4ca1996d0 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -82,6 +82,13 @@ Workers.initialize = function (Env, config, _cb) { var drained = true; var sendCommand = function (msg, _cb, opt) { + if (!_cb) { + return void Log.error('WORKER_COMMAND_MISSING_CB', { + msg: msg, + opt: opt, + }); + } + opt = opt || {}; var index = getAvailableWorkerIndex(); @@ -95,7 +102,7 @@ Workers.initialize = function (Env, config, _cb) { }); if (drained) { drained = false; - Log.debug('WORKER_QUEUE_BACKLOG', { + Log.error('WORKER_QUEUE_BACKLOG', { workers: workers.length, }); } @@ -103,13 +110,6 @@ Workers.initialize = function (Env, config, _cb) { return; } - const txid = guid(); - msg.txid = txid; - msg.pid = PID; - - // track which worker is doing which jobs - state.tasks[txid] = msg; - var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) { if (err !== 'TIMEOUT') { return; } // in the event of a timeout the user will receive an error @@ -120,6 +120,17 @@ Workers.initialize = function (Env, config, _cb) { delete state.tasks[txid]; }))); + if (!msg) { + return void cb('ESERVERERR'); + } + + const txid = guid(); + msg.txid = txid; + msg.pid = PID; + + // track which worker is doing which jobs + state.tasks[txid] = msg; + // default to timing out affter 180s if no explicit timeout is passed var timeout = typeof(opt.timeout) !== 'undefined'? opt.timeout: 180000; response.expect(txid, cb, timeout); @@ -153,6 +164,13 @@ Workers.initialize = function (Env, config, _cb) { } var nextMsg = queue.shift(); + + if (!nextMsg || !nextMsg.msg) { + return void Log.error('WORKER_QUEUE_EMPTY_MESSAGE', { + item: nextMsg, + }); + } + /* `nextMsg` was at the top of the queue. We know that a job just finished and all of this code is synchronous, so calling `sendCommand` should take the worker @@ -200,7 +218,7 @@ Workers.initialize = function (Env, config, _cb) { const cb = response.expectation(txid); if (typeof(cb) !== 'function') { return; } const task = state.tasks[txid]; - if (!task && task.msg) { return; } + if (!(task && task.msg)) { return; } response.clear(txid); Log.info('DB_WORKER_RESEND', task.msg); sendCommand(task.msg, cb);