additional work on the storage API

* optimize layout on the filesystem
* drop some unused functions
* don't recurse over folders that you know won't be relevant
* remove empty folders
* add a migration API
* fix some minor bugs with asynchrony
* lint compliance
pull/1/head
ansuz 6 years ago
parent d64e1c450e
commit ec64de2c67

@ -18,6 +18,7 @@ var encode = function (time, command, args) {
return [time, command].concat(args); return [time, command].concat(args);
}; };
/*
var randomId = function () { var randomId = function () {
var bytes = Array.prototype.slice.call(nacl.randomBytes(16)); var bytes = Array.prototype.slice.call(nacl.randomBytes(16));
return bytes.map(function (b) { return bytes.map(function (b) {
@ -26,59 +27,70 @@ var randomId = function () {
}).join(''); }).join('');
}; };
var mkPath = function (env, id) { var mkPath = function (env, id) {
return Path.join(env.root, id.slice(0, 2), id) + '.ndjson'; return Path.join(env.root, id.slice(0, 2), id) + '.ndjson';
}; };
*/
var getFreeId = function (env, cb, tries) { // make a new folder every MODULUS ms
if (tries > 5) { return void cb('ETOOMANYTRIES'); } var MODULUS = 1000 * 60 * 60 * 24; // one day
var moduloTime = function (d) {
// generate a unique id return d - (d % MODULUS);
var id = randomId(); };
// derive a path from that id
var path = mkPath(env, id);
Fs.stat(path, function (err) { var makeDirectoryId = function (d) {
if (err && err.code === "ENOENT") { return '' + moduloTime(d);
cb(void 0, id);
} else {
getFreeId(env, cb);
}
});
}; };
var write = function (env, task, cb) { var write = function (env, task, cb) {
var str = JSON.stringify(task) + '\n'; var str = JSON.stringify(task) + '\n';
var id = nacl.util.encodeBase64(nacl.hash(nacl.util.decodeUTF8(str))).replace(/\//g, '-'); var id = nacl.util.encodeBase64(nacl.hash(nacl.util.decodeUTF8(str))).replace(/\//g, '-');
var path = mkPath(env, id); var dir = makeDirectoryId(task[0]);
var path = Path.join(env.root, dir);
nThen(function (w) { nThen(function (w) {
// check if the file already exists...
Fs.stat(path, w(function (err) {
if (err && err.code === 'ENOENT') { return; }
w.abort(); cb();
}));
}).nThen(function (w) {
// create the parent directory if it does not exist // create the parent directory if it does not exist
var dir = id.slice(0, 2); Fse.mkdirp(path, 0x1ff, w(function (err) {
var dirpath = Path.join(env.root, dir);
Fse.mkdirp(dirpath, 0x1ff, w(function (err) {
if (err) { if (err) {
w.abort();
return void cb(err); return void cb(err);
} }
})); }));
}).nThen(function () { }).nThen(function () {
// write the file to the path // write the file to the path
Fs.writeFile(mkPath(env, id), str, function (e) { var fullPath = Path.join(path, id + '.ndjson');
if (e) { return void cb(e); }
// the file ids are based on the hash of the file contents to be written
// as such, writing an exact task a second time will overwrite the first with the same contents
// this shouldn't be a problem
Fs.writeFile(fullPath, str, function (e) {
if (e) {
env.log.error("TASK_WRITE_FAILURE", {
error: e,
path: fullPath,
});
return void cb(e);
}
env.log.info("SUCCESSFUL_WRITE", {
path: fullPath,
});
cb(); cb();
}); });
}); });
}; };
var list = Tasks.list = function (env, cb) { var remove = function (env, path, cb) {
Fs.unlink(path, cb);
};
var removeDirectory = function (env, path, cb) {
Fs.rmdir(path, cb);
};
var list = Tasks.list = function (env, cb, migration) {
var rootDirs; var rootDirs;
nThen(function (w) { nThen(function (w) {
@ -104,9 +116,33 @@ var list = Tasks.list = function (env, cb) {
var allPaths = []; var allPaths = [];
var currentWindow = moduloTime(+new Date() + MODULUS);
// We prioritize a small footprint over speed, so we // We prioritize a small footprint over speed, so we
// iterate over directories in serial rather than parallel // iterate over directories in serial rather than parallel
rootDirs.forEach(function (dir) { rootDirs.forEach(function (dir) {
// if a directory is two characters, it's the old format
// otherwise, it indicates when the file is set to expire
// so we can ignore directories which are clearly in the future
var dirTime;
if (migration) {
// this block handles migrations. ignore new formats
if (dir.length !== 2) {
return;
}
} else {
// not in migration mode, check if it's a new format
if (dir.length >= 2) {
// might be the new format.
// check its time to see if it should be skipped
dirTime = parseInt(dir);
if (!isNaN(dirTime) && dirTime >= currentWindow) {
return;
}
}
}
queue.nThen(function (w) { queue.nThen(function (w) {
var subPath = Path.join(env.root, dir); var subPath = Path.join(env.root, dir);
Fs.readdir(subPath, w(function (e, paths) { Fs.readdir(subPath, w(function (e, paths) {
@ -117,6 +153,18 @@ var list = Tasks.list = function (env, cb) {
}); });
return; return;
} }
if (paths.length === 0) {
removeDirectory(env, subPath, function (err) {
if (err) {
env.log.error('TASKS_REMOVE_EMPTY_DIRECTORY', {
error: err,
path: subPath,
});
}
});
}
// concat in place // concat in place
Array.prototype.push.apply(allPaths, paths.map(function (p) { Array.prototype.push.apply(allPaths, paths.map(function (p) {
return Path.join(subPath, p); return Path.join(subPath, p);
@ -131,10 +179,6 @@ var list = Tasks.list = function (env, cb) {
}); });
}; };
var remove = function (env, path, cb) {
Fs.unlink(path, cb);
};
var read = function (env, filePath, cb) { var read = function (env, filePath, cb) {
Fs.readFile(filePath, 'utf8', function (e, str) { Fs.readFile(filePath, 'utf8', function (e, str) {
if (e) { return void cb(e); } if (e) { return void cb(e); }
@ -223,7 +267,7 @@ var runAll = function (env, cb) {
var done = w(); var done = w();
var nt = nThen(function () {}); var nt = nThen(function () {});
paths.forEach(function (path) { paths.forEach(function (path) {
nt.nThen(function (w) { nt = nt.nThen(function (w) {
run(env, path, w(function (err) { run(env, path, w(function (err) {
if (err) { if (err) {
// Any errors are already logged in 'run' // Any errors are already logged in 'run'
@ -232,7 +276,7 @@ var runAll = function (env, cb) {
})); }));
}); });
}); });
nt.nThen(function () { nt = nt.nThen(function () {
done(); done();
}); });
}).nThen(function (/*w*/) { }).nThen(function (/*w*/) {
@ -241,6 +285,60 @@ var runAll = function (env, cb) {
}); });
}; };
var migrate function (env, cb) {
// list every task
list(env, function (err, paths) {
if (err) {
return void cb(err);
}
var nt = nThen(function () {});
paths.forEach(function (path) {
var bypass;
var task;
nt = nt.nThen(function (w) {
// read
read(env, path, w(function (err, _task) {
if (err) {
bypass = true;
env.log.error("TASK_MIGRATION_READ", {
error: err,
path: path,
});
return;
}
}));
}).nThen(function (w) {
if (bypass) { return; }
// rewrite in new format
write(env, task, w(function (err) {
if (err) {
bypass = true;
env.log.error("TASK_MIGRATION_WRITE", {
error: err,
task: task,
});
}
}));
}).nThen(function (w) {
if (bypass) { return; }
// remove
remove(env, path, function (err) {
if (err) {
env.log.error("TASK_MIGRATION_REMOVE", {
error: err,
path: path,
});
}
});
});
});
nt = nt.nThen(function (w) {
cb();
});
}, true);
};
Tasks.create = function (config, cb) { Tasks.create = function (config, cb) {
if (!config.store) { throw new Error("E_STORE_REQUIRED"); } if (!config.store) { throw new Error("E_STORE_REQUIRED"); }
if (!config.log) { throw new Error("E_LOG_REQUIRED"); } if (!config.log) { throw new Error("E_LOG_REQUIRED"); }
@ -271,6 +369,9 @@ Tasks.create = function (config, cb) {
runAll: function (cb) { runAll: function (cb) {
runAll(env, cb); runAll(env, cb);
}, },
migrate: function (cb) {
migrate(env, cb);
},
}); });
}); });
}; };

Loading…
Cancel
Save