diff --git a/lib/pins.js b/lib/pins.js index 41e871446..d840991a3 100644 --- a/lib/pins.js +++ b/lib/pins.js @@ -7,6 +7,9 @@ const Path = require("path"); const Util = require("./common-util"); const Plan = require("./plan"); +const Semaphore = require('saferphore'); +const nThen = require('nthen'); + /* Accepts a reference to an object, and... either a string describing which log is being processed (backwards compatibility), or a function which will log the error with all relevant data @@ -194,3 +197,63 @@ Pins.list = function (_done, config) { }).start(); }); }; + +Pins.load = function (cb, config) { + const sema = Semaphore.create(config.workers || 5); + + let dirList; + const fileList = []; + const pinned = {}; + + var pinPath = config.pinPath || './pins'; + var done = Util.once(cb); + + nThen((waitFor) => { + // recurse over the configured pinPath, or the default + Fs.readdir(pinPath, waitFor((err, list) => { + if (err) { + if (err.code === 'ENOENT') { + dirList = []; + return; // this ends up calling back with an empty object + } + waitFor.abort(); + return void done(err); + } + dirList = list; + })); + }).nThen((waitFor) => { + dirList.forEach((f) => { + sema.take((returnAfter) => { + // iterate over all the subdirectories in the pin store + Fs.readdir(Path.join(pinPath, f), waitFor(returnAfter((err, list2) => { + if (err) { + waitFor.abort(); + return void done(err); + } + list2.forEach((ff) => { + if (config && config.exclude && config.exclude.indexOf(ff) > -1) { return; } + fileList.push(Path.join(pinPath, f, ff)); + }); + }))); + }); + }); + }).nThen((waitFor) => { + fileList.forEach((f) => { + sema.take((returnAfter) => { + Fs.readFile(f, waitFor(returnAfter((err, content) => { + if (err) { + waitFor.abort(); + return void done(err); + } + const hashes = Pins.calculateFromLog(content.toString('utf8'), f); + hashes.forEach((x) => { + (pinned[x] = pinned[x] || {})[f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y)] = 1; + }); + }))); + }); + }); + }).nThen(() => { + done(void 0, pinned); + }); +}; + diff --git a/lib/storage/file.js b/lib/storage/file.js index 2d27ce185..00222c3f6 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -14,6 +14,28 @@ const readFileBin = require("../stream-file").readFileBin; const BatchRead = require("../batch-read"); const Schedule = require("../schedule"); + +/* Each time you write to a channel it will either use an open file descriptor + for that channel or open a new descriptor if one is not available. These are + automatically closed after this window to prevent a file descriptor leak, so + writes that take longer than this time may be dropped! */ +const CHANNEL_WRITE_WINDOW = 300000; + +/* Each time you read a channel it will have this many milliseconds to complete + otherwise it will be closed to prevent a file descriptor leak. The server will + lock up if it uses all available file descriptors, so it's important to close + them. The tradeoff with this timeout is that some functions, the stream, and + and the timeout itself are stored in memory. A longer timeout uses more memory + and running out of memory will also kill the server. */ +const STREAM_CLOSE_TIMEOUT = 300000; + +/* The above timeout closes the stream, but apparently that doesn't always work. + We set yet another timeout to allow the runtime to gracefully close the stream + (flushing all pending writes/reads and doing who knows what else). After this timeout + it will be MERCILESSLY DESTROYED. This isn't graceful, but again, file descriptor + leaks are bad. */ +const STREAM_DESTROY_TIMEOUT = 30000; + const isValidChannelId = function (id) { return typeof(id) === 'string' && id.length >= 32 && id.length < 50 && @@ -64,7 +86,7 @@ const destroyStream = function (stream) { try { stream.close(); } catch (err) { console.error(err); } setTimeout(function () { try { stream.destroy(); } catch (err) { console.error(err); } - }, 15000); + }, STREAM_DESTROY_TIMEOUT); }; const ensureStreamCloses = function (stream, id, ms) { @@ -74,7 +96,7 @@ const ensureStreamCloses = function (stream, id, ms) { // this can only be a timeout error... console.log("stream close error:", err, id); } - }), ms || 45000), []); + }), ms || STREAM_CLOSE_TIMEOUT), []); }; // readMessagesBin asynchronously iterates over the messages in a channel log @@ -729,7 +751,7 @@ var getChannel = function (env, id, _callback) { delete env.channels[id]; destroyStream(channel.writeStream, path); //console.log("closing writestream"); - }, 120000); + }, CHANNEL_WRITE_WINDOW); channel.delayClose(); env.channels[id] = channel; done(void 0, channel); diff --git a/scripts/compare-pin-methods.js b/scripts/compare-pin-methods.js new file mode 100644 index 000000000..de7ef114d --- /dev/null +++ b/scripts/compare-pin-methods.js @@ -0,0 +1,42 @@ +/* jshint esversion: 6, node: true */ +const nThen = require("nthen"); +const Pins = require("../lib/pins"); +const Assert = require("assert"); + +const config = require("../lib/load-config"); + +var compare = function () { + console.log(config); + var conf = { + pinPath: config.pinPath, + }; + + var list, load; + + nThen(function (w) { + Pins.list(w(function (err, p) { + if (err) { throw err; } + list = p; + console.log(p); + console.log(list); + console.log(); + }), conf); + }).nThen(function (w) { + Pins.load(w(function (err, p) { + if (err) { throw err; } + load = p; + console.log(load); + console.log(); + }), conf); + }).nThen(function () { + console.log({ + listLength: Object.keys(list).length, + loadLength: Object.keys(load).length, + }); + + Assert.deepEqual(list, load); + console.log("methods are equivalent"); + }); +}; + +compare(); diff --git a/scripts/evict-inactive.js b/scripts/evict-inactive.js index a3a595ca4..1d7b87e91 100644 --- a/scripts/evict-inactive.js +++ b/scripts/evict-inactive.js @@ -42,7 +42,7 @@ nThen(function (w) { store = _; })); // load the list of pinned files so you know which files // should not be archived or deleted - Pins.list(w(function (err, _) { + Pins.load(w(function (err, _) { if (err) { w.abort(); return void console.error(err); diff --git a/www/auth/main.js b/www/auth/main.js index c155204ec..fcbeaf3af 100644 --- a/www/auth/main.js +++ b/www/auth/main.js @@ -71,7 +71,7 @@ define([ // Get contacts and extract their avatar channel and key var getData = function (obj, href) { var parsed = Hash.parsePadUrl(href); - if (!parsed || parsed.type !== "file") { return; } // XXX + if (!parsed || parsed.type !== "file") { return; } var secret = Hash.getSecrets('file', parsed.hash); if (!secret.keys || !secret.channel) { return; } obj.avatarKey = Hash.encodeBase64(secret.keys && secret.keys.cryptKey); @@ -81,7 +81,7 @@ define([ contacts.friends = proxy.friends || {}; Object.keys(contacts.friends).map(function (key) { var friend = contacts.friends[key]; - // if (!friend) { return; } // XXX how should this be handled? + if (!friend) { return; } var ret = { edPublic: friend.edPublic, name: friend.displayName, @@ -91,7 +91,7 @@ define([ }); Object.keys(contacts.teams).map(function (key) { var team = contacts.teams[key]; - // if (!team) { return; } // XXX how should this be handled. Is this possible? + if (!team) { return; } var avatar = team.metadata && team.metadata.avatar; var ret = { edPublic: team.keys && team.keys.drive && team.keys.drive.edPublic,