Merge branch 'child-process-fixes' into soon

pull/1/head
ansuz 4 years ago
commit f608f902bf

@ -568,7 +568,7 @@ process.on('message', function (data) {
const cb = function (err, value) { const cb = function (err, value) {
process.send({ process.send({
error: err, error: Util.serializeError(err),
txid: data.txid, txid: data.txid,
pid: data.pid, pid: data.pid,
value: value, value: value,
@ -577,7 +577,7 @@ process.on('message', function (data) {
if (!ready) { if (!ready) {
return void init(data.config, function (err) { return void init(data.config, function (err) {
if (err) { return void cb(err); } if (err) { return void cb(Util.serializeError(err)); }
ready = true; ready = true;
cb(); cb();
}); });

@ -9,6 +9,7 @@ const PID = process.pid;
const DB_PATH = 'lib/workers/db-worker'; const DB_PATH = 'lib/workers/db-worker';
const MAX_JOBS = 16; const MAX_JOBS = 16;
const DEFAULT_QUERY_TIMEOUT = 60000 * 15; // increased from three to fifteen minutes because queries for very large files were taking as long as seven minutes
Workers.initialize = function (Env, config, _cb) { Workers.initialize = function (Env, config, _cb) {
var cb = Util.once(Util.mkAsync(_cb)); var cb = Util.once(Util.mkAsync(_cb));
@ -113,6 +114,7 @@ Workers.initialize = function (Env, config, _cb) {
const txid = guid(); const txid = guid();
var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) { var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) {
if (err !== 'TIMEOUT') { return; } if (err !== 'TIMEOUT') { return; }
Log.debug("WORKER_TIMEOUT_CAUSE", msg);
// in the event of a timeout the user will receive an error // in the event of a timeout the user will receive an error
// but the state used to resend a query in the event of a worker crash // but the state used to resend a query in the event of a worker crash
// won't be cleared. This also leaks a slot that could be used to keep // won't be cleared. This also leaks a slot that could be used to keep
@ -132,7 +134,7 @@ Workers.initialize = function (Env, config, _cb) {
state.tasks[txid] = msg; state.tasks[txid] = msg;
// default to timing out affter 180s if no explicit timeout is passed // default to timing out affter 180s if no explicit timeout is passed
var timeout = typeof(opt.timeout) !== 'undefined'? opt.timeout: 180000; var timeout = typeof(opt.timeout) !== 'undefined'? opt.timeout: DEFAULT_QUERY_TIMEOUT;
response.expect(txid, cb, timeout); response.expect(txid, cb, timeout);
state.worker.send(msg); state.worker.send(msg);
}; };

@ -30,6 +30,15 @@
return JSON.parse(JSON.stringify(o)); return JSON.parse(JSON.stringify(o));
}; };
Util.serializeError = function (err) {
if (!(err instanceof Error)) { return err; }
var ser = {};
Object.getOwnPropertyNames(err).forEach(function (key) {
ser[key] = err[key];
});
return ser;
};
Util.tryParse = function (s) { Util.tryParse = function (s) {
try { return JSON.parse(s); } catch (e) { return;} try { return JSON.parse(s); } catch (e) { return;}
}; };
@ -113,13 +122,13 @@
var handle = function (id, args) { var handle = function (id, args) {
var fn = pending[id]; var fn = pending[id];
if (typeof(fn) !== 'function') { if (typeof(fn) !== 'function') {
errorHandler("MISSING_CALLBACK", { return void errorHandler("MISSING_CALLBACK", {
id: id, id: id,
args: args, args: args,
}); });
} }
try { try {
pending[id].apply(null, Array.isArray(args)? args : [args]); fn.apply(null, Array.isArray(args)? args : [args]);
} catch (err) { } catch (err) {
errorHandler('HANDLER_ERROR', { errorHandler('HANDLER_ERROR', {
error: err, error: err,

Loading…
Cancel
Save