diff --git a/storage/tasks.js b/storage/tasks.js index 0edf068e8..939c2c0cc 100644 --- a/storage/tasks.js +++ b/storage/tasks.js @@ -18,6 +18,7 @@ var encode = function (time, command, args) { return [time, command].concat(args); }; +/* var randomId = function () { var bytes = Array.prototype.slice.call(nacl.randomBytes(16)); return bytes.map(function (b) { @@ -26,59 +27,70 @@ var randomId = function () { }).join(''); }; + var mkPath = function (env, id) { return Path.join(env.root, id.slice(0, 2), id) + '.ndjson'; }; +*/ -var getFreeId = function (env, cb, tries) { - if (tries > 5) { return void cb('ETOOMANYTRIES'); } - - // generate a unique id - var id = randomId(); - - // derive a path from that id - var path = mkPath(env, id); +// make a new folder every MODULUS ms +var MODULUS = 1000 * 60 * 60 * 24; // one day +var moduloTime = function (d) { + return d - (d % MODULUS); +}; - Fs.stat(path, function (err) { - if (err && err.code === "ENOENT") { - cb(void 0, id); - } else { - getFreeId(env, cb); - } - }); +var makeDirectoryId = function (d) { + return '' + moduloTime(d); }; var write = function (env, task, cb) { var str = JSON.stringify(task) + '\n'; var id = nacl.util.encodeBase64(nacl.hash(nacl.util.decodeUTF8(str))).replace(/\//g, '-'); - var path = mkPath(env, id); + var dir = makeDirectoryId(task[0]); + var path = Path.join(env.root, dir); + nThen(function (w) { - // check if the file already exists... - Fs.stat(path, w(function (err) { - if (err && err.code === 'ENOENT') { return; } - w.abort(); cb(); - })); - }).nThen(function (w) { // create the parent directory if it does not exist - var dir = id.slice(0, 2); - var dirpath = Path.join(env.root, dir); - - Fse.mkdirp(dirpath, 0x1ff, w(function (err) { + Fse.mkdirp(path, 0x1ff, w(function (err) { if (err) { + w.abort(); return void cb(err); } })); }).nThen(function () { // write the file to the path - Fs.writeFile(mkPath(env, id), str, function (e) { - if (e) { return void cb(e); } + var fullPath = Path.join(path, id + '.ndjson'); + + // the file ids are based on the hash of the file contents to be written + // as such, writing an exact task a second time will overwrite the first with the same contents + // this shouldn't be a problem + + Fs.writeFile(fullPath, str, function (e) { + if (e) { + env.log.error("TASK_WRITE_FAILURE", { + error: e, + path: fullPath, + }); + return void cb(e); + } + env.log.info("SUCCESSFUL_WRITE", { + path: fullPath, + }); cb(); }); }); }; -var list = Tasks.list = function (env, cb) { +var remove = function (env, path, cb) { + Fs.unlink(path, cb); +}; + +var removeDirectory = function (env, path, cb) { + Fs.rmdir(path, cb); +}; + +var list = Tasks.list = function (env, cb, migration) { var rootDirs; nThen(function (w) { @@ -104,9 +116,33 @@ var list = Tasks.list = function (env, cb) { var allPaths = []; + var currentWindow = moduloTime(+new Date() + MODULUS); + // We prioritize a small footprint over speed, so we // iterate over directories in serial rather than parallel rootDirs.forEach(function (dir) { + // if a directory is two characters, it's the old format + // otherwise, it indicates when the file is set to expire + // so we can ignore directories which are clearly in the future + + var dirTime; + if (migration) { + // this block handles migrations. ignore new formats + if (dir.length !== 2) { + return; + } + } else { + // not in migration mode, check if it's a new format + if (dir.length >= 2) { + // might be the new format. + // check its time to see if it should be skipped + dirTime = parseInt(dir); + if (!isNaN(dirTime) && dirTime >= currentWindow) { + return; + } + } + } + queue.nThen(function (w) { var subPath = Path.join(env.root, dir); Fs.readdir(subPath, w(function (e, paths) { @@ -117,6 +153,18 @@ var list = Tasks.list = function (env, cb) { }); return; } + + if (paths.length === 0) { + removeDirectory(env, subPath, function (err) { + if (err) { + env.log.error('TASKS_REMOVE_EMPTY_DIRECTORY', { + error: err, + path: subPath, + }); + } + }); + } + // concat in place Array.prototype.push.apply(allPaths, paths.map(function (p) { return Path.join(subPath, p); @@ -131,10 +179,6 @@ var list = Tasks.list = function (env, cb) { }); }; -var remove = function (env, path, cb) { - Fs.unlink(path, cb); -}; - var read = function (env, filePath, cb) { Fs.readFile(filePath, 'utf8', function (e, str) { if (e) { return void cb(e); } @@ -223,7 +267,7 @@ var runAll = function (env, cb) { var done = w(); var nt = nThen(function () {}); paths.forEach(function (path) { - nt.nThen(function (w) { + nt = nt.nThen(function (w) { run(env, path, w(function (err) { if (err) { // Any errors are already logged in 'run' @@ -232,7 +276,7 @@ var runAll = function (env, cb) { })); }); }); - nt.nThen(function () { + nt = nt.nThen(function () { done(); }); }).nThen(function (/*w*/) { @@ -241,6 +285,60 @@ var runAll = function (env, cb) { }); }; +var migrate function (env, cb) { + // list every task + list(env, function (err, paths) { + if (err) { + return void cb(err); + } + var nt = nThen(function () {}); + paths.forEach(function (path) { + var bypass; + var task; + + nt = nt.nThen(function (w) { + // read + read(env, path, w(function (err, _task) { + if (err) { + bypass = true; + env.log.error("TASK_MIGRATION_READ", { + error: err, + path: path, + }); + return; + } + })); + }).nThen(function (w) { + if (bypass) { return; } + // rewrite in new format + write(env, task, w(function (err) { + if (err) { + bypass = true; + env.log.error("TASK_MIGRATION_WRITE", { + error: err, + task: task, + }); + } + })); + }).nThen(function (w) { + if (bypass) { return; } + // remove + remove(env, path, function (err) { + if (err) { + env.log.error("TASK_MIGRATION_REMOVE", { + error: err, + path: path, + }); + } + }); + }); + }); + nt = nt.nThen(function (w) { + cb(); + }); + }, true); +}; + Tasks.create = function (config, cb) { if (!config.store) { throw new Error("E_STORE_REQUIRED"); } if (!config.log) { throw new Error("E_LOG_REQUIRED"); } @@ -271,6 +369,9 @@ Tasks.create = function (config, cb) { runAll: function (cb) { runAll(env, cb); }, + migrate: function (cb) { + migrate(env, cb); + }, }); }); };