diff --git a/scripts/expire-channels.js b/scripts/expire-channels.js index 5c87c7cea..2479fc193 100644 --- a/scripts/expire-channels.js +++ b/scripts/expire-channels.js @@ -1,113 +1,43 @@ -var Fs = require("fs"); -var Path = require("path"); - var nThen = require("nthen"); +var Tasks = require("../storage/tasks"); +var Logger = require("../lib/log"); var config = require("../lib/load-config"); - var FileStorage = require('../' + config.storage || './storage/file'); -var root = Path.resolve('../' + config.taskPath || './tasks'); - -var dirs; -var nt; -var store; - -var queue = function (f) { - nt = nt.nThen(f); -}; - -var tryParse = function (s) { - try { return JSON.parse(s); } - catch (e) { return null; } -}; - -var CURRENT = +new Date(); - -var handleTask = function (str, path, cb) { - var task = tryParse(str); - if (!Array.isArray(task)) { - console.error('invalid task: not array'); - return cb(); - } - if (task.length < 2) { - console.error('invalid task: too small'); - return cb(); - } - var time = task[0]; - var command = task[1]; - var args = task.slice(2); +nThen(function (w) { + Logger.create(config, w(function (_log) { + config.log = _log; + })); +}).nThen(function (w) { + FileStorage.create(config, w(function (_store) { + config.store = _store; - if (time > CURRENT) { - // not time for this task yet - console.log('not yet time'); - return cb(); - } + // config.taskPath + // config.store + // config.filePath + // config.blobPath + // config.coldPath - nThen(function (waitFor) { - switch (command) { - case 'EXPIRE': - // FIXME noisy! - console.log("expiring: %s", args[0]); - store.removeChannel(args[0], waitFor()); - break; - default: - // FIXME noisy - console.log("unknown command", command); - } - }).nThen(function () { - // remove the task file... - Fs.unlink(path, function (err) { // FIXME deletion - if (err) { console.error(err); } - cb(); - }); - }); -}; + // config.enableTaskScheduling -nt = nThen(function (w) { - Fs.readdir(root, w(function (e, list) { - if (e) { throw e; } - dirs = list; - if (dirs.length === 0) { - w.abort(); - return; + })); +}).nThen(function (w) { + Tasks.create(config, w(function (err, _tasks) { + if (err) { + throw err; } + config.tasks = _tasks; })); -}).nThen(function (waitFor) { - FileStorage.create(config, waitFor(function (_store) { - store = _store; +}).nThen(function (w) { + config.tasks.runAll(w(function (err) { + if (err) { + // either TASK_CONCURRENCY + // or an error from tasks.list + } })); }).nThen(function () { - dirs.forEach(function (dir, dIdx) { - queue(function (w) { - // FIXME noisy! - console.log('recursing into %s', dir); - Fs.readdir(Path.join(root, dir), w(function (e, list) { - list.forEach(function (fn) { - queue(function (w) { - var filePath = Path.join(root, dir, fn); - var cb = w(); - - // FIXME noisy! - console.log("processing file at %s", filePath); - Fs.readFile(filePath, 'utf8', function (e, str) { - if (e) { - console.error(e); - return void cb(); - } - - handleTask(str, filePath, cb); - }); - }); - }); - if (dIdx === (dirs.length - 1)) { - queue(function () { - store.shutdown(); - }); - } - })); - }); - }); + config.store.shutdown(); + config.log.shutdown(); }); - diff --git a/storage/tasks.js b/storage/tasks.js index 5a999998c..0edf068e8 100644 --- a/storage/tasks.js +++ b/storage/tasks.js @@ -6,6 +6,11 @@ var nThen = require("nthen"); var Tasks = module.exports; +var tryParse = function (s) { + try { return JSON.parse(s); } + catch (e) { return null; } +}; + var encode = function (time, command, args) { if (typeof(time) !== 'number') { return null; } if (typeof(command) !== 'string') { return null; } @@ -73,25 +78,199 @@ var write = function (env, task, cb) { }); }; -// TODO implement a standard API for removing tasks -// currently they are deleted manually in 'expire-channels.js' -// var remove = function (env, id, cb) { }; +var list = Tasks.list = function (env, cb) { + var rootDirs; + + nThen(function (w) { + // read the root directory + Fs.readdir(env.root, w(function (e, list) { + if (e) { + env.log.error("TASK_ROOT_DIR", { + root: env.root, + error: e, + }); + return void cb(e); + } + if (list.length === 0) { + w.abort(); + return void cb(void 0, []); + } + rootDirs = list; + })); + }).nThen(function () { + // schedule the nested directories for exploration + // return a list of paths to tasks + var queue = nThen(function () {}); + + var allPaths = []; + + // We prioritize a small footprint over speed, so we + // iterate over directories in serial rather than parallel + rootDirs.forEach(function (dir) { + queue.nThen(function (w) { + var subPath = Path.join(env.root, dir); + Fs.readdir(subPath, w(function (e, paths) { + if (e) { + env.log.error("TASKS_INVALID_SUBDIR", { + path: subPath, + error: e, + }); + return; + } + // concat in place + Array.prototype.push.apply(allPaths, paths.map(function (p) { + return Path.join(subPath, p); + })); + })); + }); + }); + + queue.nThen(function () { + cb(void 0, allPaths); + }); + }); +}; + +var remove = function (env, path, cb) { + Fs.unlink(path, cb); +}; + +var read = function (env, filePath, cb) { + Fs.readFile(filePath, 'utf8', function (e, str) { + if (e) { return void cb(e); } + + var task = tryParse(str); + if (!Array.isArray(task) || task.length < 2) { + env.log("INVALID_TASK", { + path: filePath, + task: task, + }); + return cb(new Error('INVALID_TASK')); + } + cb(void 0, task); + }); +}; + +var run = Tasks.run = function (env, path, cb) { + var CURRENT = +new Date(); + + var Log = env.log; + var task, time, command, args; + + nThen(function (w) { + read(env, path, w(function (err, _task) { + if (err) { + w.abort(); + // there was a file but it wasn't valid? + return void cb(err); + } + task = _task; + time = task[0]; + + if (time > CURRENT) { + w.abort(); + return cb(); + } + + command = task[1]; + args = task.slice(2); + })); + }).nThen(function (w) { + switch (command) { + case 'EXPIRE': + Log.info('DELETION_SCHEDULED_EXPIRATION', { + task: task, + }); + env.store.removeChannel(args[0], w()); + break; + default: + Log.warn("TASKS_UNKNOWN_COMMAND", task); + } + }).nThen(function () { + // remove the task file... + remove(env, path, function (err) { + if (err) { + Log.error('TASKS_RECORD_REMOVAL', { + path: path, + err: err, + }); + } + cb(); + }); + }); +}; + +var runAll = function (env, cb) { + // check if already running and bail out if so + if (env.running) { + return void cb("TASK_CONCURRENCY"); + } + + // if not, set a flag to block concurrency and proceed + env.running = true; + + var paths; + nThen(function (w) { + list(env, w(function (err, _paths) { + if (err) { + w.abort(); + env.running = false; + return void cb(err); + } + paths = _paths; + })); + }).nThen(function (w) { + var done = w(); + var nt = nThen(function () {}); + paths.forEach(function (path) { + nt.nThen(function (w) { + run(env, path, w(function (err) { + if (err) { + // Any errors are already logged in 'run' + // the admin will need to review the logs and clean up + } + })); + }); + }); + nt.nThen(function () { + done(); + }); + }).nThen(function (/*w*/) { + env.running = false; + cb(); + }); +}; Tasks.create = function (config, cb) { + if (!config.store) { throw new Error("E_STORE_REQUIRED"); } + if (!config.log) { throw new Error("E_LOG_REQUIRED"); } + var env = { root: config.taskPath || './tasks', + log: config.log, + store: config.store, }; // make sure the path exists... Fse.mkdirp(env.root, 0x1ff, function (err) { - if (err && err.code !== 'EEXIST') { - throw err; - } + if (err) { return void cb(err); } cb(void 0, { write: function (time, command, args, cb) { var task = encode(time, command, args); write(env, task, cb); }, + list: function (olderThan, cb) { + list(env, olderThan, cb); + }, + remove: function (id, cb) { + remove(env, id, cb); + }, + run: function (id, cb) { + run(env, id, cb); + }, + runAll: function (cb) { + runAll(env, cb); + }, }); }); };