diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index 6ec43aaa3..f70dba006 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -245,6 +245,7 @@ module.exports.create = function (config, cb) { Workers.initialize(Env, { blobPath: config.blobPath, blobStagingPath: config.blobStagingPath, + taskPath: config.taskPath, pinPath: pinPath, filePath: config.filePath, archivePath: config.archivePath, @@ -257,26 +258,25 @@ module.exports.create = function (config, cb) { } })); }).nThen(function (w) { - // create a task store + // create a task store (for scheduling tasks) require("./storage/tasks").create(config, w(function (e, tasks) { - if (e) { - throw e; - } + if (e) { throw e; } Env.tasks = tasks; - config.tasks = tasks; - if (config.disableIntegratedTasks) { return; } - - config.intervals = config.intervals || {}; - // XXX - config.intervals.taskExpiration = setInterval(function () { - tasks.runAll(function (err) { - if (err) { - // either TASK_CONCURRENCY or an error with tasks.list - // in either case it is already logged. - } - }); - }, 1000 * 60 * 5); // run every five minutes })); + if (config.disableIntegratedTasks) { return; } + config.intervals = config.intervals || {}; + + var tasks_running; + config.intervals.taskExpiration = setInterval(function () { + if (tasks_running) { return; } + tasks_running = true; + Env.runTasks(function (err) { + if (err) { + Log.error('TASK_RUNNER_ERR', err); + } + tasks_running = false; + }); + }, 1000 * 60 * 5); // run every five minutes }).nThen(function () { RPC.create(Env, function (err, _rpc) { if (err) { throw err; } diff --git a/lib/workers/compute-index.js b/lib/workers/compute-index.js index a1fe06645..3b973015b 100644 --- a/lib/workers/compute-index.js +++ b/lib/workers/compute-index.js @@ -11,6 +11,7 @@ const Pins = require("../pins"); const Core = require("../commands/core"); const Saferphore = require("saferphore"); const Logger = require("../log"); +const Tasks = require("../storage/tasks"); const Env = { Log: {}, @@ -31,6 +32,7 @@ var ready = false; var store; var pinStore; var blobStore; +var tasks; const init = function (config, _cb) { const cb = Util.once(Util.mkAsync(_cb)); if (!config) { @@ -66,6 +68,18 @@ const init = function (config, _cb) { } blobStore = blob; })); + }).nThen(function (w) { + Tasks.create({ + log: Env.Log, + taskPath: config.taskPath, + store: store, + }, w(function (err, tasks) { + if (err) { + w.abort(); + return void cb(err); + } + Env.tasks = tasks; + })); }).nThen(function () { cb(); }); @@ -393,6 +407,10 @@ const removeOwnedBlob = function (data, cb) { }); }; +const runTasks = function (data, cb) { + Env.tasks.runAll(cb); +}; + const COMMANDS = { COMPUTE_INDEX: computeIndex, COMPUTE_METADATA: computeMetadata, @@ -404,6 +422,7 @@ const COMMANDS = { GET_MULTIPLE_FILE_SIZE: getMultipleFileSize, GET_HASH_OFFSET: getHashOffset, REMOVE_OWNED_BLOB: removeOwnedBlob, + RUN_TASKS: runTasks, }; process.on('message', function (data) { @@ -439,7 +458,7 @@ process.on('message', function (data) { }); process.on('uncaughtException', function (err) { - console.error('[%s] UNCAUGHT EXCEPTION IN DB WORKER'); + console.error('[%s] UNCAUGHT EXCEPTION IN DB WORKER', new Date()); console.error(err); console.error("TERMINATING"); process.exit(1); diff --git a/lib/workers/index.js b/lib/workers/index.js index dc380a208..8abdd2e15 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -290,6 +290,12 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { }, cb); }; + Env.runTasks = function (cb) { + sendCommand({ + command: 'RUN_TASKS', + }, cb); + }; + //console.log("index workers ready"); cb(void 0); });