diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js index e490f713d..3fc339d00 100644 --- a/lib/commands/pin-rpc.js +++ b/lib/commands/pin-rpc.js @@ -9,7 +9,6 @@ const Nacl = require("tweetnacl/nacl-fast"); const Util = require("../common-util"); const nThen = require("nthen"); const Saferphore = require("saferphore"); -const Pinned = require('../../scripts/pinned'); //const escapeKeyCharacters = Util.escapeKeyCharacters; const unescapeKeyCharacters = Util.unescapeKeyCharacters; @@ -432,7 +431,7 @@ Pinning.getDeletedPads = function (Env, channels, cb) { // inform that the Pinning.loadChannelPins = function (Env) { - Pinned.load(function (err, data) { + Pins.list(function (err, data) { if (err) { Env.Log.error("LOAD_CHANNEL_PINS", err); diff --git a/lib/pins.js b/lib/pins.js index 23b1364a3..cb3d3f0c7 100644 --- a/lib/pins.js +++ b/lib/pins.js @@ -2,6 +2,11 @@ var Pins = module.exports; +const Fs = require("fs"); +const Path = require("path"); +const Util = require("./common-util"); +const Plan = require("./plan"); + /* Accepts a reference to an object, and... 
either a string describing which log is being processed (backwards compatibility), or a function which will log the error with all relevant data @@ -22,7 +27,11 @@ var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) { // make sure to get ref.pins as the result // it's a weird API but it's faster than unpinning manually var pins = ref.pins = {}; + ref.index = 0; + ref.latest = 0; // the latest message (timestamp in ms) + ref.surplus = 0; // how many lines exist behind a reset return function (line) { + ref.index++; if (!Boolean(line)) { return; } var l; @@ -36,10 +45,15 @@ var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) { return void errorHandler('PIN_LINE_NOT_FORMAT_ERROR', l); } + if (typeof(l[2]) === 'number') { + ref.latest = l[2]; // date + } + switch (l[0]) { case 'RESET': { pins = ref.pins = {}; if (l[1] && l[1].length) { l[1].forEach((x) => { ref.pins[x] = 1; }); } + ref.surplus = ref.index; //jshint -W086 // fallthrough } @@ -72,5 +86,95 @@ Pins.calculateFromLog = function (pinFile, fileName) { return Object.keys(ref.pins); }; -// TODO refactor to include a streaming version for use in rpc.js as well +/* + pins/ + pins/A+/ + pins/A+/A+hyhrQLrgYixOomZYxpuEhwfiVzKk1bBp+arH-zbgo=.ndjson +*/ + +const getSafeKeyFromPath = function (path) { + return path.replace(/^.*\//, '').replace(/\.ndjson/, ''); +} + +Pins.list = function (done, config) { + const pinPath = config.pinPath || './data/pins'; + const plan = Plan(config.workers || 5); + const handler = config.handler || function () {}; + + // TODO externalize this via optional handlers? + const stats = { + logs: 0, + dirs: 0, + pinned: 0, + lines: 0, + }; + + const errorHandler = function (label, info) { + console.log(label, info); + }; + + const pinned = {}; + + // TODO replace this with lib-readline? 
+ const streamFile = function (path, cb) { + const id = getSafeKeyFromPath(path); + + return void Fs.readFile(path, 'utf8', function (err, body) { + if (err) { return void cb(err); } + const ref = {}; + const pinHandler = createLineHandler(ref, errorHandler); + var lines = body.split('\n'); + stats.lines += lines.length; + lines.forEach(pinHandler); + handler(ref, id, pinned); + cb(void 0, ref); + }); + }; + const scanDirectory = function (path, cb) { + Fs.readdir(path, function (err, list) { + if (err) { + return void cb(err); + } + cb(void 0, list.map(function (item) { + return Path.join(path, item); + })); + }); + }; + + scanDirectory(pinPath, function (err, paths) { + if (err) { return; } // XXX + paths.forEach(function (path) { + plan.job(1, function (next) { + scanDirectory(path, function (nested_err, nested_paths) { + if (nested_err) { return; } // XXX + stats.dirs++; + nested_paths.forEach(function (nested_path) { + if (!/\.ndjson$/.test(nested_path)) { return; } + plan.job(0, function (next) { + streamFile(nested_path, function (err, ref) { + if (err) { return; } // XXX + stats.logs++; + + var set = ref.pins; + for (var item in set) { + if (!pinned.hasOwnProperty(item)) { + pinned[item] = true; + stats.pinned++; + } + } + next(); + }); + }); + }); + next(); + }); + }); + }); + + plan.done(function () { + // err ? + done(void 0, pinned); + }).start(); + }); +}; diff --git a/lib/plan.js b/lib/plan.js new file mode 100644 index 000000000..a7dbb4ec8 --- /dev/null +++ b/lib/plan.js @@ -0,0 +1,235 @@ +/* + +There are many situations where we want to do lots of little jobs +in parallel and with few constraints as to their ordering. + +One example is recursing over a bunch of directories and reading files. +The naive way to do this is to recurse over all the subdirectories +relative to a root while adding files to a list. Then to iterate over +the files in that list. 
Unfortunately, this means holding the complete +list of file paths in memory, which can't possibly scale as our database grows.
+.job(1, function (next) { + listRootDirectory(function (files) { + files.forEach(function (file) { + // highest priority, run as soon as there is a free worker + plan.job(0, function (next) { + processFile(file, function (result) { + console.log(result); + // don't forget to call next + next(); + }); + }); + }); + next(); // call 'next' to free up one worker + }); +}) +// chain commands together if you want +.done(function () { + console.log("DONE"); +}) +// it won't run unless you launch it +.start(); + +*/ + +module.exports = function (max) { + var plan = {}; + max = max || 5; + + // finds an id that isn't in use in a particular map + // accepts an id in case you have one already chosen + // otherwise generates random new ids if one is not passed + // or if there is a collision + var uid = function (map, id) { + if (typeof(id) === 'undefined') { + id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); + } + if (id && typeof(map[id]) === 'undefined') { + return id; + } + return uid(map); + }; + + // the queue of jobs is an array, which will be populated + // with maps for each level of priority + var jobs = []; + + // the count of currently running jobs + var count = 0; + + // a list of callbacks to be executed once everything is done + var completeHandlers = []; + + // the recommended usage is to create a new scheduler for every job + // use it for internals in a scope, and let the garbage collector + // clean up when everything stops. This means you shouldn't + // go passing 'plan' around in a long-lived process! + var FINISHED = false; + var done = function () { + // 'done' gets called when there are no more jobs in the queue + // but other jobs might still be running... 
+ + // the count of running processes should never be less than zero + // because we guard against multiple callbacks + if (count < 0) { throw new Error("should never happen"); } + // greater than zero is definitely possible, it just means you aren't done yet + if (count !== 0) { return; } + // you will finish twice if you call 'start' a second time + // this behaviour isn't supported yet. + if (FINISHED) { throw new Error('finished twice'); } + FINISHED = true; + // execute all your 'done' callbacks + completeHandlers.forEach(function (f) { f(); }); + }; + + var run; + + // this 'next' is internal only. + // it iterates over all known jobs, running them until + // the scheduler achieves the desired amount of parallelism. + // If there are no more jobs it will call 'done' + // which will shortcircuit if there are still pending tasks. + // Whenever any tasks finishes it will return its lock and + // run as many new jobs as are allowed. + var next = function () { + // array.some skips over bare indexes in sparse arrays + var pending = jobs.some(function (bag /*, priority*/) { + if (!bag || typeof(bag) !== 'object') { return; } + // a bag is a map of jobs for any particular degree of priority + // iterate over jobs in the bag until you're out of 'workers' + for (var id in bag) { + // bail out if you hit max parallelism + if (count >= max) { return true; } + run(bag, id, next); + } + }); + // check whether you're done if you hit the end of the array + if (!pending) { done(); } + }; + + // and here's the part that actually handles jobs... + run = function (bag, id) { + // this is just a sanity check. + // there should only ever be jobs in each bag. + if (typeof(bag[id]) !== 'function') { + throw new Error("expected function"); + } + + // keep a local reference to the function + var f = bag[id]; + // remove it from the bag. + delete bag[id]; + // increment the count of running jobs + count++; + + // guard against it being called twice. 
+ var called = false; + f(function () { + // watch out! it'll bite you. + // maybe this should just return? + // support that option for 'production' ? + if (called) { throw new Error("called twice"); } + // the code below is safe because we can't call back a second time + called = true; + + // decrement the count of running jobs... + count--; + + // and finally call next to replace this worker with more job(s) + next(); + }); + }; + + // this is exposed as API + plan.job = function (priority, cb) { + // you have to pass both the priority (a non-negative number) and an actual job + if (typeof(priority) !== 'number' || priority < 0) { throw new Error('expected a non-negative number'); } + // a job is an asynchronous function that takes a single parameter: + // a 'next' callback which will keep the whole thing going. + // forgetting to call 'next' means you'll never complete. + if (typeof(cb) !== 'function') { throw new Error('expected function'); } + + // initialize the specified priority level if it doesn't already exist + var bag = jobs[priority] = jobs[priority] || {}; + // choose a random id that isn't already in use for this priority level + var id = uid(bag); + + // add the job to this priority level's bag + // most (all?) javascript engines will append this job to the bottom + // of the map. Meaning when we iterate it will be run later than + // other jobs that were scheduled first, effectively making a FIFO queue. + // However, this is undefined behaviour and you shouldn't ever rely on it. + bag[id] = function (next) { + cb(next); + }; + // returning 'plan' lets us chain methods together. + return plan; + }; + + var started = false; + plan.start = function () { + // don't allow multiple starts + // even though it should work, it's simpler not to. 
+ if (started) { return plan; } + // this seems to imply a 'stop' method + // but I don't need it, so I'm not implementing it now --ansuz + started = true; + + // start asynchronously, otherwise jobs will start running + // before you've had a chance to return 'plan', and weird things + // happen. + setTimeout(function () { + next(); + }); + return plan; + }; + + // you can pass any number of functions to be executed + // when all pending jobs are complete. + // We don't pass any arguments, so you need to handle return values + // yourself if you want them. + plan.done = function (f) { + if (typeof(f) !== 'function') { throw new Error('expected function'); } + completeHandlers.push(f); + return plan; + }; + + // That's all! I hope you had fun reading this! + return plan; +}; + diff --git a/scripts/check-account-deletion.js b/scripts/check-account-deletion.js index 0532e69ed..91020bde9 100644 --- a/scripts/check-account-deletion.js +++ b/scripts/check-account-deletion.js @@ -1,7 +1,6 @@ /* jshint esversion: 6, node: true */ const Fs = require('fs'); const nThen = require('nthen'); -const Pinned = require('./pinned'); const Nacl = require('tweetnacl/nacl-fast'); const Path = require('path'); const Pins = require('../lib/pins'); @@ -41,7 +40,7 @@ nThen((waitFor) => { pinned = Pins.calculateFromLog(content.toString('utf8'), f); })); }).nThen((waitFor) => { - Pinned.load(waitFor((err, d) => { + Pins.list(waitFor((err, d) => { data = Object.keys(d); }), { exclude: [edPublic + '.ndjson'] diff --git a/scripts/evict-inactive.js b/scripts/evict-inactive.js index f0e801909..e3419ffad 100644 --- a/scripts/evict-inactive.js +++ b/scripts/evict-inactive.js @@ -2,7 +2,7 @@ var nThen = require("nthen"); var Store = require("../storage/file"); var BlobStore = require("../storage/blob"); -var Pinned = require("./pinned"); +var Pins = require("../lib/pins"); var config = require("../lib/load-config"); // the administrator should have set an 'inactiveTime' in their config @@ -38,7 
+38,7 @@ nThen(function (w) { store = _; })); // load the list of pinned files so you know which files // should not be archived or deleted - Pinned.load(w(function (err, _) { + Pins.list(w(function (err, _) { if (err) { w.abort(); return void console.error(err); diff --git a/scripts/tests/test-pins.js b/scripts/tests/test-pins.js new file mode 100644 index 000000000..eea164230 --- /dev/null +++ b/scripts/tests/test-pins.js @@ -0,0 +1,45 @@ +/*jshint esversion: 6 */ +const Pins = require("../../lib/pins"); + +var stats = { + users: 0, + lines: 0, // how many lines did you iterate over + surplus: 0, // how many of those lines were not needed? + pinned: 0, // how many files are pinned? + duplicated: 0, +}; + +var handler = function (ref, id /* safeKey */, pinned) { + if (ref.surplus) { + //console.log("%s has %s trimmable lines", id, ref.surplus); + stats.surplus += ref.surplus; + } + + for (var item in ref.pins) { + if (!pinned.hasOwnProperty(item)) { + //console.log("> %s is pinned", item); + stats.pinned++; + } else { + //console.log("> %s was already pinned", item); + stats.duplicated++; + } + } + + stats.users++; + stats.lines += ref.index; + //console.log(ref, id); +}; + +Pins.list(function (err, pinned) { +/* + for (var id in pinned) { + console.log(id); + stats.pinned++; + } +*/ + console.log(stats); +}, { + pinPath: require("../../lib/load-config").pinPath, + handler: handler, +}); + diff --git a/scripts/tests/test-plan.js b/scripts/tests/test-plan.js new file mode 100644 index 000000000..e8624514a --- /dev/null +++ b/scripts/tests/test-plan.js @@ -0,0 +1,41 @@ +/*jshint esversion: 6 */ +const Plan = require("../../lib/plan"); + +var rand_delay = function (f) { + setTimeout(f, Math.floor(Math.random() * 1500) + 250); +}; + +var plan = Plan(6).job(1, function (next) { + [1,2,3,4,5,6,7,8,9,10,11,12].forEach(function (n) { + plan.job(0, function (next) { + rand_delay(function () { + console.log("finishing job %s", n); + next(); + }); + }); + }); + 
console.log("finishing job 0"); + next(); +}).job(2, function (next) { + console.log("finishing job 13"); + + [ + 100, + 200, + 300, + 400 + ].forEach(function (n) { + plan.job(3, function (next) { + rand_delay(function () { + console.log("finishing job %s", n); + next(); + }); + }); + }); + + next(); +}).done(function () { console.log("DONE"); }).start(); + +//console.log(plan); + +//plan.start(); diff --git a/www/common/common-util.js b/www/common/common-util.js index de5a68ca2..1af32103f 100644 --- a/www/common/common-util.js +++ b/www/common/common-util.js @@ -272,8 +272,8 @@ Util.throttle = function (f, ms) { var to; var g = function () { - window.clearTimeout(to); - to = window.setTimeout(Util.bake(f, Util.slice(arguments)), ms); + clearTimeout(to); + to = setTimeout(Util.bake(f, Util.slice(arguments)), ms); }; return g; };