From 95965c1deec3a75d506834374f49ed5bbacebee5 Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 7 Apr 2020 20:03:41 -0400 Subject: [PATCH] keep a parallel implementation of the pin loader to validate the new one --- lib/pins.js | 63 ++++++++++++++++++++++++++++++++++ scripts/compare-pin-methods.js | 42 +++++++++++++++++++++++ scripts/evict-inactive.js | 2 +- 3 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 scripts/compare-pin-methods.js diff --git a/lib/pins.js b/lib/pins.js index 41e871446..d840991a3 100644 --- a/lib/pins.js +++ b/lib/pins.js @@ -7,6 +7,9 @@ const Path = require("path"); const Util = require("./common-util"); const Plan = require("./plan"); +const Semaphore = require('saferphore'); +const nThen = require('nthen'); + /* Accepts a reference to an object, and... either a string describing which log is being processed (backwards compatibility), or a function which will log the error with all relevant data @@ -194,3 +197,63 @@ Pins.list = function (_done, config) { }).start(); }); }; + +Pins.load = function (cb, config) { + const sema = Semaphore.create(config.workers || 5); + + let dirList; + const fileList = []; + const pinned = {}; + + var pinPath = config.pinPath || './pins'; + var done = Util.once(cb); + + nThen((waitFor) => { + // recurse over the configured pinPath, or the default + Fs.readdir(pinPath, waitFor((err, list) => { + if (err) { + if (err.code === 'ENOENT') { + dirList = []; + return; // this ends up calling back with an empty object + } + waitFor.abort(); + return void done(err); + } + dirList = list; + })); + }).nThen((waitFor) => { + dirList.forEach((f) => { + sema.take((returnAfter) => { + // iterate over all the subdirectories in the pin store + Fs.readdir(Path.join(pinPath, f), waitFor(returnAfter((err, list2) => { + if (err) { + waitFor.abort(); + return void done(err); + } + list2.forEach((ff) => { + if (config && config.exclude && config.exclude.indexOf(ff) > -1) { return; } + fileList.push(Path.join(pinPath, f, ff)); + }); + }))); + }); + }); + }).nThen((waitFor) => { + fileList.forEach((f) => { + sema.take((returnAfter) => { + Fs.readFile(f, waitFor(returnAfter((err, content) => { + if (err) { + waitFor.abort(); + return void done(err); + } + const hashes = Pins.calculateFromLog(content.toString('utf8'), f); + hashes.forEach((x) => { + (pinned[x] = pinned[x] || {})[f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y)] = 1; + }); + }))); + }); + }); + }).nThen(() => { + done(void 0, pinned); + }); +}; + diff --git a/scripts/compare-pin-methods.js b/scripts/compare-pin-methods.js new file mode 100644 index 000000000..de7ef114d --- /dev/null +++ b/scripts/compare-pin-methods.js @@ -0,0 +1,42 @@ +/* jshint esversion: 6, node: true */ +const nThen = require("nthen"); +const Pins = require("../lib/pins"); +const Assert = require("assert"); + +const config = require("../lib/load-config"); + +var compare = function () { + console.log(config); + var conf = { + pinPath: config.pinPath, + }; + + var list, load; + + nThen(function (w) { + Pins.list(w(function (err, p) { + if (err) { throw err; } + list = p; + console.log(p); + console.log(list); + console.log(); + }), conf); + }).nThen(function (w) { + Pins.load(w(function (err, p) { + if (err) { throw err; } + load = p; + console.log(load); + console.log(); + }), conf); + }).nThen(function () { + console.log({ + listLength: Object.keys(list).length, + loadLength: Object.keys(load).length, + }); + + Assert.deepEqual(list, load); + console.log("methods are equivalent"); + }); +}; + +compare(); diff --git a/scripts/evict-inactive.js b/scripts/evict-inactive.js index a3a595ca4..1d7b87e91 100644 --- a/scripts/evict-inactive.js +++ b/scripts/evict-inactive.js @@ -42,7 +42,7 @@ nThen(function (w) { store = _; })); // load the list of pinned files so you know which files // should not be archived or deleted - Pins.list(w(function (err, _) { + Pins.load(w(function (err, _) { if (err) { w.abort(); return void console.error(err);