diff --git a/lib/commands/core.js b/lib/commands/core.js new file mode 100644 index 000000000..7e34232cb --- /dev/null +++ b/lib/commands/core.js @@ -0,0 +1,42 @@ +/*jshint esversion: 6 */ +const Core = module.exports; +const Util = require("../common-util"); +const escapeKeyCharacters = Util.escapeKeyCharacters; + +Core.DEFAULT_LIMIT = 50 * 1024 * 1024; +Core.SESSION_EXPIRATION_TIME = 60 * 1000; + +Core.isValidId = function (chan) { + return chan && chan.length && /^[a-zA-Z0-9=+-]*$/.test(chan) && + [32, 48].indexOf(chan.length) > -1; +}; + +var makeToken = Core.makeToken = function () { + return Number(Math.floor(Math.random() * Number.MAX_SAFE_INTEGER)) + .toString(16); +}; + +Core.getSession = function (Sessions, key) { + var safeKey = escapeKeyCharacters(key); + if (Sessions[safeKey]) { + Sessions[safeKey].atime = +new Date(); + return Sessions[safeKey]; + } + var user = Sessions[safeKey] = {}; + user.atime = +new Date(); + user.tokens = [ + makeToken() + ]; + return user; +}; + + + +// getChannelList +// getSession +// getHash +// getMultipleFileSize +// sumChannelSizes +// getFreeSpace +// getLimit + diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js new file mode 100644 index 000000000..3412f329e --- /dev/null +++ b/lib/commands/pin-rpc.js @@ -0,0 +1,399 @@ +/*jshint esversion: 6 */ +const Core = require("./core"); + +const BatchRead = require("../batch-read"); +const Pins = require("../pins"); + +const Pinning = module.exports; +const Nacl = require("tweetnacl/nacl-fast"); +const Util = require("../common-util"); +const nThen = require("nthen"); + +//const escapeKeyCharacters = Util.escapeKeyCharacters; +const unescapeKeyCharacters = Util.unescapeKeyCharacters; + +var sumChannelSizes = function (sizes) { + return Object.keys(sizes).map(function (id) { return sizes[id]; }) + .filter(function (x) { + // only allow positive numbers + return !(typeof(x) !== 'number' || x <= 0); + }) + .reduce(function (a, b) { return a + b; }, 0); +}; + +// XXX it's possible for this to respond before the server has had a chance +// to fetch the limits. Maybe we should respond with an error... +// or wait until we actually know the limits before responding +var getLimit = Pinning.getLimit = function (Env, publicKey, cb) { + var unescapedKey = unescapeKeyCharacters(publicKey); + var limit = Env.limits[unescapedKey]; + var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'? + Env.defaultStorageLimit: Core.DEFAULT_LIMIT; + + var toSend = limit && typeof(limit.limit) === "number"? + [limit.limit, limit.plan, limit.note] : [defaultLimit, '', '']; + + cb(void 0, toSend); +}; + +var addPinned = function ( + Env, + publicKey /*:string*/, + channelList /*Array*/, + cb /*:()=>void*/) +{ + Env.evPinnedPadsReady.reg(() => { + channelList.forEach((c) => { + const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {}; + x[publicKey] = 1; + }); + cb(); + }); +}; +var removePinned = function ( + Env, + publicKey /*:string*/, + channelList /*Array*/, + cb /*:()=>void*/) +{ + Env.evPinnedPadsReady.reg(() => { + channelList.forEach((c) => { + const x = Env.pinnedPads[c]; + if (!x) { return; } + delete x[publicKey]; + }); + cb(); + }); +}; + +var getMultipleFileSize = function (Env, channels, cb) { + if (!Array.isArray(channels)) { return cb('INVALID_PIN_LIST'); } + if (typeof(Env.msgStore.getChannelSize) !== 'function') { + return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); + } + + var i = channels.length; + var counts = {}; + + var done = function () { + i--; + if (i === 0) { return cb(void 0, counts); } + }; + + channels.forEach(function (channel) { + Pinning.getFileSize(Env, channel, function (e, size) { + if (e) { + // most likely error here is that a file no longer exists + // but a user still has it in their drive, and wants to know + // its size. We should find a way to inform them of this in + // the future. For now we can just tell them it has no size. + + //WARN('getFileSize', e); + counts[channel] = 0; + return done(); + } + counts[channel] = size; + done(); + }); + }); +}; + +const batchUserPins = BatchRead("LOAD_USER_PINS"); +var loadUserPins = function (Env, publicKey, cb) { + var session = Core.getSession(Env.Sessions, publicKey); + + if (session.channels) { + return cb(session.channels); + } + + batchUserPins(publicKey, cb, function (done) { + var ref = {}; + var lineHandler = Pins.createLineHandler(ref, function (label, data) { + Env.Log.error(label, { + log: publicKey, + data: data, + }); + }); + + // if channels aren't in memory. load them from disk + Env.pinStore.getMessages(publicKey, lineHandler, function () { + // no more messages + + // only put this into the cache if it completes + session.channels = ref.pins; + done(ref.pins); // FIXME no error handling? + }); + }); +}; + +var truthyKeys = function (O) { + return Object.keys(O).filter(function (k) { + return O[k]; + }); +}; + +var getChannelList = Pinning.getChannelList = function (Env, publicKey, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + loadUserPins(Env, publicKey, function (pins) { + cb(truthyKeys(pins)); + }); +}; + +const batchTotalSize = BatchRead("GET_TOTAL_SIZE"); +Pinning.getTotalSize = function (Env, publicKey, cb) { + var unescapedKey = unescapeKeyCharacters(publicKey); + var limit = Env.limits[unescapedKey]; + + // Get a common key if multiple users share the same quota, otherwise take the public key + var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey; + + batchTotalSize(batchKey, cb, function (done) { + var channels = []; + var bytes = 0; + nThen(function (waitFor) { + // Get the channels list for our user account + Pinning.getChannelList(Env, publicKey, waitFor(function (_channels) { + if (!_channels) { + waitFor.abort(); + return done('INVALID_PIN_LIST'); + } + Array.prototype.push.apply(channels, _channels); + })); + // Get the channels list for users sharing our quota + if (limit && Array.isArray(limit.users) && limit.users.length > 1) { + limit.users.forEach(function (key) { + if (key === unescapedKey) { return; } // Don't count ourselves twice + getChannelList(Env, key, waitFor(function (_channels) { + if (!_channels) { return; } // Broken user, don't count their quota + Array.prototype.push.apply(channels, _channels); + })); + }); + } + }).nThen(function (waitFor) { + // Get size of the channels + var list = []; // Contains the channels already counted in the quota to avoid duplicates + channels.forEach(function (channel) { // TODO semaphore? + if (list.indexOf(channel) !== -1) { return; } + list.push(channel); + Pinning.getFileSize(Env, channel, waitFor(function (e, size) { + if (!e) { bytes += size; } + })); + }); + }).nThen(function () { + done(void 0, bytes); + }); + }); +}; + +/* Users should be able to clear their own pin log with an authenticated RPC +*/ +Pinning.removePins = function (Env, safeKey, cb) { + if (typeof(Env.pinStore.removeChannel) !== 'function') { + return void cb("E_NOT_IMPLEMENTED"); + } + Env.pinStore.removeChannel(safeKey, function (err) { + Env.Log.info('DELETION_PIN_BY_OWNER_RPC', { + safeKey: safeKey, + status: err? String(err): 'SUCCESS', + }); + + cb(err); + }); +}; + +Pinning.trimPins = function (Env, safeKey, cb) { + // XXX trim to latest pin checkpoint + cb("NOT_IMPLEMENTED"); +}; + +var getFreeSpace = Pinning.getFreeSpace = function (Env, publicKey, cb) { + getLimit(Env, publicKey, function (e, limit) { + if (e) { return void cb(e); } + Pinning.getTotalSize(Env, publicKey, function (e, size) { + if (typeof(size) === 'undefined') { return void cb(e); } + + var rem = limit[0] - size; + if (typeof(rem) !== 'number') { + return void cb('invalid_response'); + } + cb(void 0, rem); + }); + }); +}; + +var hashChannelList = function (A) { + var uniques = []; + + A.forEach(function (a) { + if (uniques.indexOf(a) === -1) { uniques.push(a); } + }); + uniques.sort(); + + var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl + .util.decodeUTF8(JSON.stringify(uniques)))); + + return hash; +}; + +var getHash = Pinning.getHash = function (Env, publicKey, cb) { + getChannelList(Env, publicKey, function (channels) { + cb(void 0, hashChannelList(channels)); + }); +}; + +Pinning.pinChannel = function (Env, publicKey, channels, cb) { + if (!channels && channels.filter) { + return void cb('INVALID_PIN_LIST'); + } + + // get channel list ensures your session has a cached channel list + getChannelList(Env, publicKey, function (pinned) { + var session = Core.getSession(Env.Sessions, publicKey); + + // only pin channels which are not already pinned + var toStore = channels.filter(function (channel) { + return pinned.indexOf(channel) === -1; + }); + + if (toStore.length === 0) { + return void getHash(Env, publicKey, cb); + } + + getMultipleFileSize(Env, toStore, function (e, sizes) { + if (typeof(sizes) === 'undefined') { return void cb(e); } + var pinSize = sumChannelSizes(sizes); + + getFreeSpace(Env, publicKey, function (e, free) { + if (typeof(free) === 'undefined') { + Env.WARN('getFreeSpace', e); + return void cb(e); + } + if (pinSize > free) { return void cb('E_OVER_LIMIT'); } + + Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]), + function (e) { + if (e) { return void cb(e); } + toStore.forEach(function (channel) { + session.channels[channel] = true; + }); + addPinned(Env, publicKey, toStore, () => {}); + getHash(Env, publicKey, cb); + }); + }); + }); + }); +}; + +Pinning.unpinChannel = function (Env, publicKey, channels, cb) { + if (!channels && channels.filter) { + // expected array + return void cb('INVALID_PIN_LIST'); + } + + getChannelList(Env, publicKey, function (pinned) { + var session = Core.getSession(Env.Sessions, publicKey); + + // only unpin channels which are pinned + var toStore = channels.filter(function (channel) { + return pinned.indexOf(channel) !== -1; + }); + + if (toStore.length === 0) { + return void getHash(Env, publicKey, cb); + } + + Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]), + function (e) { + if (e) { return void cb(e); } + toStore.forEach(function (channel) { + delete session.channels[channel]; + }); + removePinned(Env, publicKey, toStore, () => {}); + getHash(Env, publicKey, cb); + }); + }); +}; + +Pinning.resetUserPins = function (Env, publicKey, channelList, cb) { + if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); } + var session = Core.getSession(Env.Sessions, publicKey); + + if (!channelList.length) { + return void getHash(Env, publicKey, function (e, hash) { + if (e) { return cb(e); } + cb(void 0, hash); + }); + } + + var pins = {}; + getMultipleFileSize(Env, channelList, function (e, sizes) { + if (typeof(sizes) === 'undefined') { return void cb(e); } + var pinSize = sumChannelSizes(sizes); + + + getLimit(Env, publicKey, function (e, limit) { + if (e) { + Env.WARN('[RESET_ERR]', e); + return void cb(e); + } + + /* we want to let people pin, even if they are over their limit, + but they should only be able to do this once. + + This prevents data loss in the case that someone registers, but + does not have enough free space to pin their migrated data. + + They will not be able to pin additional pads until they upgrade + or delete enough files to go back under their limit. */ + if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); } + Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]), + function (e) { + if (e) { return void cb(e); } + channelList.forEach(function (channel) { + pins[channel] = true; + }); + + var oldChannels; + if (session.channels && typeof(session.channels) === 'object') { + oldChannels = Object.keys(session.channels); + } else { + oldChannels = []; + } + removePinned(Env, publicKey, oldChannels, () => { + addPinned(Env, publicKey, channelList, ()=>{}); + }); + + // update in-memory cache IFF the reset was allowed. + session.channels = pins; + getHash(Env, publicKey, function (e, hash) { + cb(e, hash); + }); + }); + }); + }); +}; + +Pinning.getFileSize = function (Env, channel, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } + if (channel.length === 32) { + if (typeof(Env.msgStore.getChannelSize) !== 'function') { + return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); + } + + return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) { + if (e) { + if (e.code === 'ENOENT') { return void cb(void 0, 0); } + return void cb(e.code); + } + cb(void 0, size); + }); + } + + // 'channel' refers to a file, so you need another API + Env.blobStore.size(channel, function (e, size) { + if (typeof(size) === 'undefined') { return void cb(e); } + cb(void 0, size); + }); +}; + diff --git a/lib/rpc.js b/lib/rpc.js index f96e8a717..a064ebe64 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -15,10 +15,10 @@ const Package = require('../package.json'); const Pinned = require('../scripts/pinned'); const Saferphore = require("saferphore"); const nThen = require("nthen"); -const Pins = require("./pins"); const Meta = require("./metadata"); const WriteQueue = require("./write-queue"); const BatchRead = require("./batch-read"); +const Core = require("./commands/core"); const Util = require("./common-util"); const escapeKeyCharacters = Util.escapeKeyCharacters; @@ -26,15 +26,13 @@ const unescapeKeyCharacters = Util.unescapeKeyCharacters; const mkEvent = Util.mkEvent; const Admin = require("./commands/admin-rpc"); +const Pinning = require("./commands/pin-rpc"); var RPC = module.exports; var Store = require("../storage/file"); var BlobStore = require("../storage/blob"); -var DEFAULT_LIMIT = 50 * 1024 * 1024; -var SESSION_EXPIRATION_TIME = 60 * 1000; - var Log; var WARN = function (e, output) { @@ -47,16 +45,6 @@ var WARN = function (e, output) { } }; -var isValidId = function (chan) { - return chan && chan.length && /^[a-zA-Z0-9=+-]*$/.test(chan) && - [32, 48].indexOf(chan.length) > -1; -}; - -var makeToken = function () { - return Number(Math.floor(Math.random() * Number.MAX_SAFE_INTEGER)) - .toString(16); -}; - var makeCookie = function (token) { var time = (+new Date()); time -= time % 5000; @@ -81,20 +69,6 @@ var parseCookie = function (cookie) { return c; }; -var getSession = function (Sessions, key) { - var safeKey = escapeKeyCharacters(key); - if (Sessions[safeKey]) { - Sessions[safeKey].atime = +new Date(); - return Sessions[safeKey]; - } - var user = Sessions[safeKey] = {}; - user.atime = +new Date(); - user.tokens = [ - makeToken() - ]; - return user; -}; - var isTooOld = function (time, now) { return (now - time) > 300000; }; @@ -121,7 +95,7 @@ var expireSessions = function (Sessions) { var addTokenForKey = function (Sessions, publicKey, token) { if (!Sessions[publicKey]) { throw new Error('undefined user'); } - var user = getSession(Sessions, publicKey); + var user = Core.getSession(Sessions, publicKey); user.tokens.push(token); user.atime = +new Date(); if (user.tokens.length > 2) { user.tokens.shift(); } @@ -143,7 +117,7 @@ var isValidCookie = function (Sessions, publicKey, cookie) { return false; } - var user = getSession(Sessions, publicKey); + var user = Core.getSession(Sessions, publicKey); if (!user) { return false; } var idx = user.tokens.indexOf(parsed.seq); @@ -151,7 +125,7 @@ var isValidCookie = function (Sessions, publicKey, cookie) { if (idx > 0) { // make a new token - addTokenForKey(Sessions, publicKey, makeToken()); + addTokenForKey(Sessions, publicKey, Core.makeToken()); } return true; @@ -195,74 +169,9 @@ var checkSignature = function (signedMsg, signature, publicKey) { return Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer); }; -const batchUserPins = BatchRead("LOAD_USER_PINS"); -var loadUserPins = function (Env, publicKey, cb) { - var session = getSession(Env.Sessions, publicKey); - - if (session.channels) { - return cb(session.channels); - } - - batchUserPins(publicKey, cb, function (done) { - var ref = {}; - var lineHandler = Pins.createLineHandler(ref, function (label, data) { - Log.error(label, { - log: publicKey, - data: data, - }); - }); - - // if channels aren't in memory. load them from disk - Env.pinStore.getMessages(publicKey, lineHandler, function () { - // no more messages - - // only put this into the cache if it completes - session.channels = ref.pins; - done(ref.pins); // FIXME no error handling? - }); - }); -}; - -var truthyKeys = function (O) { - return Object.keys(O).filter(function (k) { - return O[k]; - }); -}; - -var getChannelList = function (Env, publicKey, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - loadUserPins(Env, publicKey, function (pins) { - cb(truthyKeys(pins)); - }); -}; - -var getFileSize = function (Env, channel, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } - if (channel.length === 32) { - if (typeof(Env.msgStore.getChannelSize) !== 'function') { - return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); - } - - return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) { - if (e) { - if (e.code === 'ENOENT') { return void cb(void 0, 0); } - return void cb(e.code); - } - cb(void 0, size); - }); - } - - // 'channel' refers to a file, so you need another API - Env.blobStore.size(channel, function (e, size) { - if (typeof(size) === 'undefined') { return void cb(e); } - cb(void 0, size); - }); -}; - const batchMetadata = BatchRead("GET_METADATA"); var getMetadata = function (Env, channel, cb) { - if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } + if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); } batchMetadata(channel, cb, function (done) { @@ -309,7 +218,7 @@ var queueMetadata = WriteQueue(); var setMetadata = function (Env, data, unsafeKey, cb) { var channel = data.channel; var command = data.command; - if (!channel || !isValidId(channel)) { return void cb ('INVALID_CHAN'); } + if (!channel || !Core.isValidId(channel)) { return void cb ('INVALID_CHAN'); } if (!command || typeof (command) !== 'string') { return void cb ('INVALID_COMMAND'); } if (Meta.commands.indexOf(command) === -1) { return void('UNSUPPORTED_COMMAND'); } @@ -382,38 +291,6 @@ var setMetadata = function (Env, data, unsafeKey, cb) { }); }; -var getMultipleFileSize = function (Env, channels, cb) { - if (!Array.isArray(channels)) { return cb('INVALID_PIN_LIST'); } - if (typeof(Env.msgStore.getChannelSize) !== 'function') { - return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); - } - - var i = channels.length; - var counts = {}; - - var done = function () { - i--; - if (i === 0) { return cb(void 0, counts); } - }; - - channels.forEach(function (channel) { - getFileSize(Env, channel, function (e, size) { - if (e) { - // most likely error here is that a file no longer exists - // but a user still has it in their drive, and wants to know - // its size. We should find a way to inform them of this in - // the future. For now we can just tell them it has no size. - - //WARN('getFileSize', e); - counts[channel] = 0; - return done(); - } - counts[channel] = size; - done(); - }); - }); -}; - /* accepts a list, and returns a sublist of channel or file ids which seem to have been deleted from the server (file size 0) @@ -429,7 +306,7 @@ var getDeletedPads = function (Env, channels, cb) { var job = function (channel, wait) { return function (give) { - getFileSize(Env, channel, wait(give(function (e, size) { + Pinning.getFileSize(Env, channel, wait(give(function (e, size) { if (e) { return; } if (size === 0) { absentees.push(channel); } }))); @@ -445,72 +322,6 @@ var getDeletedPads = function (Env, channels, cb) { }); }; -const batchTotalSize = BatchRead("GET_TOTAL_SIZE"); -var getTotalSize = function (Env, publicKey, cb) { - var unescapedKey = unescapeKeyCharacters(publicKey); - var limit = Env.limits[unescapedKey]; - - // Get a common key if multiple users share the same quota, otherwise take the public key - var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey; - - batchTotalSize(batchKey, cb, function (done) { - var channels = []; - var bytes = 0; - nThen(function (waitFor) { - // Get the channels list for our user account - getChannelList(Env, publicKey, waitFor(function (_channels) { - if (!_channels) { - waitFor.abort(); - return done('INVALID_PIN_LIST'); - } - Array.prototype.push.apply(channels, _channels); - })); - // Get the channels list for users sharing our quota - if (limit && Array.isArray(limit.users) && limit.users.length > 1) { - limit.users.forEach(function (key) { - if (key === unescapedKey) { return; } // Don't count ourselves twice - getChannelList(Env, key, waitFor(function (_channels) { - if (!_channels) { return; } // Broken user, don't count their quota - Array.prototype.push.apply(channels, _channels); - })); - }); - } - }).nThen(function (waitFor) { - // Get size of the channels - var list = []; // Contains the channels already counted in the quota to avoid duplicates - channels.forEach(function (channel) { // TODO semaphore? - if (list.indexOf(channel) !== -1) { return; } - list.push(channel); - getFileSize(Env, channel, waitFor(function (e, size) { - if (!e) { bytes += size; } - })); - }); - }).nThen(function () { - done(void 0, bytes); - }); - }); -}; - -var hashChannelList = function (A) { - var uniques = []; - - A.forEach(function (a) { - if (uniques.indexOf(a) === -1) { uniques.push(a); } - }); - uniques.sort(); - - var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl - .util.decodeUTF8(JSON.stringify(uniques)))); - - return hash; -}; - -var getHash = function (Env, publicKey, cb) { - getChannelList(Env, publicKey, function (channels) { - cb(void 0, hashChannelList(channels)); - }); -}; - var applyCustomLimits = function (Env, config) { var isLimit = function (o) { var valid = o && typeof(o) === 'object' && @@ -551,7 +362,7 @@ var updateLimits = function (Env, config, publicKey, cb /*:(?string, ?any[])=>vo if (typeof cb !== "function") { cb = function () {}; } var defaultLimit = typeof(config.defaultStorageLimit) === 'number'? - config.defaultStorageLimit: DEFAULT_LIMIT; + config.defaultStorageLimit: Core.DEFAULT_LIMIT; var userId; if (publicKey) { @@ -612,45 +423,6 @@ var updateLimits = function (Env, config, publicKey, cb /*:(?string, ?any[])=>vo req.end(body); }; -// XXX it's possible for this to respond before the server has had a chance -// to fetch the limits. Maybe we should respond with an error... -// or wait until we actually know the limits before responding -var getLimit = function (Env, publicKey, cb) { - var unescapedKey = unescapeKeyCharacters(publicKey); - var limit = Env.limits[unescapedKey]; - var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'? - Env.defaultStorageLimit: DEFAULT_LIMIT; - - var toSend = limit && typeof(limit.limit) === "number"? - [limit.limit, limit.plan, limit.note] : [defaultLimit, '', '']; - - cb(void 0, toSend); -}; - -var getFreeSpace = function (Env, publicKey, cb) { - getLimit(Env, publicKey, function (e, limit) { - if (e) { return void cb(e); } - getTotalSize(Env, publicKey, function (e, size) { - if (typeof(size) === 'undefined') { return void cb(e); } - - var rem = limit[0] - size; - if (typeof(rem) !== 'number') { - return void cb('invalid_response'); - } - cb(void 0, rem); - }); - }); -}; - -var sumChannelSizes = function (sizes) { - return Object.keys(sizes).map(function (id) { return sizes[id]; }) - .filter(function (x) { - // only allow positive numbers - return !(typeof(x) !== 'number' || x <= 0); - }) - .reduce(function (a, b) { return a + b; }, 0); -}; - // inform that the var loadChannelPins = function (Env) { Pinned.load(function (err, data) { @@ -670,35 +442,7 @@ var loadChannelPins = function (Env) { pinPath: Env.paths.pin, }); }; -var addPinned = function ( - Env, - publicKey /*:string*/, - channelList /*Array*/, - cb /*:()=>void*/) -{ - Env.evPinnedPadsReady.reg(() => { - channelList.forEach((c) => { - const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {}; - x[publicKey] = 1; - }); - cb(); - }); -}; -var removePinned = function ( - Env, - publicKey /*:string*/, - channelList /*Array*/, - cb /*:()=>void*/) -{ - Env.evPinnedPadsReady.reg(() => { - channelList.forEach((c) => { - const x = Env.pinnedPads[c]; - if (!x) { return; } - delete x[publicKey]; - }); - cb(); - }); -}; + var isChannelPinned = function (Env, channel, cb) { Env.evPinnedPadsReady.reg(() => { if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) { @@ -710,138 +454,6 @@ var isChannelPinned = function (Env, channel, cb) { }); }; -var pinChannel = function (Env, publicKey, channels, cb) { - if (!channels && channels.filter) { - return void cb('INVALID_PIN_LIST'); - } - - // get channel list ensures your session has a cached channel list - getChannelList(Env, publicKey, function (pinned) { - var session = getSession(Env.Sessions, publicKey); - - // only pin channels which are not already pinned - var toStore = channels.filter(function (channel) { - return pinned.indexOf(channel) === -1; - }); - - if (toStore.length === 0) { - return void getHash(Env, publicKey, cb); - } - - getMultipleFileSize(Env, toStore, function (e, sizes) { - if (typeof(sizes) === 'undefined') { return void cb(e); } - var pinSize = sumChannelSizes(sizes); - - getFreeSpace(Env, publicKey, function (e, free) { - if (typeof(free) === 'undefined') { - WARN('getFreeSpace', e); - return void cb(e); - } - if (pinSize > free) { return void cb('E_OVER_LIMIT'); } - - Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]), - function (e) { - if (e) { return void cb(e); } - toStore.forEach(function (channel) { - session.channels[channel] = true; - }); - addPinned(Env, publicKey, toStore, () => {}); - getHash(Env, publicKey, cb); - }); - }); - }); - }); -}; - -var unpinChannel = function (Env, publicKey, channels, cb) { - if (!channels && channels.filter) { - // expected array - return void cb('INVALID_PIN_LIST'); - } - - getChannelList(Env, publicKey, function (pinned) { - var session = getSession(Env.Sessions, publicKey); - - // only unpin channels which are pinned - var toStore = channels.filter(function (channel) { - return pinned.indexOf(channel) !== -1; - }); - - if (toStore.length === 0) { - return void getHash(Env, publicKey, cb); - } - - Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]), - function (e) { - if (e) { return void cb(e); } - toStore.forEach(function (channel) { - delete session.channels[channel]; - }); - removePinned(Env, publicKey, toStore, () => {}); - getHash(Env, publicKey, cb); - }); - }); -}; - -var resetUserPins = function (Env, publicKey, channelList, cb) { - if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); } - var session = getSession(Env.Sessions, publicKey); - - if (!channelList.length) { - return void getHash(Env, publicKey, function (e, hash) { - if (e) { return cb(e); } - cb(void 0, hash); - }); - } - - var pins = {}; - getMultipleFileSize(Env, channelList, function (e, sizes) { - if (typeof(sizes) === 'undefined') { return void cb(e); } - var pinSize = sumChannelSizes(sizes); - - - getLimit(Env, publicKey, function (e, limit) { - if (e) { - WARN('[RESET_ERR]', e); - return void cb(e); - } - - /* we want to let people pin, even if they are over their limit, - but they should only be able to do this once. - - This prevents data loss in the case that someone registers, but - does not have enough free space to pin their migrated data. - - They will not be able to pin additional pads until they upgrade - or delete enough files to go back under their limit. */ - if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); } - Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]), - function (e) { - if (e) { return void cb(e); } - channelList.forEach(function (channel) { - pins[channel] = true; - }); - - var oldChannels; - if (session.channels && typeof(session.channels) === 'object') { - oldChannels = Object.keys(session.channels); - } else { - oldChannels = []; - } - removePinned(Env, publicKey, oldChannels, () => { - addPinned(Env, publicKey, channelList, ()=>{}); - }); - - // update in-memory cache IFF the reset was allowed. - session.channels = pins; - getHash(Env, publicKey, function (e, hash) { - cb(e, hash); - }); - }); - }); - }); -}; - var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) { if (typeof(channelId) !== 'string' || channelId.length !== 32) { return cb('INVALID_ARGUMENTS'); @@ -861,7 +473,7 @@ var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) { }; var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) { - if (typeof(channelId) !== 'string' || !isValidId(channelId)) { + if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) { return cb('INVALID_ARGUMENTS'); } @@ -948,26 +560,6 @@ var removeOwnedChannelHistory = function (Env, channelId, unsafeKey, hash, cb) { }); }; -/* Users should be able to clear their own pin log with an authenticated RPC -*/ -var removePins = function (Env, safeKey, cb) { - if (typeof(Env.pinStore.removeChannel) !== 'function') { - return void cb("E_NOT_IMPLEMENTED"); - } - Env.pinStore.removeChannel(safeKey, function (err) { - Log.info('DELETION_PIN_BY_OWNER_RPC', { - safeKey: safeKey, - status: err? String(err): 'SUCCESS', - }); - - cb(err); - }); -}; - -var trimPins = function (Env, safeKey, cb) { - // XXX trim to latest pin checkpoint - cb("NOT_IMPLEMENTED"); -}; /* We assume that the server is secured against MitM attacks @@ -1136,7 +728,7 @@ var ARRAY_LINE = /^\[/; otherwise false */ var isNewChannel = function (Env, channel, cb) { - if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } + if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } if (channel.length !== 32) { return void cb('INVALID_CHAN'); } var done = false; @@ -1172,7 +764,7 @@ var writePrivateMessage = function (Env, args, nfwssCtx, cb) { if (!msg) { return void cb("INVALID_MESSAGE"); } // don't support anything except regular channels - if (!isValidId(channelId) || channelId.length !== 32) { + if (!Core.isValidId(channelId) || channelId.length !== 32) { return void cb("INVALID_CHAN"); } @@ -1270,7 +862,7 @@ var upload_status = function (Env, safeKey, filesize, _cb) { // FIXME FILES }).nThen(function () { // if yuo're here then there are no pending uploads // check if you have space in your quota to upload something of this size - getFreeSpace(Env, safeKey, function (e, free) { + Pinning.getFreeSpace(Env, safeKey, function (e, free) { if (e) { return void cb(e); } if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); } cb(void 0, false); @@ -1300,6 +892,8 @@ RPC.create = function (config, cb) { limits: {}, admins: [], sessionExpirationInterval: undefined, + Log: Log, + WARN: WARN, }; try { @@ -1344,7 +938,7 @@ RPC.create = function (config, cb) { break; } case 'GET_FILE_SIZE': - return void getFileSize(Env, msg[1], function (e, size) { + return void Pinning.getFileSize(Env, msg[1], function (e, size) { WARN(e, msg[1]); respond(e, [null, size, null]); }); @@ -1354,7 +948,7 @@ RPC.create = function (config, cb) { respond(e, [null, data, null]); }); case 'GET_MULTIPLE_FILE_SIZE': - return void getMultipleFileSize(Env, msg[1], function (e, dict) { + return void Pinning.getMultipleFileSize(Env, msg[1], function (e, dict) { if (e) { WARN(e, dict); return respond(e); @@ -1414,7 +1008,7 @@ RPC.create = function (config, cb) { // make sure a user object is initialized in the cookie jar if (publicKey) { - getSession(Sessions, publicKey); + Core.getSession(Sessions, publicKey); } else { Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey); } @@ -1471,38 +1065,33 @@ RPC.create = function (config, cb) { switch (msg[0]) { case 'COOKIE': return void Respond(void 0); case 'RESET': - return resetUserPins(Env, safeKey, msg[1], function (e, hash) { + return Pinning.resetUserPins(Env, safeKey, msg[1], function (e, hash) { //WARN(e, hash); return void Respond(e, hash); }); case 'PIN': - return pinChannel(Env, safeKey, msg[1], function (e, hash) { + return Pinning.pinChannel(Env, safeKey, msg[1], function (e, hash) { WARN(e, hash); Respond(e, hash); }); case 'UNPIN': - return unpinChannel(Env, safeKey, msg[1], function (e, hash) { + return Pinning.unpinChannel(Env, safeKey, msg[1], function (e, hash) { WARN(e, hash); Respond(e, hash); }); case 'GET_HASH': - return void getHash(Env, safeKey, function (e, hash) { + return void Pinning.getHash(Env, safeKey, function (e, hash) { WARN(e, hash); Respond(e, hash); }); case 'GET_TOTAL_SIZE': // TODO cache this, since it will get called quite a bit - return getTotalSize(Env, safeKey, function (e, size) { + return Pinning.getTotalSize(Env, safeKey, function (e, size) { if (e) { WARN(e, safeKey); return void Respond(e); } Respond(e, size); }); - case 'GET_FILE_SIZE': - return void getFileSize(Env, msg[1], function (e, size) { - WARN(e, msg[1]); - Respond(e, size); - }); case 'UPDATE_LIMITS': return void updateLimits(Env, config, safeKey, function (e, limit) { if (e) { @@ -1512,21 +1101,13 @@ RPC.create = function (config, cb) { Respond(void 0, limit); }); case 'GET_LIMIT': - return void getLimit(Env, safeKey, function (e, limit) { + return void Pinning.getLimit(Env, safeKey, function (e, limit) { if (e) { WARN(e, limit); return void Respond(e); } Respond(void 0, limit); }); - case 'GET_MULTIPLE_FILE_SIZE': - return void getMultipleFileSize(Env, msg[1], function (e, dict) { - if (e) { - WARN(e, dict); - return void Respond(e); - } - Respond(void 0, dict); - }); case 'EXPIRE_SESSION': return void setTimeout(function () { expireSession(Sessions, safeKey); @@ -1549,12 +1130,12 @@ RPC.create = function (config, cb) { Respond(void 0, 'OK'); }); case 'REMOVE_PINS': - return void removePins(Env, safeKey, function (e) { + return void Pinning.removePins(Env, safeKey, function (e) { if (e) { return void Respond(e); } Respond(void 0, "OK"); }); case 'TRIM_PINS': - return void trimPins(Env, safeKey, function (e) { + return void Pinning.trimPins(Env, safeKey, function (e) { if (e) { return void Respond(e); } Respond(void 0, "OK"); }); @@ -1568,7 +1149,7 @@ RPC.create = function (config, cb) { return void upload_status(Env, safeKey, filesize, function (e, yes) { if (!e && !yes) { // no pending uploads, set the new size - var user = getSession(Sessions, safeKey); + var user = Core.getSession(Sessions, safeKey); user.pendingUploadSize = filesize; user.currentUploadSize = 0; } @@ -1665,7 +1246,7 @@ RPC.create = function (config, cb) { blobStagingPath: config.blobStagingPath, archivePath: config.archivePath, getSession: function (safeKey) { - return getSession(Sessions, safeKey); + return Core.getSession(Sessions, safeKey); }, }, w(function (err, blob) { if (err) { throw new Error(err); } @@ -1677,6 +1258,6 @@ RPC.create = function (config, cb) { // XXX allow for graceful shutdown Env.sessionExpirationInterval = setInterval(function () { expireSessions(Sessions); - }, SESSION_EXPIRATION_TIME); + }, Core.SESSION_EXPIRATION_TIME); }); };