From b4b51ed86af6ee900b1f50830f54734be9af5f6b Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 26 Jan 2018 15:24:07 +0100 Subject: [PATCH] add a subsystem for scheduling tasks --- .gitignore | 1 + server.js | 56 ++++++++++++++++------------- storage/tasks.js | 94 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 25 deletions(-) create mode 100644 storage/tasks.js diff --git a/.gitignore b/.gitignore index fc1136152..741aedaf7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ datastore +tasks www/bower_components/* node_modules /config.js diff --git a/server.js b/server.js index a51c94343..5a2cbfbb8 100644 --- a/server.js +++ b/server.js @@ -9,6 +9,7 @@ var WebSocketServer = require('ws').Server; var NetfluxSrv = require('./node_modules/chainpad-server/NetfluxWebsocketSrv'); var Package = require('./package.json'); var Path = require("path"); +var nThen = require("nthen"); var config; try { @@ -198,32 +199,37 @@ if (config.httpSafePort) { var wsConfig = { server: httpServer }; -var createSocketServer = function (err, rpc) { - if(!config.useExternalWebsocket) { - if (websocketPort !== config.httpPort) { - console.log("setting up a new websocket server"); - wsConfig = { port: websocketPort}; - } - var wsSrv = new WebSocketServer(wsConfig); - Storage.create(config, function (store) { - NetfluxSrv.run(store, wsSrv, config, rpc); - }); - } -}; +var rpc; -var loadRPC = function (cb) { - config.rpc = typeof(config.rpc) === 'undefined'? './rpc.js' : config.rpc; +var nt = nThen(function (w) { + if (!config.enableTaskScheduling) { return; } + var Tasks = require("./storage/tasks"); - if (typeof(config.rpc) === 'string') { - // load pin store... - var Rpc = require(config.rpc); - Rpc.create(config, function (e, rpc) { - if (e) { throw e; } - cb(void 0, rpc); - }); - } else { - cb(); + console.log("loading task scheduler"); + Tasks.create(config, w(function (e, tasks) { + config.tasks = tasks; + })); +}).nThen(function (w) { + config.rpc = typeof(config.rpc) === 'undefined'? './rpc.js' : config.rpc; + if (typeof(config.rpc) !== 'string') { return; } + // load pin store... + var Rpc = require(config.rpc); + Rpc.create(config, w(function (e, _rpc) { + if (e) { + w.abort(); + throw e; + } + rpc = _rpc; + })); +}).nThen(function () { + if(config.useExternalWebsocket) { return; } + if (websocketPort !== config.httpPort) { + console.log("setting up a new websocket server"); + wsConfig = { port: websocketPort}; } -}; + var wsSrv = new WebSocketServer(wsConfig); + Storage.create(config, function (store) { + NetfluxSrv.run(store, wsSrv, config, rpc); + }); +}); -loadRPC(createSocketServer); diff --git a/storage/tasks.js b/storage/tasks.js new file mode 100644 index 000000000..b25a3a637 --- /dev/null +++ b/storage/tasks.js @@ -0,0 +1,94 @@ +var Fs = require("fs"); +var Path = require("path"); +var nacl = require("tweetnacl"); +var nThen = require("nthen"); + +var Tasks = module.exports; + +var encode = function (time, command, args) { + if (typeof(time) !== 'number') { return null; } + if (typeof(command) !== 'string') { return null; } + if (!Array.isArray(args)) { return [time, command]; } + return [time, command].concat(args); +}; + +var randomId = function () { + var bytes = Array.prototype.slice.call(nacl.randomBytes(16)); + return bytes.map(function (b) { + var n = Number(b & 0xff).toString(16); + return n.length === 1? '0' + n: n; + }).join(''); +}; + +var mkPath = function (env, id) { + return Path.join(env.root, id.slice(0, 2), id) + '.ndjson'; +}; + +var getFreeId = function (env, cb, tries) { + if (tries > 5) { return void cb('ETOOMANYTRIES'); } + + // generate a unique id + var id = randomId(); + + // derive a path from that id + var path = mkPath(env, id); + + Fs.stat(path, function (err) { + if (err && err.code === "ENOENT") { + cb(void 0, id); + } else { + getFreeId(env, cb); + } + }); +}; + +var write = function (env, task, cb) { + var str = JSON.stringify(task) + '\n'; + var id = nacl.util.encodeBase64(nacl.hash(nacl.util.decodeUTF8(str))).replace(/\//g, '-'); + + var path = mkPath(env, id); + nThen(function (w) { + // check if the file already exists... + Fs.stat(path, w(function (err) { + if (err && err.code === 'ENOENT') { return; } + w.abort(); cb(); + })); + }).nThen(function (w) { + // create the parent directory if it does not exist + var dir = id.slice(0, 2); + var dirpath = Path.join(env.root, dir); + + Fs.mkdir(dirpath, 0x1ff, w(function (err) { + if (err && err.code !== 'EEXIST') { + return void cb(err); + } + })); + }).nThen(function (w) { + // write the file to the path + Fs.writeFile(mkPath(env, id), str, function (e) { + if (e) { return void cb(e); } + cb(); + }); + }); +}; + +Tasks.create = function (config, cb) { + var env = { + root: config.taskPath || './tasks', + }; + + // make sure the path exists... + Fs.mkdir(env.root, 0x1ff, function (err) { + if (err && err.code !== 'EEXIST') { + throw err; + } + cb(void 0, { + write: function (time, command, args, cb) { + var task = encode(time, command, args); + write(env, task, cb); + }, + }); + }); +}; + +