implement and test a complex constrained scheduler

pull/1/head
ansuz 5 years ago
parent 47fdf9de9f
commit 7072fe4fa4

@ -0,0 +1,173 @@
var WriteQueue = require("./write-queue");
var Util = require("./common-util");
/* This module provides implements a FIFO scheduler
which assumes the existence of three types of async tasks:
1. ordered tasks which must be executed sequentially
2. unordered tasks which can be executed in parallel
3. blocking tasks which must block the execution of all other tasks
The scheduler assumes there will be many resources identified by strings,
and that the constraints described above will only apply in the context
of identical string ids.
Many blocking tasks may be executed in parallel so long as they
concern resources identified by different ids.
USAGE:
const schedule = require("./schedule")();
// schedule two sequential tasks using the resource 'pewpew'
schedule.ordered('pewpew', function (next) {
appendToFile('beep\n', next);
});
schedule.ordered('pewpew', function (next) {
appendToFile('boop\n', next);
});
// schedule a task that can happen whenever
schedule.unordered('pewpew', function (next) {
displayFileSize(next);
});
// schedule a blocking task which will wait
// until the all unordered tasks have completed before commencing
schedule.blocking('pewpew', function (next) {
deleteFile(next);
});
// this will be queued for after the blocking task
schedule.ordered('pewpew', function (next) {
appendFile('boom', next);
});
*/
// return a uid which is not already in a map
var unusedUid = function (set) {
var uid = Util.uid();
if (set[uid]) { return unusedUid(); }
return uid;
};
// return an existing session, creating one if it does not already exist
var lookup = function (map, id) {
return (map[id] = map[id] || {
//blocking: [],
active: {},
blocked: {},
});
};
var isEmpty = function (map) {
for (var key in map) {
if (map.hasOwnProperty(key)) { return false; }
}
return true;
};
// XXX enforce asynchrony everywhere
module.exports = function () {
// every scheduler instance has its own queue
var queue = WriteQueue();
// ordered tasks don't require any extra logic
var Ordered = function (id, task) {
queue(id, task);
};
// unordered and blocking tasks need a little extra state
var map = {};
// regular garbage collection keeps memory consumption low
var collectGarbage = function (id) {
// avoid using 'lookup' since it creates a session implicitly
var local = map[id];
// bail out if no session
if (!local) { return; }
// bail out if there are blocking or active tasks
if (local.lock) { return; }
if (!isEmpty(local.active)) { return; }
// if there are no pending actions then delete the session
delete map[id];
};
// unordered tasks run immediately if there are no blocking tasks scheduled
// or immediately after blocking tasks finish
var runImmediately = function (local, task) {
// set a flag in the map of active unordered tasks
// to prevent blocking tasks from running until you finish
var uid = unusedUid(local.active);
local.active[uid] = true;
task(function () {
// remove the flag you set to indicate that your task completed
delete local.active[uid];
// don't do anything if other unordered tasks are still running
if (!isEmpty(local.active)) { return; }
// bail out if there are no blocking tasks scheduled or ready
if (typeof(local.waiting) !== 'function') {
return void collectGarbage();
}
local.waiting();
});
};
var runOnceUnblocked = function (local, task) {
var uid = unusedUid(local.blocked);
local.blocked[uid] = function () {
runImmediately(local, task);
};
};
// 'unordered' tasks are scheduled to run in after the most recently received blocking task
// or immediately and in parallel if there are no blocking tasks scheduled.
var Unordered = function (id, task) {
var local = lookup(map, id);
if (local.lock) { return runOnceUnblocked(local, task); }
runImmediately(local, task);
};
var runBlocked = function (local) {
for (var task in local.blocked) {
runImmediately(local, local.blocked[task]);
}
};
// 'blocking' tasks must be run alone.
// They are queued alongside ordered tasks,
// and wait until any running 'unordered' tasks complete before commencing.
var Blocking = function (id, task) {
var local = lookup(map, id);
queue(id, function (next) {
// start right away if there are no running unordered tasks
if (isEmpty(local.active)) {
local.lock = true;
return void task(function () {
delete local.lock;
runBlocked(local);
next();
});
}
// otherwise wait until the running tasks have completed
local.waiting = function () {
local.lock = true;
task(function () {
delete local.lock;
delete local.waiting;
runBlocked(local);
next();
});
};
});
};
return {
ordered: Ordered,
unordered: Unordered,
blocking: Blocking,
};
};

@ -0,0 +1,220 @@
/* three types of actions:
* read
* write
* append
each of which take a random amount of time
*/
var Util = require("../../lib/common-util");
var schedule = require("../../lib/schedule")();
var nThen = require("nthen");
var rand = function (n) {
return Math.floor(Math.random() * n);
};
var rand_time = function () {
// between 51 and 151
return rand(300) + 25;
};
var makeAction = function (type) {
var i = 0;
return function (time) {
var j = i++;
return function (next) {
console.log(" Beginning action: %s#%s", type, j);
setTimeout(function () {
console.log(" Completed action: %s#%s", type, j);
next();
}, time);
return j;
};
};
};
var TYPES = ['WRITE', 'READ', 'APPEND'];
var chooseAction = function () {
var n = rand(100);
if (n < 50) { return 'APPEND'; }
if (n < 90) { return 'READ'; }
return 'WRITE';
//return TYPES[rand(3)];
};
var test = function (script, cb) {
var uid = Util.uid();
var TO_RUN = script.length;
var total_run = 0;
var parallel = 0;
var last_run_ordered = -1;
//var i = 0;
var ACTIONS = {};
TYPES.forEach(function (type) {
ACTIONS[type] = makeAction(type);
});
nThen(function (w) {
setTimeout(w(), 3000);
// run scripted actions with assertions
script.forEach(function (scene) {
var type = scene[0];
var time = typeof(scene[1]) === 'number'? scene[1]: rand_time();
var action = ACTIONS[type](time);
console.log("Queuing action of type: %s(%s)", type, time);
var proceed = w();
switch (type) {
case 'APPEND':
return schedule.ordered(uid, w(function (next) {
parallel++;
var temp = action(function () {
parallel--;
total_run++;
proceed();
next();
});
if (temp !== (last_run_ordered + 1)) {
throw new Error("out of order");
}
last_run_ordered = temp;
}));
case 'WRITE':
return schedule.blocking(uid, w(function (next) {
parallel++;
action(function () {
parallel--;
total_run++;
proceed();
next();
});
if (parallel > 1) {
console.log("parallelism === %s", parallel);
throw new Error("too much parallel");
}
}));
case 'READ':
return schedule.unordered(uid, w(function (next) {
parallel++;
action(function () {
parallel--;
total_run++;
proceed();
next();
});
}));
default:
throw new Error("wut");
}
});
}).nThen(function () {
// make assertions about the whole script
if (total_run !== TO_RUN) {
console.log("Ran %s / %s", total_run, TO_RUN);
throw new Error("skipped tasks");
}
console.log("total_run === %s", total_run);
cb();
});
};
var randomScript = function () {
var len = rand(15) + 10;
var script = [];
while (len--) {
script.push([
chooseAction(),
rand_time(),
]);
}
return script;
};
var WRITE = function (t) {
return ['WRITE', t];
};
var READ = function (t) {
return ['READ', t];
};
var APPEND = function (t) {
return ['APPEND', t];
};
nThen(function (w) {
test([
['READ', 150],
['APPEND', 200],
['APPEND', 100],
['READ', 350],
['WRITE', 400],
['APPEND', 275],
['APPEND', 187],
['WRITE', 330],
['WRITE', 264],
['WRITE', 256],
], w(function () {
console.log("finished pre-scripted test\n");
}));
}).nThen(function (w) {
test([
WRITE(289),
APPEND(281),
READ(207),
WRITE(225),
READ(279),
WRITE(300),
READ(331),
APPEND(341),
APPEND(385),
READ(313),
WRITE(285),
READ(304),
APPEND(273),
APPEND(150),
WRITE(246),
READ(244),
WRITE(172),
APPEND(253),
READ(215),
READ(296),
APPEND(281),
APPEND(296),
WRITE(168),
], w(function () {
console.log("finished 2nd pre-scripted test\n");
}));
}).nThen(function () {
var totalTests = 50;
var randomTests = 1;
var last = nThen(function () {
console.log("beginning randomized tests");
});
var queueRandomTest = function (i) {
last = last.nThen(function (w) {
console.log("running random test script #%s\n", i);
test(randomScript(), w(function () {
console.log("finished random test #%s\n", i);
}));
});
};
while (randomTests <=totalTests) { queueRandomTest(randomTests++); }
last.nThen(function () {
console.log("finished %s random tests", totalTests);
});
});
Loading…
Cancel
Save