diff --git a/lib/schedule.js b/lib/schedule.js new file mode 100644 index 000000000..1e93a2126 --- /dev/null +++ b/lib/schedule.js @@ -0,0 +1,173 @@ +var WriteQueue = require("./write-queue"); +var Util = require("./common-util"); + +/* This module provides implements a FIFO scheduler + which assumes the existence of three types of async tasks: + + 1. ordered tasks which must be executed sequentially + 2. unordered tasks which can be executed in parallel + 3. blocking tasks which must block the execution of all other tasks + + The scheduler assumes there will be many resources identified by strings, + and that the constraints described above will only apply in the context + of identical string ids. + + Many blocking tasks may be executed in parallel so long as they + concern resources identified by different ids. + +USAGE: + + const schedule = require("./schedule")(); + + // schedule two sequential tasks using the resource 'pewpew' + schedule.ordered('pewpew', function (next) { + appendToFile('beep\n', next); + }); + schedule.ordered('pewpew', function (next) { + appendToFile('boop\n', next); + }); + + // schedule a task that can happen whenever + schedule.unordered('pewpew', function (next) { + displayFileSize(next); + }); + + // schedule a blocking task which will wait + // until the all unordered tasks have completed before commencing + schedule.blocking('pewpew', function (next) { + deleteFile(next); + }); + + // this will be queued for after the blocking task + schedule.ordered('pewpew', function (next) { + appendFile('boom', next); + }); + +*/ + +// return a uid which is not already in a map +var unusedUid = function (set) { + var uid = Util.uid(); + if (set[uid]) { return unusedUid(); } + return uid; +}; + +// return an existing session, creating one if it does not already exist +var lookup = function (map, id) { + return (map[id] = map[id] || { + //blocking: [], + active: {}, + blocked: {}, + }); +}; + +var isEmpty = function (map) { + for (var key in map) { + if (map.hasOwnProperty(key)) { return false; } + } + return true; +}; + +// XXX enforce asynchrony everywhere +module.exports = function () { + // every scheduler instance has its own queue + var queue = WriteQueue(); + + // ordered tasks don't require any extra logic + var Ordered = function (id, task) { + queue(id, task); + }; + + // unordered and blocking tasks need a little extra state + var map = {}; + + // regular garbage collection keeps memory consumption low + var collectGarbage = function (id) { + // avoid using 'lookup' since it creates a session implicitly + var local = map[id]; + // bail out if no session + if (!local) { return; } + // bail out if there are blocking or active tasks + if (local.lock) { return; } + if (!isEmpty(local.active)) { return; } + // if there are no pending actions then delete the session + delete map[id]; + }; + + // unordered tasks run immediately if there are no blocking tasks scheduled + // or immediately after blocking tasks finish + var runImmediately = function (local, task) { + // set a flag in the map of active unordered tasks + // to prevent blocking tasks from running until you finish + var uid = unusedUid(local.active); + local.active[uid] = true; + + task(function () { + // remove the flag you set to indicate that your task completed + delete local.active[uid]; + // don't do anything if other unordered tasks are still running + if (!isEmpty(local.active)) { return; } + // bail out if there are no blocking tasks scheduled or ready + if (typeof(local.waiting) !== 'function') { + return void collectGarbage(); + } + local.waiting(); + }); + }; + + var runOnceUnblocked = function (local, task) { + var uid = unusedUid(local.blocked); + local.blocked[uid] = function () { + runImmediately(local, task); + }; + }; + + // 'unordered' tasks are scheduled to run in after the most recently received blocking task + // or immediately and in parallel if there are no blocking tasks scheduled. + var Unordered = function (id, task) { + var local = lookup(map, id); + if (local.lock) { return runOnceUnblocked(local, task); } + runImmediately(local, task); + }; + + var runBlocked = function (local) { + for (var task in local.blocked) { + runImmediately(local, local.blocked[task]); + } + }; + + // 'blocking' tasks must be run alone. + // They are queued alongside ordered tasks, + // and wait until any running 'unordered' tasks complete before commencing. + var Blocking = function (id, task) { + var local = lookup(map, id); + + queue(id, function (next) { + // start right away if there are no running unordered tasks + if (isEmpty(local.active)) { + local.lock = true; + return void task(function () { + delete local.lock; + runBlocked(local); + next(); + }); + } + // otherwise wait until the running tasks have completed + local.waiting = function () { + local.lock = true; + task(function () { + delete local.lock; + delete local.waiting; + runBlocked(local); + next(); + }); + }; + }); + }; + + return { + ordered: Ordered, + unordered: Unordered, + blocking: Blocking, + }; +}; diff --git a/scripts/tests/test-scheduler.js b/scripts/tests/test-scheduler.js new file mode 100644 index 000000000..6a076d5aa --- /dev/null +++ b/scripts/tests/test-scheduler.js @@ -0,0 +1,220 @@ +/* three types of actions: + * read + * write + * append + each of which take a random amount of time + +*/ +var Util = require("../../lib/common-util"); +var schedule = require("../../lib/schedule")(); +var nThen = require("nthen"); + +var rand = function (n) { + return Math.floor(Math.random() * n); +}; + +var rand_time = function () { + // between 51 and 151 + return rand(300) + 25; +}; + +var makeAction = function (type) { + var i = 0; + return function (time) { + var j = i++; + return function (next) { + console.log(" Beginning action: %s#%s", type, j); + setTimeout(function () { + console.log(" Completed action: %s#%s", type, j); + next(); + }, time); + return j; + }; + }; +}; + +var TYPES = ['WRITE', 'READ', 'APPEND']; +var chooseAction = function () { + var n = rand(100); + + if (n < 50) { return 'APPEND'; } + if (n < 90) { return 'READ'; } + return 'WRITE'; + + //return TYPES[rand(3)]; +}; + +var test = function (script, cb) { + var uid = Util.uid(); + + var TO_RUN = script.length; + var total_run = 0; + + var parallel = 0; + var last_run_ordered = -1; + //var i = 0; + + var ACTIONS = {}; + TYPES.forEach(function (type) { + ACTIONS[type] = makeAction(type); + }); + + nThen(function (w) { + setTimeout(w(), 3000); + // run scripted actions with assertions + script.forEach(function (scene) { + var type = scene[0]; + var time = typeof(scene[1]) === 'number'? scene[1]: rand_time(); + + var action = ACTIONS[type](time); + console.log("Queuing action of type: %s(%s)", type, time); + + var proceed = w(); + + switch (type) { + case 'APPEND': + return schedule.ordered(uid, w(function (next) { + parallel++; + var temp = action(function () { + parallel--; + total_run++; + proceed(); + next(); + }); + if (temp !== (last_run_ordered + 1)) { + throw new Error("out of order"); + } + last_run_ordered = temp; + })); + case 'WRITE': + return schedule.blocking(uid, w(function (next) { + parallel++; + action(function () { + parallel--; + total_run++; + proceed(); + next(); + }); + if (parallel > 1) { + console.log("parallelism === %s", parallel); + throw new Error("too much parallel"); + } + })); + case 'READ': + return schedule.unordered(uid, w(function (next) { + parallel++; + action(function () { + parallel--; + total_run++; + proceed(); + next(); + }); + })); + default: + throw new Error("wut"); + } + }); + }).nThen(function () { + // make assertions about the whole script + if (total_run !== TO_RUN) { + console.log("Ran %s / %s", total_run, TO_RUN); + throw new Error("skipped tasks"); + } + console.log("total_run === %s", total_run); + + cb(); + }); +}; + + +var randomScript = function () { + var len = rand(15) + 10; + var script = []; + while (len--) { + script.push([ + chooseAction(), + rand_time(), + ]); + } + return script; +}; + +var WRITE = function (t) { + return ['WRITE', t]; +}; +var READ = function (t) { + return ['READ', t]; +}; + +var APPEND = function (t) { + return ['APPEND', t]; +}; + +nThen(function (w) { + test([ + ['READ', 150], + ['APPEND', 200], + ['APPEND', 100], + ['READ', 350], + ['WRITE', 400], + ['APPEND', 275], + ['APPEND', 187], + ['WRITE', 330], + ['WRITE', 264], + ['WRITE', 256], + ], w(function () { + console.log("finished pre-scripted test\n"); + })); +}).nThen(function (w) { + test([ + WRITE(289), + APPEND(281), + READ(207), + WRITE(225), + READ(279), + WRITE(300), + READ(331), + APPEND(341), + APPEND(385), + READ(313), + WRITE(285), + READ(304), + APPEND(273), + APPEND(150), + WRITE(246), + READ(244), + WRITE(172), + APPEND(253), + READ(215), + READ(296), + APPEND(281), + APPEND(296), + WRITE(168), + ], w(function () { + console.log("finished 2nd pre-scripted test\n"); + })); +}).nThen(function () { + var totalTests = 50; + var randomTests = 1; + + var last = nThen(function () { + console.log("beginning randomized tests"); + }); + + var queueRandomTest = function (i) { + last = last.nThen(function (w) { + console.log("running random test script #%s\n", i); + test(randomScript(), w(function () { + console.log("finished random test #%s\n", i); + })); + }); + }; + + while (randomTests <=totalTests) { queueRandomTest(randomTests++); } + + last.nThen(function () { + console.log("finished %s random tests", totalTests); + }); +}); + +