273 lines
8.6 KiB
JavaScript
273 lines
8.6 KiB
JavaScript
/*jshint esversion: 6 */
|
|
|
|
var Pins = module.exports;
|
|
|
|
const Fs = require("fs");
|
|
const Path = require("path");
|
|
const Util = require("./common-util");
|
|
const Plan = require("./plan");
|
|
|
|
const Semaphore = require('saferphore');
|
|
const nThen = require('nthen');
|
|
|
|
/* Accepts a reference to an object, and...
|
|
either a string describing which log is being processed (backwards compatibility),
|
|
or a function which will log the error with all relevant data
|
|
*/
|
|
var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) {
|
|
var fileName;
|
|
if (typeof(errorHandler) === 'string') {
|
|
fileName = errorHandler;
|
|
errorHandler = function (label, data) {
|
|
console.error(label, {
|
|
log: fileName,
|
|
data: data,
|
|
});
|
|
};
|
|
}
|
|
|
|
// passing the reference to an object allows us to overwrite accumulated pins
|
|
// make sure to get ref.pins as the result
|
|
// it's a weird API but it's faster than unpinning manually
|
|
var pins = ref.pins = {};
|
|
ref.index = 0;
|
|
ref.latest = 0; // the latest message (timestamp in ms)
|
|
ref.surplus = 0; // how many lines exist behind a reset
|
|
return function (line) {
|
|
ref.index++;
|
|
if (!Boolean(line)) { return; }
|
|
|
|
var l;
|
|
try {
|
|
l = JSON.parse(line);
|
|
} catch (e) {
|
|
return void errorHandler('PIN_LINE_PARSE_ERROR', line);
|
|
}
|
|
|
|
if (!Array.isArray(l)) {
|
|
return void errorHandler('PIN_LINE_NOT_FORMAT_ERROR', l);
|
|
}
|
|
|
|
if (typeof(l[2]) === 'number') {
|
|
ref.latest = l[2]; // date
|
|
}
|
|
|
|
switch (l[0]) {
|
|
case 'RESET': {
|
|
pins = ref.pins = {};
|
|
if (l[1] && l[1].length) { l[1].forEach((x) => { ref.pins[x] = 1; }); }
|
|
ref.surplus = ref.index;
|
|
//jshint -W086
|
|
// fallthrough
|
|
}
|
|
case 'PIN': {
|
|
l[1].forEach((x) => { pins[x] = 1; });
|
|
break;
|
|
}
|
|
case 'UNPIN': {
|
|
l[1].forEach((x) => { delete pins[x]; });
|
|
break;
|
|
}
|
|
default:
|
|
errorHandler("PIN_LINE_UNSUPPORTED_COMMAND", l);
|
|
}
|
|
};
|
|
};
|
|
|
|
|
|
var processPinFile = function (pinFile, fileName) {
|
|
var ref = {};
|
|
var handler = createLineHandler(ref, fileName);
|
|
pinFile.split('\n').forEach(handler);
|
|
return ref;
|
|
};
|
|
|
|
/*
|
|
takes contents of a pinFile (UTF8 string)
|
|
and the pin file's name
|
|
returns an array of of channel ids which are pinned
|
|
|
|
throw errors on pin logs with invalid pin data
|
|
*/
|
|
Pins.calculateFromLog = function (pinFile, fileName) {
|
|
var ref = processPinFile(pinFile, fileName);
|
|
return Object.keys(ref.pins);
|
|
};
|
|
|
|
/*
|
|
pins/
|
|
pins/A+/
|
|
pins/A+/A+hyhrQLrgYixOomZYxpuEhwfiVzKk1bBp+arH-zbgo=.ndjson
|
|
*/
|
|
|
|
const getSafeKeyFromPath = function (path) {
|
|
return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
|
|
};
|
|
|
|
const addUserPinToState = Pins.addUserPinToState = function (state, safeKey, itemId) {
|
|
(state[itemId] = state[itemId] || {})[safeKey] = 1;
|
|
};
|
|
|
|
Pins.list = function (_done, config) {
|
|
// allow for a configurable pin store location
|
|
const pinPath = config.pinPath || './data/pins';
|
|
|
|
// allow for a configurable amount of parallelism
|
|
const plan = Plan(config.workers || 5);
|
|
|
|
// run a supplied handler whenever you finish reading a log
|
|
// or noop if not supplied.
|
|
const handler = config.handler || function () {};
|
|
|
|
// use and mutate a supplied object for state if it's passed
|
|
const pinned = config.pinned || {};
|
|
|
|
var isDone = false;
|
|
// ensure that 'done' is only called once
|
|
// that it calls back asynchronously
|
|
// and that it sets 'isDone' to true, so that pending processes
|
|
// know to abort
|
|
const done = Util.once(Util.both(Util.mkAsync(_done), function () {
|
|
isDone = true;
|
|
}));
|
|
|
|
const errorHandler = function (label, info) {
|
|
console.log(label, info);
|
|
};
|
|
|
|
// TODO replace this with lib-readline?
|
|
const streamFile = function (path, cb) {
|
|
const id = getSafeKeyFromPath(path);
|
|
|
|
return void Fs.readFile(path, 'utf8', function (err, body) {
|
|
if (err) { return void cb(err); }
|
|
const ref = {};
|
|
const pinHandler = createLineHandler(ref, errorHandler);
|
|
var lines = body.split('\n');
|
|
lines.forEach(pinHandler);
|
|
handler(ref, id, pinned);
|
|
cb(void 0, ref);
|
|
});
|
|
};
|
|
|
|
const scanDirectory = function (path, cb) {
|
|
Fs.readdir(path, function (err, list) {
|
|
if (err) {
|
|
return void cb(err);
|
|
}
|
|
cb(void 0, list.map(function (item) {
|
|
return {
|
|
path: Path.join(path, item),
|
|
id: item.replace(/\.ndjson$/, ''),
|
|
};
|
|
}));
|
|
});
|
|
};
|
|
|
|
scanDirectory(pinPath, function (err, dirs) {
|
|
if (err) {
|
|
if (err.code === 'ENOENT') { return void done(void 0, {}); }
|
|
return void done(err);
|
|
}
|
|
dirs.forEach(function (dir) {
|
|
plan.job(1, function (next) {
|
|
if (isDone) { return void next(); }
|
|
scanDirectory(dir.path, function (nested_err, logs) {
|
|
if (nested_err) {
|
|
return void done(err);
|
|
}
|
|
logs.forEach(function (log) {
|
|
if (!/\.ndjson$/.test(log.path)) { return; }
|
|
plan.job(0, function (next) {
|
|
if (isDone) { return void next(); }
|
|
streamFile(log.path, function (err, ref) {
|
|
if (err) { return void done(err); }
|
|
|
|
var set = ref.pins;
|
|
for (var item in set) {
|
|
addUserPinToState(pinned, log.id, item);
|
|
}
|
|
next();
|
|
});
|
|
});
|
|
});
|
|
next();
|
|
});
|
|
});
|
|
});
|
|
|
|
plan.done(function () {
|
|
// err ?
|
|
done(void 0, pinned);
|
|
}).start();
|
|
});
|
|
};
|
|
|
|
Pins.load = function (cb, config) {
|
|
const sema = Semaphore.create(config.workers || 5);
|
|
|
|
let dirList;
|
|
const fileList = [];
|
|
const pinned = {};
|
|
|
|
var pinPath = config.pinPath || './pins';
|
|
var done = Util.once(cb);
|
|
var handler = config.handler;
|
|
|
|
nThen((waitFor) => {
|
|
// recurse over the configured pinPath, or the default
|
|
Fs.readdir(pinPath, waitFor((err, list) => {
|
|
if (err) {
|
|
if (err.code === 'ENOENT') {
|
|
dirList = [];
|
|
return; // this ends up calling back with an empty object
|
|
}
|
|
waitFor.abort();
|
|
return void done(err);
|
|
}
|
|
dirList = list;
|
|
}));
|
|
}).nThen((waitFor) => {
|
|
dirList.forEach((f) => {
|
|
sema.take((returnAfter) => {
|
|
// iterate over all the subdirectories in the pin store
|
|
Fs.readdir(Path.join(pinPath, f), waitFor(returnAfter((err, list2) => {
|
|
if (err) {
|
|
waitFor.abort();
|
|
return void done(err);
|
|
}
|
|
list2.forEach((ff) => {
|
|
if (config && config.exclude && config.exclude.indexOf(ff) > -1) { return; }
|
|
fileList.push(Path.join(pinPath, f, ff));
|
|
});
|
|
})));
|
|
});
|
|
});
|
|
}).nThen((waitFor) => {
|
|
fileList.forEach((f) => {
|
|
sema.take((returnAfter) => {
|
|
var next = waitFor(returnAfter());
|
|
Fs.readFile(f, (err, content) => {
|
|
if (err) {
|
|
waitFor.abort();
|
|
return void done(err);
|
|
}
|
|
var id = f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y);
|
|
var contentString = content.toString('utf8');
|
|
if (handler) {
|
|
return void handler(processPinFile(contentString, f), id, next);
|
|
}
|
|
const hashes = Pins.calculateFromLog(contentString, f);
|
|
hashes.forEach((x) => {
|
|
(pinned[x] = pinned[x] || {})[id] = 1;
|
|
});
|
|
next();
|
|
});
|
|
});
|
|
});
|
|
}).nThen(() => {
|
|
done(void 0, pinned);
|
|
});
|
|
};
|
|
|