|
|
@ -96,11 +96,20 @@ const getSafeKeyFromPath = function (path) {
|
|
|
|
return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
|
|
|
|
return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
Pins.list = function (done, config) {
|
|
|
|
Pins.list = function (_done, config) {
|
|
|
|
const pinPath = config.pinPath || './data/pins';
|
|
|
|
const pinPath = config.pinPath || './data/pins';
|
|
|
|
const plan = Plan(config.workers || 5);
|
|
|
|
const plan = Plan(config.workers || 5);
|
|
|
|
const handler = config.handler || function () {};
|
|
|
|
const handler = config.handler || function () {};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var isDone = false;
|
|
|
|
|
|
|
|
// ensure that 'done' is only called once
|
|
|
|
|
|
|
|
// that it calls back asynchronously
|
|
|
|
|
|
|
|
// and that it sets 'isDone' to true, so that pending processes
|
|
|
|
|
|
|
|
// know to abort
|
|
|
|
|
|
|
|
const done = Util.once(Util.both(Util.mkAsync(_done), function () {
|
|
|
|
|
|
|
|
isDone = true;
|
|
|
|
|
|
|
|
}));
|
|
|
|
|
|
|
|
|
|
|
|
// TODO externalize this via optional handlers?
|
|
|
|
// TODO externalize this via optional handlers?
|
|
|
|
const stats = {
|
|
|
|
const stats = {
|
|
|
|
logs: 0,
|
|
|
|
logs: 0,
|
|
|
@ -137,29 +146,39 @@ Pins.list = function (done, config) {
|
|
|
|
return void cb(err);
|
|
|
|
return void cb(err);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
cb(void 0, list.map(function (item) {
|
|
|
|
cb(void 0, list.map(function (item) {
|
|
|
|
return Path.join(path, item);
|
|
|
|
return {
|
|
|
|
|
|
|
|
path: Path.join(path, item),
|
|
|
|
|
|
|
|
id: item.replace(/\.ndjson$/, ''),
|
|
|
|
|
|
|
|
};
|
|
|
|
}));
|
|
|
|
}));
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
scanDirectory(pinPath, function (err, paths) {
|
|
|
|
scanDirectory(pinPath, function (err, dirs) {
|
|
|
|
if (err) { return; } // XXX
|
|
|
|
if (err) {
|
|
|
|
paths.forEach(function (path) {
|
|
|
|
if (err.code === 'ENOENT') { return void cb(void 0, {}); }
|
|
|
|
|
|
|
|
return void done(err);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
dirs.forEach(function (dir) {
|
|
|
|
plan.job(1, function (next) {
|
|
|
|
plan.job(1, function (next) {
|
|
|
|
scanDirectory(path, function (nested_err, nested_paths) {
|
|
|
|
if (isDone) { return void next(); }
|
|
|
|
if (nested_err) { return; } // XXX
|
|
|
|
scanDirectory(dir.path, function (nested_err, logs) {
|
|
|
|
|
|
|
|
if (nested_err) {
|
|
|
|
|
|
|
|
return void done(err);
|
|
|
|
|
|
|
|
}
|
|
|
|
stats.dirs++;
|
|
|
|
stats.dirs++;
|
|
|
|
nested_paths.forEach(function (nested_path) {
|
|
|
|
logs.forEach(function (log) {
|
|
|
|
if (!/\.ndjson$/.test(nested_path)) { return; }
|
|
|
|
if (!/\.ndjson$/.test(log.path)) { return; }
|
|
|
|
plan.job(0, function (next) {
|
|
|
|
plan.job(0, function (next) {
|
|
|
|
streamFile(nested_path, function (err, ref) {
|
|
|
|
if (isDone) { return void next(); }
|
|
|
|
if (err) { return; } // XXX
|
|
|
|
streamFile(log.path, function (err, ref) {
|
|
|
|
|
|
|
|
if (err) { return void done(err); }
|
|
|
|
stats.logs++;
|
|
|
|
stats.logs++;
|
|
|
|
|
|
|
|
|
|
|
|
var set = ref.pins;
|
|
|
|
var set = ref.pins;
|
|
|
|
for (var item in set) {
|
|
|
|
for (var item in set) {
|
|
|
|
|
|
|
|
(pinned[item] = pinned[item] || {})[log.id] = 1;
|
|
|
|
if (!pinned.hasOwnProperty(item)) {
|
|
|
|
if (!pinned.hasOwnProperty(item)) {
|
|
|
|
pinned[item] = true;
|
|
|
|
|
|
|
|
stats.pinned++;
|
|
|
|
stats.pinned++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|