diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js index 3fc339d00..504733372 100644 --- a/lib/commands/pin-rpc.js +++ b/lib/commands/pin-rpc.js @@ -25,9 +25,9 @@ var sumChannelSizes = function (sizes) { // FIXME it's possible for this to respond before the server has had a chance // to fetch the limits. Maybe we should respond with an error... // or wait until we actually know the limits before responding -var getLimit = Pinning.getLimit = function (Env, publicKey, cb) { - var unescapedKey = unescapeKeyCharacters(publicKey); - var limit = Env.limits[unescapedKey]; +var getLimit = Pinning.getLimit = function (Env, safeKey, cb) { + var unsafeKey = unescapeKeyCharacters(safeKey); + var limit = Env.limits[unsafeKey]; var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'? Env.defaultStorageLimit: Core.DEFAULT_LIMIT; @@ -37,32 +37,89 @@ var getLimit = Pinning.getLimit = function (Env, publicKey, cb) { cb(void 0, toSend); }; +const answerDeferred = function (Env, channel, bool) { + const pending = Env.pendingPinInquiries; + const stack = pending[channel]; + if (!Array.isArray(stack)) { return; } + + delete pending[channel]; + + stack.forEach(function (cb) { + cb(void 0, bool); + }); +}; + var addPinned = function ( Env, - publicKey /*:string*/, + safeKey /*:string*/, channelList /*Array*/, cb /*:()=>void*/) { - Env.evPinnedPadsReady.reg(() => { - channelList.forEach((c) => { - const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {}; - x[publicKey] = 1; - }); + channelList.forEach(function (channel) { + Pins.addUserPinToState(Env.pinnedPads, safeKey, channel); + answerDeferred(Env, channel, true); + }); + cb(); +}; + +const isEmpty = function (obj) { + if (!obj || typeof(obj) !== 'object') { return true; } + for (var key in obj) { + if (obj.hasOwnProperty(key)) { return true; } + } + return false; +}; + +const deferUserTask = function (Env, safeKey, deferred) { + const pending = Env.pendingUnpins; + (pending[safeKey] = pending[safeKey] || []).push(deferred); +}; + +const runUserDeferred = function (Env, safeKey) { + const pending = Env.pendingUnpins; + const stack = pending[safeKey]; + if (!Array.isArray(stack)) { return; } + delete pending[safeKey]; + + stack.forEach(function (cb) { cb(); }); }; + +const runRemainingDeferred = function (Env) { + const pending = Env.pendingUnpins; + for (var safeKey in pending) { + runUserDeferred(Env, safeKey); + } +}; + +const removeSelfFromPinned = function (Env, safeKey, channelList) { + channelList.forEach(function (channel) { + const channelPinStatus = Env.pinnedPads[channel]; + if (!channelPinStatus) { return; } + delete channelPinStatus[safeKey]; + if (isEmpty(channelPinStatus)) { + delete Env.pinnedPads[channel]; + } + }); +}; + var removePinned = function ( Env, - publicKey /*:string*/, + safeKey /*:string*/, channelList /*Array*/, cb /*:()=>void*/) { - Env.evPinnedPadsReady.reg(() => { - channelList.forEach((c) => { - const x = Env.pinnedPads[c]; - if (!x) { return; } - delete x[publicKey]; - }); + + // if pins are already loaded then you can just unpin normally + if (Env.pinsLoaded) { + removeSelfFromPinned(Env, safeKey, channelList); + return void cb(); + } + + // otherwise defer until later... + deferUserTask(Env, safeKey, function () { + removeSelfFromPinned(Env, safeKey, channelList); cb(); }); }; @@ -100,24 +157,24 @@ var getMultipleFileSize = function (Env, channels, cb) { }; const batchUserPins = BatchRead("LOAD_USER_PINS"); -var loadUserPins = function (Env, publicKey, cb) { - var session = Core.getSession(Env.Sessions, publicKey); +var loadUserPins = function (Env, safeKey, cb) { + var session = Core.getSession(Env.Sessions, safeKey); if (session.channels) { return cb(session.channels); } - batchUserPins(publicKey, cb, function (done) { + batchUserPins(safeKey, cb, function (done) { var ref = {}; var lineHandler = Pins.createLineHandler(ref, function (label, data) { Env.Log.error(label, { - log: publicKey, + log: safeKey, data: data, }); }); // if channels aren't in memory. load them from disk - Env.pinStore.getMessages(publicKey, lineHandler, function () { + Env.pinStore.getMessages(safeKey, lineHandler, function () { // no more messages // only put this into the cache if it completes @@ -133,27 +190,27 @@ var truthyKeys = function (O) { }); }; -var getChannelList = Pinning.getChannelList = function (Env, publicKey, _cb) { +var getChannelList = Pinning.getChannelList = function (Env, safeKey, _cb) { var cb = Util.once(Util.mkAsync(_cb)); - loadUserPins(Env, publicKey, function (pins) { + loadUserPins(Env, safeKey, function (pins) { cb(truthyKeys(pins)); }); }; const batchTotalSize = BatchRead("GET_TOTAL_SIZE"); -Pinning.getTotalSize = function (Env, publicKey, cb) { - var unescapedKey = unescapeKeyCharacters(publicKey); - var limit = Env.limits[unescapedKey]; +Pinning.getTotalSize = function (Env, safeKey, cb) { + var unsafeKey = unescapeKeyCharacters(safeKey); + var limit = Env.limits[unsafeKey]; // Get a common key if multiple users share the same quota, otherwise take the public key - var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey; + var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : safeKey; batchTotalSize(batchKey, cb, function (done) { var channels = []; var bytes = 0; nThen(function (waitFor) { // Get the channels list for our user account - Pinning.getChannelList(Env, publicKey, waitFor(function (_channels) { + getChannelList(Env, safeKey, waitFor(function (_channels) { if (!_channels) { waitFor.abort(); return done('INVALID_PIN_LIST'); @@ -163,7 +220,7 @@ Pinning.getTotalSize = function (Env, publicKey, cb) { // Get the channels list for users sharing our quota if (limit && Array.isArray(limit.users) && limit.users.length > 1) { limit.users.forEach(function (key) { - if (key === unescapedKey) { return; } // Don't count ourselves twice + if (key === unsafeKey) { return; } // Don't count ourselves twice getChannelList(Env, key, waitFor(function (_channels) { if (!_channels) { return; } // Broken user, don't count their quota Array.prototype.push.apply(channels, _channels); @@ -207,10 +264,10 @@ Pinning.trimPins = function (Env, safeKey, cb) { cb("NOT_IMPLEMENTED"); }; -var getFreeSpace = Pinning.getFreeSpace = function (Env, publicKey, cb) { - getLimit(Env, publicKey, function (e, limit) { +var getFreeSpace = Pinning.getFreeSpace = function (Env, safeKey, cb) { + getLimit(Env, safeKey, function (e, limit) { if (e) { return void cb(e); } - Pinning.getTotalSize(Env, publicKey, function (e, size) { + Pinning.getTotalSize(Env, safeKey, function (e, size) { if (typeof(size) === 'undefined') { return void cb(e); } var rem = limit[0] - size; @@ -236,20 +293,20 @@ var hashChannelList = function (A) { return hash; }; -var getHash = Pinning.getHash = function (Env, publicKey, cb) { - getChannelList(Env, publicKey, function (channels) { +var getHash = Pinning.getHash = function (Env, safeKey, cb) { + getChannelList(Env, safeKey, function (channels) { cb(void 0, hashChannelList(channels)); }); }; -Pinning.pinChannel = function (Env, publicKey, channels, cb) { +Pinning.pinChannel = function (Env, safeKey, channels, cb) { if (!channels && channels.filter) { return void cb('INVALID_PIN_LIST'); } // get channel list ensures your session has a cached channel list - getChannelList(Env, publicKey, function (pinned) { - var session = Core.getSession(Env.Sessions, publicKey); + getChannelList(Env, safeKey, function (pinned) { + var session = Core.getSession(Env.Sessions, safeKey); // only pin channels which are not already pinned var toStore = channels.filter(function (channel) { @@ -257,42 +314,42 @@ Pinning.pinChannel = function (Env, publicKey, channels, cb) { }); if (toStore.length === 0) { - return void getHash(Env, publicKey, cb); + return void getHash(Env, safeKey, cb); } getMultipleFileSize(Env, toStore, function (e, sizes) { if (typeof(sizes) === 'undefined') { return void cb(e); } var pinSize = sumChannelSizes(sizes); - getFreeSpace(Env, publicKey, function (e, free) { + getFreeSpace(Env, safeKey, function (e, free) { if (typeof(free) === 'undefined') { Env.WARN('getFreeSpace', e); return void cb(e); } if (pinSize > free) { return void cb('E_OVER_LIMIT'); } - Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]), + Env.pinStore.message(safeKey, JSON.stringify(['PIN', toStore, +new Date()]), function (e) { if (e) { return void cb(e); } toStore.forEach(function (channel) { session.channels[channel] = true; }); - addPinned(Env, publicKey, toStore, () => {}); - getHash(Env, publicKey, cb); + addPinned(Env, safeKey, toStore, () => {}); + getHash(Env, safeKey, cb); }); }); }); }); }; -Pinning.unpinChannel = function (Env, publicKey, channels, cb) { +Pinning.unpinChannel = function (Env, safeKey, channels, cb) { if (!channels && channels.filter) { // expected array return void cb('INVALID_PIN_LIST'); } - getChannelList(Env, publicKey, function (pinned) { - var session = Core.getSession(Env.Sessions, publicKey); + getChannelList(Env, safeKey, function (pinned) { + var session = Core.getSession(Env.Sessions, safeKey); // only unpin channels which are pinned var toStore = channels.filter(function (channel) { @@ -300,27 +357,27 @@ Pinning.unpinChannel = function (Env, publicKey, channels, cb) { }); if (toStore.length === 0) { - return void getHash(Env, publicKey, cb); + return void getHash(Env, safeKey, cb); } - Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]), + Env.pinStore.message(safeKey, JSON.stringify(['UNPIN', toStore, +new Date()]), function (e) { if (e) { return void cb(e); } toStore.forEach(function (channel) { delete session.channels[channel]; }); - removePinned(Env, publicKey, toStore, () => {}); - getHash(Env, publicKey, cb); + removePinned(Env, safeKey, toStore, () => {}); + getHash(Env, safeKey, cb); }); }); }; -Pinning.resetUserPins = function (Env, publicKey, channelList, cb) { +Pinning.resetUserPins = function (Env, safeKey, channelList, cb) { if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); } - var session = Core.getSession(Env.Sessions, publicKey); + var session = Core.getSession(Env.Sessions, safeKey); if (!channelList.length) { - return void getHash(Env, publicKey, function (e, hash) { + return void getHash(Env, safeKey, function (e, hash) { if (e) { return cb(e); } cb(void 0, hash); }); @@ -332,7 +389,7 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) { var pinSize = sumChannelSizes(sizes); - getLimit(Env, publicKey, function (e, limit) { + getLimit(Env, safeKey, function (e, limit) { if (e) { Env.WARN('[RESET_ERR]', e); return void cb(e); @@ -347,7 +404,7 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) { They will not be able to pin additional pads until they upgrade or delete enough files to go back under their limit. */ if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); } - Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]), + Env.pinStore.message(safeKey, JSON.stringify(['RESET', channelList, +new Date()]), function (e) { if (e) { return void cb(e); } channelList.forEach(function (channel) { @@ -360,13 +417,13 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) { } else { oldChannels = []; } - removePinned(Env, publicKey, oldChannels, () => { - addPinned(Env, publicKey, channelList, ()=>{}); + removePinned(Env, safeKey, oldChannels, () => { + addPinned(Env, safeKey, channelList, ()=>{}); }); // update in-memory cache IFF the reset was allowed. session.channels = pins; - getHash(Env, publicKey, function (e, hash) { + getHash(Env, safeKey, function (e, hash) { cb(e, hash); }); }); @@ -429,35 +486,74 @@ Pinning.getDeletedPads = function (Env, channels, cb) { }); }; +const answerNoConclusively = function (Env) { + const pending = Env.pendingPinInquiries; + for (var channel in pending) { + answerDeferred(Env, channel, false); + } +}; + // inform that the Pinning.loadChannelPins = function (Env) { - Pins.list(function (err, data) { + const stats = { + surplus: 0, + pinned: 0, + duplicated: 0, + users: 0, // XXX useful for admin panel ? + }; + + const handler = function (ref, safeKey, pinned) { + if (ref.surplus) { + stats.surplus += ref.surplus; + } + for (var channel in ref.pins) { + if (!pinned.hasOwnProperty(channel)) { + answerDeferred(Env, channel, true); + stats.pinned++; + } else { + stats.duplicated++; + } + } + stats.users++; + runUserDeferred(Env, safeKey); + }; + + Pins.list(function (err) { if (err) { + Env.pinsLoaded = true; Env.Log.error("LOAD_CHANNEL_PINS", err); - - // FIXME not sure what should be done here instead - Env.pinnedPads = {}; - Env.evPinnedPadsReady.fire(); return; } - - Env.pinnedPads = data; - Env.evPinnedPadsReady.fire(); + Env.pinsLoaded = true; + answerNoConclusively(Env); + runRemainingDeferred(Env); }, { pinPath: Env.paths.pin, + handler: handler, + pinned: Env.pinnedPads, + workers: Env.pinWorkers, }); }; -Pinning.isChannelPinned = function (Env, channel, cb) { - Env.evPinnedPadsReady.reg(() => { - if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) { // FIXME 'Object.keys' here is overkill. We only need to know that it isn't empty - cb(void 0, true); - } else { - delete Env.pinnedPads[channel]; - cb(void 0, false); - } - }); +const deferResponse = function (Env, channel, cb) { + const pending = Env.pendingPinInquiries; + (pending[channel] = pending[channel] || []).push(cb); }; +Pinning.isChannelPinned = function (Env, channel, cb) { + // if the pins are fully loaded then you can answer yes/no definitively + if (Env.pinsLoaded) { + return void cb(void 0, !isEmpty(Env.pinnedPads[channel])); + } + + // you may already know that a channel is pinned + // even if you're still loading. answer immediately if so + if (!isEmpty(Env.pinnedPads[channel])) { return cb(void 0, true); } + + // if you're still loading them then can answer 'yes' as soon + // as you learn that one account has pinned a file. + // negative responses have to wait until the end + deferResponse(Env, channel, cb); +}; diff --git a/lib/pins.js b/lib/pins.js index 3c1bf3967..41e871446 100644 --- a/lib/pins.js +++ b/lib/pins.js @@ -94,13 +94,26 @@ Pins.calculateFromLog = function (pinFile, fileName) { const getSafeKeyFromPath = function (path) { return path.replace(/^.*\//, '').replace(/\.ndjson/, ''); -} +}; + +const addUserPinToState = Pins.addUserPinToState = function (state, safeKey, itemId) { + (state[itemId] = state[itemId] || {})[safeKey] = 1; +}; Pins.list = function (_done, config) { + // allow for a configurable pin store location const pinPath = config.pinPath || './data/pins'; + + // allow for a configurable amount of parallelism const plan = Plan(config.workers || 5); + + // run a supplied handler whenever you finish reading a log + // or noop if not supplied. const handler = config.handler || function () {}; + // use and mutate a supplied object for state if it's passed + const pinned = config.pinned || {}; + var isDone = false; // ensure that 'done' is only called once // that it calls back asynchronously @@ -110,20 +123,10 @@ Pins.list = function (_done, config) { isDone = true; })); - // TODO externalize this via optional handlers? - const stats = { - logs: 0, - dirs: 0, - pinned: 0, - lines: 0, - }; - const errorHandler = function (label, info) { console.log(label, info); }; - const pinned = {}; - // TODO replace this with lib-readline? const streamFile = function (path, cb) { const id = getSafeKeyFromPath(path); @@ -133,7 +136,6 @@ Pins.list = function (_done, config) { const ref = {}; const pinHandler = createLineHandler(ref, errorHandler); var lines = body.split('\n'); - stats.lines += lines.length; lines.forEach(pinHandler); handler(ref, id, pinned); cb(void 0, ref); @@ -156,7 +158,7 @@ Pins.list = function (_done, config) { scanDirectory(pinPath, function (err, dirs) { if (err) { - if (err.code === 'ENOENT') { return void cb(void 0, {}); } + if (err.code === 'ENOENT') { return void done(void 0, {}); } return void done(err); } dirs.forEach(function (dir) { @@ -166,21 +168,16 @@ Pins.list = function (_done, config) { if (nested_err) { return void done(err); } - stats.dirs++; logs.forEach(function (log) { if (!/\.ndjson$/.test(log.path)) { return; } plan.job(0, function (next) { if (isDone) { return void next(); } streamFile(log.path, function (err, ref) { if (err) { return void done(err); } - stats.logs++; var set = ref.pins; for (var item in set) { - (pinned[item] = pinned[item] || {})[log.id] = 1; - if (!pinned.hasOwnProperty(item)) { - stats.pinned++; - } + addUserPinToState(pinned, log.id, item); } next(); }); diff --git a/lib/rpc.js b/lib/rpc.js index 23597eb5b..eb09ddcd0 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -2,7 +2,6 @@ const nThen = require("nthen"); const Util = require("./common-util"); -const mkEvent = Util.mkEvent; const Core = require("./commands/core"); const Admin = require("./commands/admin-rpc"); @@ -219,9 +218,14 @@ RPC.create = function (config, cb) { Sessions: {}, paths: {}, msgStore: config.store, + pinStore: undefined, pinnedPads: {}, - evPinnedPadsReady: mkEvent(true), + pinsLoaded: false, + pendingPinInquiries: {}, + pendingUnpins: {}, + pinWorkers: 5, + limits: {}, admins: [], Log: Log,