Respond to pinning RPCs as soon as possible

(instead of waiting until you've read an unbounded number
of pin logs while queries back up in memory)

Also replace instances of 'publicKey' with 'safeKey' or 'unsafeKey'
to clearly and correctly indicate their format.
pull/1/head
ansuz 5 years ago
parent cded52f83f
commit 38c1700173

@ -25,9 +25,9 @@ var sumChannelSizes = function (sizes) {
// FIXME it's possible for this to respond before the server has had a chance
// to fetch the limits. Maybe we should respond with an error...
// or wait until we actually know the limits before responding
var getLimit = Pinning.getLimit = function (Env, publicKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey);
var limit = Env.limits[unescapedKey];
var getLimit = Pinning.getLimit = function (Env, safeKey, cb) {
var unsafeKey = unescapeKeyCharacters(safeKey);
var limit = Env.limits[unsafeKey];
var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'?
Env.defaultStorageLimit: Core.DEFAULT_LIMIT;
@ -37,32 +37,89 @@ var getLimit = Pinning.getLimit = function (Env, publicKey, cb) {
cb(void 0, toSend);
};
const answerDeferred = function (Env, channel, bool) {
const pending = Env.pendingPinInquiries;
const stack = pending[channel];
if (!Array.isArray(stack)) { return; }
delete pending[channel];
stack.forEach(function (cb) {
cb(void 0, bool);
});
};
var addPinned = function (
Env,
publicKey /*:string*/,
safeKey /*:string*/,
channelList /*Array<string>*/,
cb /*:()=>void*/)
{
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => {
const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {};
x[publicKey] = 1;
});
channelList.forEach(function (channel) {
Pins.addUserPinToState(Env.pinnedPads, safeKey, channel);
answerDeferred(Env, channel, true);
});
cb();
};
const isEmpty = function (obj) {
if (!obj || typeof(obj) !== 'object') { return true; }
for (var key in obj) {
if (obj.hasOwnProperty(key)) { return true; }
}
return false;
};
const deferUserTask = function (Env, safeKey, deferred) {
const pending = Env.pendingUnpins;
(pending[safeKey] = pending[safeKey] || []).push(deferred);
};
const runUserDeferred = function (Env, safeKey) {
const pending = Env.pendingUnpins;
const stack = pending[safeKey];
if (!Array.isArray(stack)) { return; }
delete pending[safeKey];
stack.forEach(function (cb) {
cb();
});
};
const runRemainingDeferred = function (Env) {
const pending = Env.pendingUnpins;
for (var safeKey in pending) {
runUserDeferred(Env, safeKey);
}
};
const removeSelfFromPinned = function (Env, safeKey, channelList) {
channelList.forEach(function (channel) {
const channelPinStatus = Env.pinnedPads[channel];
if (!channelPinStatus) { return; }
delete channelPinStatus[safeKey];
if (isEmpty(channelPinStatus)) {
delete Env.pinnedPads[channel];
}
});
};
var removePinned = function (
Env,
publicKey /*:string*/,
safeKey /*:string*/,
channelList /*Array<string>*/,
cb /*:()=>void*/)
{
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => {
const x = Env.pinnedPads[c];
if (!x) { return; }
delete x[publicKey];
});
// if pins are already loaded then you can just unpin normally
if (Env.pinsLoaded) {
removeSelfFromPinned(Env, safeKey, channelList);
return void cb();
}
// otherwise defer until later...
deferUserTask(Env, safeKey, function () {
removeSelfFromPinned(Env, safeKey, channelList);
cb();
});
};
@ -100,24 +157,24 @@ var getMultipleFileSize = function (Env, channels, cb) {
};
const batchUserPins = BatchRead("LOAD_USER_PINS");
var loadUserPins = function (Env, publicKey, cb) {
var session = Core.getSession(Env.Sessions, publicKey);
var loadUserPins = function (Env, safeKey, cb) {
var session = Core.getSession(Env.Sessions, safeKey);
if (session.channels) {
return cb(session.channels);
}
batchUserPins(publicKey, cb, function (done) {
batchUserPins(safeKey, cb, function (done) {
var ref = {};
var lineHandler = Pins.createLineHandler(ref, function (label, data) {
Env.Log.error(label, {
log: publicKey,
log: safeKey,
data: data,
});
});
// if channels aren't in memory. load them from disk
Env.pinStore.getMessages(publicKey, lineHandler, function () {
Env.pinStore.getMessages(safeKey, lineHandler, function () {
// no more messages
// only put this into the cache if it completes
@ -133,27 +190,27 @@ var truthyKeys = function (O) {
});
};
var getChannelList = Pinning.getChannelList = function (Env, publicKey, _cb) {
var getChannelList = Pinning.getChannelList = function (Env, safeKey, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
loadUserPins(Env, publicKey, function (pins) {
loadUserPins(Env, safeKey, function (pins) {
cb(truthyKeys(pins));
});
};
const batchTotalSize = BatchRead("GET_TOTAL_SIZE");
Pinning.getTotalSize = function (Env, publicKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey);
var limit = Env.limits[unescapedKey];
Pinning.getTotalSize = function (Env, safeKey, cb) {
var unsafeKey = unescapeKeyCharacters(safeKey);
var limit = Env.limits[unsafeKey];
// Get a common key if multiple users share the same quota, otherwise take the public key
var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey;
var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : safeKey;
batchTotalSize(batchKey, cb, function (done) {
var channels = [];
var bytes = 0;
nThen(function (waitFor) {
// Get the channels list for our user account
Pinning.getChannelList(Env, publicKey, waitFor(function (_channels) {
getChannelList(Env, safeKey, waitFor(function (_channels) {
if (!_channels) {
waitFor.abort();
return done('INVALID_PIN_LIST');
@ -163,7 +220,7 @@ Pinning.getTotalSize = function (Env, publicKey, cb) {
// Get the channels list for users sharing our quota
if (limit && Array.isArray(limit.users) && limit.users.length > 1) {
limit.users.forEach(function (key) {
if (key === unescapedKey) { return; } // Don't count ourselves twice
if (key === unsafeKey) { return; } // Don't count ourselves twice
getChannelList(Env, key, waitFor(function (_channels) {
if (!_channels) { return; } // Broken user, don't count their quota
Array.prototype.push.apply(channels, _channels);
@ -207,10 +264,10 @@ Pinning.trimPins = function (Env, safeKey, cb) {
cb("NOT_IMPLEMENTED");
};
var getFreeSpace = Pinning.getFreeSpace = function (Env, publicKey, cb) {
getLimit(Env, publicKey, function (e, limit) {
var getFreeSpace = Pinning.getFreeSpace = function (Env, safeKey, cb) {
getLimit(Env, safeKey, function (e, limit) {
if (e) { return void cb(e); }
Pinning.getTotalSize(Env, publicKey, function (e, size) {
Pinning.getTotalSize(Env, safeKey, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); }
var rem = limit[0] - size;
@ -236,20 +293,20 @@ var hashChannelList = function (A) {
return hash;
};
var getHash = Pinning.getHash = function (Env, publicKey, cb) {
getChannelList(Env, publicKey, function (channels) {
var getHash = Pinning.getHash = function (Env, safeKey, cb) {
getChannelList(Env, safeKey, function (channels) {
cb(void 0, hashChannelList(channels));
});
};
Pinning.pinChannel = function (Env, publicKey, channels, cb) {
Pinning.pinChannel = function (Env, safeKey, channels, cb) {
if (!channels && channels.filter) {
return void cb('INVALID_PIN_LIST');
}
// get channel list ensures your session has a cached channel list
getChannelList(Env, publicKey, function (pinned) {
var session = Core.getSession(Env.Sessions, publicKey);
getChannelList(Env, safeKey, function (pinned) {
var session = Core.getSession(Env.Sessions, safeKey);
// only pin channels which are not already pinned
var toStore = channels.filter(function (channel) {
@ -257,42 +314,42 @@ Pinning.pinChannel = function (Env, publicKey, channels, cb) {
});
if (toStore.length === 0) {
return void getHash(Env, publicKey, cb);
return void getHash(Env, safeKey, cb);
}
getMultipleFileSize(Env, toStore, function (e, sizes) {
if (typeof(sizes) === 'undefined') { return void cb(e); }
var pinSize = sumChannelSizes(sizes);
getFreeSpace(Env, publicKey, function (e, free) {
getFreeSpace(Env, safeKey, function (e, free) {
if (typeof(free) === 'undefined') {
Env.WARN('getFreeSpace', e);
return void cb(e);
}
if (pinSize > free) { return void cb('E_OVER_LIMIT'); }
Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]),
Env.pinStore.message(safeKey, JSON.stringify(['PIN', toStore, +new Date()]),
function (e) {
if (e) { return void cb(e); }
toStore.forEach(function (channel) {
session.channels[channel] = true;
});
addPinned(Env, publicKey, toStore, () => {});
getHash(Env, publicKey, cb);
addPinned(Env, safeKey, toStore, () => {});
getHash(Env, safeKey, cb);
});
});
});
});
};
Pinning.unpinChannel = function (Env, publicKey, channels, cb) {
Pinning.unpinChannel = function (Env, safeKey, channels, cb) {
if (!channels && channels.filter) {
// expected array
return void cb('INVALID_PIN_LIST');
}
getChannelList(Env, publicKey, function (pinned) {
var session = Core.getSession(Env.Sessions, publicKey);
getChannelList(Env, safeKey, function (pinned) {
var session = Core.getSession(Env.Sessions, safeKey);
// only unpin channels which are pinned
var toStore = channels.filter(function (channel) {
@ -300,27 +357,27 @@ Pinning.unpinChannel = function (Env, publicKey, channels, cb) {
});
if (toStore.length === 0) {
return void getHash(Env, publicKey, cb);
return void getHash(Env, safeKey, cb);
}
Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]),
Env.pinStore.message(safeKey, JSON.stringify(['UNPIN', toStore, +new Date()]),
function (e) {
if (e) { return void cb(e); }
toStore.forEach(function (channel) {
delete session.channels[channel];
});
removePinned(Env, publicKey, toStore, () => {});
getHash(Env, publicKey, cb);
removePinned(Env, safeKey, toStore, () => {});
getHash(Env, safeKey, cb);
});
});
};
Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
Pinning.resetUserPins = function (Env, safeKey, channelList, cb) {
if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); }
var session = Core.getSession(Env.Sessions, publicKey);
var session = Core.getSession(Env.Sessions, safeKey);
if (!channelList.length) {
return void getHash(Env, publicKey, function (e, hash) {
return void getHash(Env, safeKey, function (e, hash) {
if (e) { return cb(e); }
cb(void 0, hash);
});
@ -332,7 +389,7 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
var pinSize = sumChannelSizes(sizes);
getLimit(Env, publicKey, function (e, limit) {
getLimit(Env, safeKey, function (e, limit) {
if (e) {
Env.WARN('[RESET_ERR]', e);
return void cb(e);
@ -347,7 +404,7 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
They will not be able to pin additional pads until they upgrade
or delete enough files to go back under their limit. */
if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); }
Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]),
Env.pinStore.message(safeKey, JSON.stringify(['RESET', channelList, +new Date()]),
function (e) {
if (e) { return void cb(e); }
channelList.forEach(function (channel) {
@ -360,13 +417,13 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
} else {
oldChannels = [];
}
removePinned(Env, publicKey, oldChannels, () => {
addPinned(Env, publicKey, channelList, ()=>{});
removePinned(Env, safeKey, oldChannels, () => {
addPinned(Env, safeKey, channelList, ()=>{});
});
// update in-memory cache IFF the reset was allowed.
session.channels = pins;
getHash(Env, publicKey, function (e, hash) {
getHash(Env, safeKey, function (e, hash) {
cb(e, hash);
});
});
@ -429,35 +486,74 @@ Pinning.getDeletedPads = function (Env, channels, cb) {
});
};
const answerNoConclusively = function (Env) {
const pending = Env.pendingPinInquiries;
for (var channel in pending) {
answerDeferred(Env, channel, false);
}
};
// inform that the
Pinning.loadChannelPins = function (Env) {
Pins.list(function (err, data) {
const stats = {
surplus: 0,
pinned: 0,
duplicated: 0,
users: 0, // XXX useful for admin panel ?
};
const handler = function (ref, safeKey, pinned) {
if (ref.surplus) {
stats.surplus += ref.surplus;
}
for (var channel in ref.pins) {
if (!pinned.hasOwnProperty(channel)) {
answerDeferred(Env, channel, true);
stats.pinned++;
} else {
stats.duplicated++;
}
}
stats.users++;
runUserDeferred(Env, safeKey);
};
Pins.list(function (err) {
if (err) {
Env.pinsLoaded = true;
Env.Log.error("LOAD_CHANNEL_PINS", err);
// FIXME not sure what should be done here instead
Env.pinnedPads = {};
Env.evPinnedPadsReady.fire();
return;
}
Env.pinnedPads = data;
Env.evPinnedPadsReady.fire();
Env.pinsLoaded = true;
answerNoConclusively(Env);
runRemainingDeferred(Env);
}, {
pinPath: Env.paths.pin,
handler: handler,
pinned: Env.pinnedPads,
workers: Env.pinWorkers,
});
};
Pinning.isChannelPinned = function (Env, channel, cb) {
Env.evPinnedPadsReady.reg(() => {
if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) { // FIXME 'Object.keys' here is overkill. We only need to know that it isn't empty
cb(void 0, true);
} else {
delete Env.pinnedPads[channel];
cb(void 0, false);
}
});
const deferResponse = function (Env, channel, cb) {
const pending = Env.pendingPinInquiries;
(pending[channel] = pending[channel] || []).push(cb);
};
Pinning.isChannelPinned = function (Env, channel, cb) {
// if the pins are fully loaded then you can answer yes/no definitively
if (Env.pinsLoaded) {
return void cb(void 0, !isEmpty(Env.pinnedPads[channel]));
}
// you may already know that a channel is pinned
// even if you're still loading. answer immediately if so
if (!isEmpty(Env.pinnedPads[channel])) { return cb(void 0, true); }
// if you're still loading them then can answer 'yes' as soon
// as you learn that one account has pinned a file.
// negative responses have to wait until the end
deferResponse(Env, channel, cb);
};

@ -94,13 +94,26 @@ Pins.calculateFromLog = function (pinFile, fileName) {
const getSafeKeyFromPath = function (path) {
return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
}
};
const addUserPinToState = Pins.addUserPinToState = function (state, safeKey, itemId) {
(state[itemId] = state[itemId] || {})[safeKey] = 1;
};
Pins.list = function (_done, config) {
// allow for a configurable pin store location
const pinPath = config.pinPath || './data/pins';
// allow for a configurable amount of parallelism
const plan = Plan(config.workers || 5);
// run a supplied handler whenever you finish reading a log
// or noop if not supplied.
const handler = config.handler || function () {};
// use and mutate a supplied object for state if it's passed
const pinned = config.pinned || {};
var isDone = false;
// ensure that 'done' is only called once
// that it calls back asynchronously
@ -110,20 +123,10 @@ Pins.list = function (_done, config) {
isDone = true;
}));
// TODO externalize this via optional handlers?
const stats = {
logs: 0,
dirs: 0,
pinned: 0,
lines: 0,
};
const errorHandler = function (label, info) {
console.log(label, info);
};
const pinned = {};
// TODO replace this with lib-readline?
const streamFile = function (path, cb) {
const id = getSafeKeyFromPath(path);
@ -133,7 +136,6 @@ Pins.list = function (_done, config) {
const ref = {};
const pinHandler = createLineHandler(ref, errorHandler);
var lines = body.split('\n');
stats.lines += lines.length;
lines.forEach(pinHandler);
handler(ref, id, pinned);
cb(void 0, ref);
@ -156,7 +158,7 @@ Pins.list = function (_done, config) {
scanDirectory(pinPath, function (err, dirs) {
if (err) {
if (err.code === 'ENOENT') { return void cb(void 0, {}); }
if (err.code === 'ENOENT') { return void done(void 0, {}); }
return void done(err);
}
dirs.forEach(function (dir) {
@ -166,21 +168,16 @@ Pins.list = function (_done, config) {
if (nested_err) {
return void done(err);
}
stats.dirs++;
logs.forEach(function (log) {
if (!/\.ndjson$/.test(log.path)) { return; }
plan.job(0, function (next) {
if (isDone) { return void next(); }
streamFile(log.path, function (err, ref) {
if (err) { return void done(err); }
stats.logs++;
var set = ref.pins;
for (var item in set) {
(pinned[item] = pinned[item] || {})[log.id] = 1;
if (!pinned.hasOwnProperty(item)) {
stats.pinned++;
}
addUserPinToState(pinned, log.id, item);
}
next();
});

@ -2,7 +2,6 @@
const nThen = require("nthen");
const Util = require("./common-util");
const mkEvent = Util.mkEvent;
const Core = require("./commands/core");
const Admin = require("./commands/admin-rpc");
@ -219,9 +218,14 @@ RPC.create = function (config, cb) {
Sessions: {},
paths: {},
msgStore: config.store,
pinStore: undefined,
pinnedPads: {},
evPinnedPadsReady: mkEvent(true),
pinsLoaded: false,
pendingPinInquiries: {},
pendingUnpins: {},
pinWorkers: 5,
limits: {},
admins: [],
Log: Log,

Loading…
Cancel
Save