Merge branch 'staging' of github.com:xwiki-labs/cryptpad into staging

yflory 5 years ago
commit adc64f0c6e

@@ -9,7 +9,6 @@ const Nacl = require("tweetnacl/nacl-fast");
 const Util = require("../common-util");
 const nThen = require("nthen");
 const Saferphore = require("saferphore");
-const Pinned = require('../../scripts/pinned');
 //const escapeKeyCharacters = Util.escapeKeyCharacters;
 const unescapeKeyCharacters = Util.unescapeKeyCharacters;
@@ -432,7 +431,7 @@ Pinning.getDeletedPads = function (Env, channels, cb) {
 // inform that the
 Pinning.loadChannelPins = function (Env) {
-    Pinned.load(function (err, data) {
+    Pins.list(function (err, data) {
         if (err) {
             Env.Log.error("LOAD_CHANNEL_PINS", err);

@@ -2,6 +2,11 @@
 var Pins = module.exports;
+const Fs = require("fs");
+const Path = require("path");
+const Util = require("./common-util");
+const Plan = require("./plan");
 /* Accepts a reference to an object, and...
    either a string describing which log is being processed (backwards compatibility),
    or a function which will log the error with all relevant data
@@ -22,7 +27,11 @@ var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) {
     // make sure to get ref.pins as the result
     // it's a weird API but it's faster than unpinning manually
     var pins = ref.pins = {};
+    ref.index = 0;
+    ref.latest = 0; // the latest message (timestamp in ms)
+    ref.surplus = 0; // how many lines exist behind a reset
     return function (line) {
+        ref.index++;
         if (!Boolean(line)) { return; }
         var l;
@ -36,10 +45,15 @@ var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) {
return void errorHandler('PIN_LINE_NOT_FORMAT_ERROR', l); return void errorHandler('PIN_LINE_NOT_FORMAT_ERROR', l);
} }
if (typeof(l[2]) === 'number') {
ref.latest = l[2]; // date
}
switch (l[0]) { switch (l[0]) {
case 'RESET': { case 'RESET': {
pins = ref.pins = {}; pins = ref.pins = {};
if (l[1] && l[1].length) { l[1].forEach((x) => { ref.pins[x] = 1; }); } if (l[1] && l[1].length) { l[1].forEach((x) => { ref.pins[x] = 1; }); }
ref.surplus = ref.index;
//jshint -W086 //jshint -W086
// fallthrough // fallthrough
} }
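
For orientation (not part of the commit itself): each pin log is newline-delimited JSON, and the fields added above track how much of a log precedes its last RESET. A minimal sketch of feeding one in by hand, assuming the PIN/UNPIN cases from the unchanged portion of this file; the channel ids and timestamps are made up:

    const Pins = require("../lib/pins"); // path assumed for illustration
    const ref = {};
    const handler = Pins.createLineHandler(ref, console.error);
    [
        '["PIN", ["chanA"], 1553296163509]',
        '["UNPIN", ["chanA"], 1553296181093]',
        '["RESET", ["chanB"], 1553296203401]', // everything up to here is now surplus
        '["PIN", ["chanC"], 1553296218673]',
    ].forEach(handler);
    // ref.pins    => { chanB: 1, chanC: 1 }
    // ref.index   => 4 (lines seen)
    // ref.surplus => 3 (the RESET line and everything before it)
    // ref.latest  => 1553296218673
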
@@ -72,5 +86,95 @@ Pins.calculateFromLog = function (pinFile, fileName) {
     return Object.keys(ref.pins);
 };
-// TODO refactor to include a streaming version for use in rpc.js as well
+
+/*
+    pins/
+    pins/A+/
+    pins/A+/A+hyhrQLrgYixOomZYxpuEhwfiVzKk1bBp+arH-zbgo=.ndjson
+*/
+const getSafeKeyFromPath = function (path) {
+    return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
+};
+
+Pins.list = function (done, config) {
+    const pinPath = config.pinPath || './data/pins';
+    const plan = Plan(config.workers || 5);
+    const handler = config.handler || function () {};
+
+    // TODO externalize this via optional handlers?
+    const stats = {
+        logs: 0,
+        dirs: 0,
+        pinned: 0,
+        lines: 0,
+    };
+
+    const errorHandler = function (label, info) {
+        console.log(label, info);
+    };
+
+    const pinned = {};
+
+    // TODO replace this with lib-readline?
+    const streamFile = function (path, cb) {
+        const id = getSafeKeyFromPath(path);
+        return void Fs.readFile(path, 'utf8', function (err, body) {
+            if (err) { return void cb(err); }
+            const ref = {};
+            const pinHandler = createLineHandler(ref, errorHandler);
+            var lines = body.split('\n');
+            stats.lines += lines.length;
+            lines.forEach(pinHandler);
+            handler(ref, id, pinned);
+            cb(void 0, ref);
+        });
+    };
+
+    const scanDirectory = function (path, cb) {
+        Fs.readdir(path, function (err, list) {
+            if (err) {
+                return void cb(err);
+            }
+            cb(void 0, list.map(function (item) {
+                return Path.join(path, item);
+            }));
+        });
+    };
+
+    scanDirectory(pinPath, function (err, paths) {
+        if (err) { return; } // XXX
+        paths.forEach(function (path) {
+            plan.job(1, function (next) {
+                scanDirectory(path, function (nested_err, nested_paths) {
+                    if (nested_err) { return; } // XXX
+                    stats.dirs++;
+                    nested_paths.forEach(function (nested_path) {
+                        if (!/\.ndjson$/.test(nested_path)) { return; }
+                        plan.job(0, function (next) {
+                            streamFile(nested_path, function (err, ref) {
+                                if (err) { return; } // XXX
+                                stats.logs++;
+                                var set = ref.pins;
+                                for (var item in set) {
+                                    if (!pinned.hasOwnProperty(item)) {
+                                        pinned[item] = true;
+                                        stats.pinned++;
+                                    }
+                                }
+                                next();
+                            });
+                        });
+                    });
+                    next();
+                });
+            });
+        });
+        plan.done(function () {
+            // err ?
+            done(void 0, pinned);
+        }).start();
+    });
+};
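
A quick usage sketch (illustrative, not from the commit) of the new Pins.list API: the callback comes first, then a config object, and 'pinned' maps each pinned channel id to true:

    const Pins = require("./lib/pins"); // path assumed for illustration
    Pins.list(function (err, pinned) {
        if (err) { return void console.error(err); }
        console.log(Object.keys(pinned).length + " channels pinned");
    }, {
        pinPath: './data/pins', // root of the directory tree shown above
        workers: 5,             // parallelism handed to Plan
    });
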

@@ -0,0 +1,235 @@
+/*
+    There are many situations where we want to do lots of little jobs
+    in parallel and with few constraints as to their ordering.
+
+    One example is recursing over a bunch of directories and reading files.
+    The naive way to do this is to recurse over all the subdirectories
+    relative to a root while adding files to a list, then to iterate over
+    the files in that list. Unfortunately, this means holding the complete
+    list of file paths in memory, which can't possibly scale as our database grows.
+
+    A better way to do this is to recurse into one directory and
+    iterate over its contents until there are no more, then to backtrack
+    to the next directory and repeat until no more directories exist.
+
+    This kind of thing is easy enough when you perform one task at a time
+    and use synchronous code, but with multiple asynchronous tasks it's
+    easy to introduce subtle bugs.
+
+    This module is designed for these situations. It allows you to easily
+    and efficiently schedule a large number of tasks with an associated
+    degree of priority from 0 (highest priority) to Number.MAX_SAFE_INTEGER.
+    Initialize your scheduler with a degree of parallelism, and start planning
+    some initial jobs. Set it to run and it will keep going until all jobs are
+    complete, at which point it will optionally execute a 'done' callback.
+
+    Getting back to the original example:
+
+    List the contents of the root directory, then plan subsequent jobs
+    with a priority of 1 to recurse into subdirectories. The callback
+    of each of these recursions can then plan higher-priority tasks
+    to actually process the contained files with a priority of 0.
+    As long as there are more files scheduled it will continue to process
+    them first. When there are no more files the scheduler will read
+    the next directory and repopulate the list of files to process.
+    This will repeat until everything is done.
+
+    // load the module
+    const Plan = require("./plan");
+
+    // instantiate a scheduler with a parallelism of 5
+    var plan = Plan(5)
+
+    // plan the first job which schedules more jobs...
+    .job(1, function (next) {
+        listRootDirectory(function (files) {
+            files.forEach(function (file) {
+                // highest priority, run as soon as there is a free worker
+                plan.job(0, function (next) {
+                    processFile(file, function (result) {
+                        console.log(result);
+                        // don't forget to call next
+                        next();
+                    });
+                });
+            });
+            next(); // call 'next' to free up one worker
+        });
+    })
+
+    // chain commands together if you want
+    .done(function () {
+        console.log("DONE");
+    })
+
+    // it won't run unless you launch it
+    .start();
+*/
+module.exports = function (max) {
+    var plan = {};
+    max = max || 5;
+
+    // finds an id that isn't in use in a particular map
+    // accepts an id in case you have one already chosen
+    // otherwise generates random new ids if one is not passed
+    // or if there is a collision
+    var uid = function (map, id) {
+        if (typeof(id) === 'undefined') {
+            id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
+        }
+        if (id && typeof(map[id]) === 'undefined') {
+            return id;
+        }
+        return uid(map);
+    };
+
+    // the queue of jobs is an array, which will be populated
+    // with maps for each level of priority
+    var jobs = [];
+
+    // the count of currently running jobs
+    var count = 0;
+
+    // a list of callbacks to be executed once everything is done
+    var completeHandlers = [];
+
+    // the recommended usage is to create a new scheduler for every job,
+    // use it for internals in a scope, and let the garbage collector
+    // clean up when everything stops. This means you shouldn't
+    // go passing 'plan' around in a long-lived process!
+    var FINISHED = false;
+    var done = function () {
+        // 'done' gets called when there are no more jobs in the queue,
+        // but other jobs might still be running...
+
+        // the count of running processes should never be less than zero
+        // because we guard against multiple callbacks
+        if (count < 0) { throw new Error("should never happen"); }
+
+        // greater than zero is definitely possible, it just means you aren't done yet
+        if (count !== 0) { return; }
+
+        // you will finish twice if you call 'start' a second time;
+        // this behaviour isn't supported yet.
+        if (FINISHED) { throw new Error('finished twice'); }
+        FINISHED = true;
+
+        // execute all your 'done' callbacks
+        completeHandlers.forEach(function (f) { f(); });
+    };
+
+    var run;
+
+    // this 'next' is internal only.
+    // it iterates over all known jobs, running them until
+    // the scheduler achieves the desired amount of parallelism.
+    // If there are no more jobs it will call 'done',
+    // which will short-circuit if there are still pending tasks.
+    // Whenever any task finishes it will return its lock and
+    // run as many new jobs as are allowed.
+    var next = function () {
+        // array.some skips over bare indexes in sparse arrays
+        var pending = jobs.some(function (bag /*, priority*/) {
+            if (!bag || typeof(bag) !== 'object') { return; }
+            // a bag is a map of jobs for any particular degree of priority
+            // iterate over jobs in the bag until you're out of 'workers'
+            for (var id in bag) {
+                // bail out if you hit max parallelism
+                if (count >= max) { return true; }
+                run(bag, id, next);
+            }
+        });
+        // check whether you're done if you hit the end of the array
+        if (!pending) { done(); }
+    };
+
+    // and here's the part that actually handles jobs...
+    run = function (bag, id) {
+        // this is just a sanity check.
+        // there should only ever be jobs in each bag.
+        if (typeof(bag[id]) !== 'function') {
+            throw new Error("expected function");
+        }
+        // keep a local reference to the function
+        var f = bag[id];
+        // remove it from the bag.
+        delete bag[id];
+        // increment the count of running jobs
+        count++;
+        // guard against it being called twice.
+        var called = false;
+        f(function () {
+            // watch out! it'll bite you.
+            // maybe this should just return?
+            // support that option for 'production' ?
+            if (called) { throw new Error("called twice"); }
+            // the code below is safe because we can't call back a second time
+            called = true;
+            // decrement the count of running jobs...
+            count--;
+            // and finally call next to replace this worker with more job(s)
+            next();
+        });
+    };
+
+    // this is exposed as API
+    plan.job = function (priority, cb) {
+        // you have to pass both the priority (a non-negative number) and an actual job
+        if (typeof(priority) !== 'number' || priority < 0) { throw new Error('expected a non-negative number'); }
+        // a job is an asynchronous function that takes a single parameter:
+        // a 'next' callback which will keep the whole thing going.
+        // forgetting to call 'next' means you'll never complete.
+        if (typeof(cb) !== 'function') { throw new Error('expected function'); }
+        // initialize the specified priority level if it doesn't already exist
+        var bag = jobs[priority] = jobs[priority] || {};
+        // choose a random id that isn't already in use for this priority level
+        var id = uid(bag);
+        // add the job to this priority level's bag.
+        // most (all?) javascript engines will append this job to the bottom
+        // of the map, meaning when we iterate it will be run later than
+        // other jobs that were scheduled first, effectively making a FIFO queue.
+        // However, this is undefined behaviour and you shouldn't ever rely on it.
+        bag[id] = function (next) {
+            cb(next);
+        };
+        // returning 'plan' lets us chain methods together.
+        return plan;
+    };
+
+    var started = false;
+    plan.start = function () {
+        // don't allow multiple starts;
+        // even though it should work, it's simpler not to.
+        if (started) { return plan; }
+        // this seems to imply a 'stop' method,
+        // but I don't need it, so I'm not implementing it now --ansuz
+        started = true;
+        // start asynchronously, otherwise jobs will start running
+        // before you've had a chance to return 'plan', and weird things
+        // happen.
+        setTimeout(function () {
+            next();
+        });
+        return plan;
+    };
+
+    // you can pass any number of functions to be executed
+    // when all pending jobs are complete.
+    // We don't pass any arguments, so you need to handle return values
+    // yourself if you want them.
+    plan.done = function (f) {
+        if (typeof(f) !== 'function') { throw new Error('expected function'); }
+        completeHandlers.push(f);
+        return plan;
+    };
+
+    // That's all! I hope you had fun reading this!
+    return plan;
+};
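
One behavioural note worth a sketch (illustrative, not from the commit): each job's 'next' callback must be called exactly once. The 'called' guard above throws on a second invocation, and forgetting to call it at all means 'done' never fires:

    const Plan = require("./lib/plan"); // path assumed for illustration
    Plan(1).job(0, function (next) {
        next();
        // next(); // would throw Error("called twice")
    }).done(function () {
        console.log("all jobs complete");
    }).start();
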

@@ -1,7 +1,6 @@
 /* jshint esversion: 6, node: true */
 const Fs = require('fs');
 const nThen = require('nthen');
-const Pinned = require('./pinned');
 const Nacl = require('tweetnacl/nacl-fast');
 const Path = require('path');
 const Pins = require('../lib/pins');
@@ -41,7 +40,7 @@ nThen((waitFor) => {
         pinned = Pins.calculateFromLog(content.toString('utf8'), f);
     }));
 }).nThen((waitFor) => {
-    Pinned.load(waitFor((err, d) => {
+    Pins.list(waitFor((err, d) => {
         data = Object.keys(d);
     }), {
         exclude: [edPublic + '.ndjson']

@@ -2,7 +2,7 @@ var nThen = require("nthen");
 var Store = require("../storage/file");
 var BlobStore = require("../storage/blob");
-var Pinned = require("./pinned");
+var Pins = require("../lib/pins");
 var config = require("../lib/load-config");
 // the administrator should have set an 'inactiveTime' in their config
@@ -38,7 +38,7 @@ nThen(function (w) {
         store = _;
     })); // load the list of pinned files so you know which files
          // should not be archived or deleted
-    Pinned.load(w(function (err, _) {
+    Pins.list(w(function (err, _) {
         if (err) {
             w.abort();
             return void console.error(err);

@@ -0,0 +1,45 @@
+/*jshint esversion: 6 */
+const Pins = require("../../lib/pins");
+
+var stats = {
+    users: 0,
+    lines: 0, // how many lines did you iterate over
+    surplus: 0, // how many of those lines were not needed?
+    pinned: 0, // how many files are pinned?
+    duplicated: 0,
+};
+
+var handler = function (ref, id /* safeKey */, pinned) {
+    if (ref.surplus) {
+        //console.log("%s has %s trimmable lines", id, ref.surplus);
+        stats.surplus += ref.surplus;
+    }
+    for (var item in ref.pins) {
+        if (!pinned.hasOwnProperty(item)) {
+            //console.log("> %s is pinned", item);
+            stats.pinned++;
+        } else {
+            //console.log("> %s was already pinned", item);
+            stats.duplicated++;
+        }
+    }
+    stats.users++;
+    stats.lines += ref.index;
+    //console.log(ref, id);
+};
+
+Pins.list(function (err, pinned) {
+    /*
+    for (var id in pinned) {
+        console.log(id);
+        stats.pinned++;
+    }
+    */
+    console.log(stats);
+}, {
+    pinPath: require("../../lib/load-config").pinPath,
+    handler: handler,
+});

@@ -0,0 +1,41 @@
+/*jshint esversion: 6 */
+const Plan = require("../../lib/plan");
+
+var rand_delay = function (f) {
+    setTimeout(f, Math.floor(Math.random() * 1500) + 250);
+};
+
+var plan = Plan(6).job(1, function (next) {
+    [1,2,3,4,5,6,7,8,9,10,11,12].forEach(function (n) {
+        plan.job(0, function (next) {
+            rand_delay(function () {
+                console.log("finishing job %s", n);
+                next();
+            });
+        });
+    });
+    console.log("finishing job 0");
+    next();
+}).job(2, function (next) {
+    console.log("finishing job 13");
+    [
+        100,
+        200,
+        300,
+        400
+    ].forEach(function (n) {
+        plan.job(3, function (next) {
+            rand_delay(function () {
+                console.log("finishing job %s", n);
+                next();
+            });
+        });
+    });
+    next();
+}).done(function () { console.log("DONE"); }).start();
+
+//console.log(plan);
+//plan.start();

@@ -272,8 +272,8 @@
     Util.throttle = function (f, ms) {
         var to;
         var g = function () {
-            window.clearTimeout(to);
-            to = window.setTimeout(Util.bake(f, Util.slice(arguments)), ms);
+            clearTimeout(to);
+            to = setTimeout(Util.bake(f, Util.slice(arguments)), ms);
         };
         return g;
     };
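
Dropping the window. prefix makes Util.throttle usable from Node scripts as well as the browser. A small sketch (require path assumed); note that despite the name this is trailing-edge, debounce-like behaviour: every call resets the timer, so only the last call in a burst fires:

    const Util = require("./www/common/common-util"); // path assumed for illustration
    const log = Util.throttle(function (msg) { console.log(msg); }, 100);
    log("first");
    log("second"); // resets the timer; only "second" is logged, ~100ms later
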
