|
|
|
@ -8,7 +8,7 @@ const Workers = module.exports;
|
|
|
|
|
const PID = process.pid;
|
|
|
|
|
|
|
|
|
|
const DB_PATH = 'lib/workers/db-worker';
|
|
|
|
|
const MAX_JOBS = 8;
|
|
|
|
|
const MAX_JOBS = 16;
|
|
|
|
|
|
|
|
|
|
Workers.initialize = function (Env, config, _cb) {
|
|
|
|
|
var cb = Util.once(Util.mkAsync(_cb));
|
|
|
|
@ -35,6 +35,11 @@ Workers.initialize = function (Env, config, _cb) {
|
|
|
|
|
return response.expected(id)? guid(): id;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const countWorkerTasks = function (/* index */) {
|
|
|
|
|
return 0; // XXX this disables all queueing until it can be proven correct
|
|
|
|
|
//return Object.keys(workers[index].tasks || {}).length;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
var workerOffset = -1;
|
|
|
|
|
var queue = [];
|
|
|
|
|
var getAvailableWorkerIndex = function () {
|
|
|
|
@ -67,7 +72,8 @@ Workers.initialize = function (Env, config, _cb) {
|
|
|
|
|
but this is a relatively easy way to make sure it's always up to date.
|
|
|
|
|
We'll see how it performs in practice before optimizing.
|
|
|
|
|
*/
|
|
|
|
|
if (workers[temp] && Object.keys(workers[temp].tasks || {}).length < MAX_JOBS) {
|
|
|
|
|
|
|
|
|
|
if (workers[temp] && countWorkerTasks(temp) <= MAX_JOBS) {
|
|
|
|
|
return temp;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -96,8 +102,6 @@ Workers.initialize = function (Env, config, _cb) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var cb = Util.once(Util.mkAsync(_cb));
|
|
|
|
|
|
|
|
|
|
const txid = guid();
|
|
|
|
|
msg.txid = txid;
|
|
|
|
|
msg.pid = PID;
|
|
|
|
@ -105,6 +109,16 @@ Workers.initialize = function (Env, config, _cb) {
|
|
|
|
|
// track which worker is doing which jobs
|
|
|
|
|
state.tasks[txid] = msg;
|
|
|
|
|
|
|
|
|
|
var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) {
|
|
|
|
|
if (err !== 'TIMEOUT') { return; }
|
|
|
|
|
// in the event of a timeout the user will receive an error
|
|
|
|
|
// but the state used to resend a query in the event of a worker crash
|
|
|
|
|
// won't be cleared. This also leaks a slot that could be used to keep
|
|
|
|
|
// an upper bound on the amount of parallelism for any given worker.
|
|
|
|
|
// if you run out of slots then the worker locks up.
|
|
|
|
|
delete state.tasks[txid];
|
|
|
|
|
})));
|
|
|
|
|
|
|
|
|
|
response.expect(txid, cb, 180000);
|
|
|
|
|
state.worker.send(msg);
|
|
|
|
|
};
|
|
|
|
|