From 70a0d4efb451aed2aed4c57fc21b2f14e5f0665d Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 25 Mar 2020 17:43:57 -0400 Subject: [PATCH] move more database reads into the database worker --- lib/commands/pin-rpc.js | 277 ++--------------------------------- lib/historyKeeper.js | 3 + lib/hk-util.js | 62 ++++++-- lib/workers/compute-index.js | 129 ++++++++++++++++ 4 files changed, 193 insertions(+), 278 deletions(-) diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js index b831dbb37..d77732ce9 100644 --- a/lib/commands/pin-rpc.js +++ b/lib/commands/pin-rpc.js @@ -1,14 +1,10 @@ /*jshint esversion: 6 */ const Core = require("./core"); -const BatchRead = require("../batch-read"); -const Pins = require("../pins"); - const Pinning = module.exports; const Nacl = require("tweetnacl/nacl-fast"); const Util = require("../common-util"); const nThen = require("nthen"); -const Saferphore = require("saferphore"); //const escapeKeyCharacters = Util.escapeKeyCharacters; const unescapeKeyCharacters = Util.unescapeKeyCharacters; @@ -37,123 +33,8 @@ var getLimit = Pinning.getLimit = function (Env, safeKey, cb) { cb(void 0, toSend); }; -const answerDeferred = function (Env, channel, bool) { - const pending = Env.pendingPinInquiries; - const stack = pending[channel]; - if (!Array.isArray(stack)) { return; } - - delete pending[channel]; - - stack.forEach(function (cb) { - cb(void 0, bool); - }); -}; - -var addPinned = function ( - Env, - safeKey /*:string*/, - channelList /*Array*/, - cb /*:()=>void*/) -{ - channelList.forEach(function (channel) { - Pins.addUserPinToState(Env.pinnedPads, safeKey, channel); - answerDeferred(Env, channel, true); - }); - cb(); -}; - -const isEmpty = function (obj) { - if (!obj || typeof(obj) !== 'object') { return true; } - for (var key in obj) { - if (obj.hasOwnProperty(key)) { return true; } - } - return false; -}; - -const deferUserTask = function (Env, safeKey, deferred) { - const pending = Env.pendingUnpins; - (pending[safeKey] = pending[safeKey] || []).push(deferred); -}; - -const runUserDeferred = function (Env, safeKey) { - const pending = Env.pendingUnpins; - const stack = pending[safeKey]; - if (!Array.isArray(stack)) { return; } - delete pending[safeKey]; - - stack.forEach(function (cb) { - cb(); - }); -}; - -const runRemainingDeferred = function (Env) { - const pending = Env.pendingUnpins; - for (var safeKey in pending) { - runUserDeferred(Env, safeKey); - } -}; - -const removeSelfFromPinned = function (Env, safeKey, channelList) { - channelList.forEach(function (channel) { - const channelPinStatus = Env.pinnedPads[channel]; - if (!channelPinStatus) { return; } - delete channelPinStatus[safeKey]; - if (isEmpty(channelPinStatus)) { - delete Env.pinnedPads[channel]; - } - }); -}; - -var removePinned = function ( - Env, - safeKey /*:string*/, - channelList /*Array*/, - cb /*:()=>void*/) -{ - - // if pins are already loaded then you can just unpin normally - if (Env.pinsLoaded) { - removeSelfFromPinned(Env, safeKey, channelList); - return void cb(); - } - - // otherwise defer until later... - deferUserTask(Env, safeKey, function () { - removeSelfFromPinned(Env, safeKey, channelList); - cb(); - }); -}; - var getMultipleFileSize = function (Env, channels, cb) { - if (!Array.isArray(channels)) { return cb('INVALID_PIN_LIST'); } - if (typeof(Env.msgStore.getChannelSize) !== 'function') { - return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); - } - - var i = channels.length; - var counts = {}; - - var done = function () { - i--; - if (i === 0) { return cb(void 0, counts); } - }; - - channels.forEach(function (channel) { - Pinning.getFileSize(Env, channel, function (e, size) { - if (e) { - // most likely error here is that a file no longer exists - // but a user still has it in their drive, and wants to know - // its size. We should find a way to inform them of this in - // the future. For now we can just tell them it has no size. - - //WARN('getFileSize', e); - counts[channel] = 0; - return done(); - } - counts[channel] = size; - done(); - }); - }); + Env.getMultipleFileSize(channels, cb); }; var loadUserPins = function (Env, safeKey, cb) { @@ -188,7 +69,6 @@ var getChannelList = Pinning.getChannelList = function (Env, safeKey, _cb) { }); }; -const batchTotalSize = BatchRead("GET_TOTAL_SIZE"); Pinning.getTotalSize = function (Env, safeKey, cb) { var unsafeKey = unescapeKeyCharacters(safeKey); var limit = Env.limits[unsafeKey]; @@ -196,9 +76,14 @@ Pinning.getTotalSize = function (Env, safeKey, cb) { // Get a common key if multiple users share the same quota, otherwise take the public key var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : safeKey; - batchTotalSize(batchKey, cb, function (done) { + Env.batchTotalSize(batchKey, cb, function (done) { var channels = []; - var bytes = 0; + + var addUnique = function (channel) { + if (channels.indexOf(channel) !== -1) { return; } + channels.push(channel); + }; + nThen(function (waitFor) { // Get the channels list for our user account getChannelList(Env, safeKey, waitFor(function (_channels) { @@ -206,7 +91,7 @@ Pinning.getTotalSize = function (Env, safeKey, cb) { waitFor.abort(); return done('INVALID_PIN_LIST'); } - Array.prototype.push.apply(channels, _channels); + _channels.forEach(addUnique); })); // Get the channels list for users sharing our quota if (limit && Array.isArray(limit.users) && limit.users.length > 1) { @@ -214,22 +99,12 @@ Pinning.getTotalSize = function (Env, safeKey, cb) { if (key === unsafeKey) { return; } // Don't count ourselves twice getChannelList(Env, key, waitFor(function (_channels) { if (!_channels) { return; } // Broken user, don't count their quota - Array.prototype.push.apply(channels, _channels); + _channels.forEach(addUnique); })); }); } - }).nThen(function (waitFor) { - // Get size of the channels - var list = []; // Contains the channels already counted in the quota to avoid duplicates - channels.forEach(function (channel) { // TODO semaphore? - if (list.indexOf(channel) !== -1) { return; } - list.push(channel); - Pinning.getFileSize(Env, channel, waitFor(function (e, size) { - if (!e) { bytes += size; } - })); - }); }).nThen(function () { - done(void 0, bytes); + Env.getTotalSize(channels, done); }); }); }; @@ -237,9 +112,6 @@ Pinning.getTotalSize = function (Env, safeKey, cb) { /* Users should be able to clear their own pin log with an authenticated RPC */ Pinning.removePins = function (Env, safeKey, cb) { - if (typeof(Env.pinStore.removeChannel) !== 'function') { - return void cb("E_NOT_IMPLEMENTED"); - } Env.pinStore.removeChannel(safeKey, function (err) { Env.Log.info('DELETION_PIN_BY_OWNER_RPC', { safeKey: safeKey, @@ -325,7 +197,6 @@ Pinning.pinChannel = function (Env, safeKey, channels, cb) { toStore.forEach(function (channel) { session.channels[channel] = true; }); - addPinned(Env, safeKey, toStore, () => {}); getHash(Env, safeKey, cb); }); }); @@ -357,7 +228,6 @@ Pinning.unpinChannel = function (Env, safeKey, channels, cb) { toStore.forEach(function (channel) { delete session.channels[channel]; }); - removePinned(Env, safeKey, toStore, () => {}); getHash(Env, safeKey, cb); }); }); @@ -408,9 +278,6 @@ Pinning.resetUserPins = function (Env, safeKey, channelList, cb) { } else { oldChannels = []; } - removePinned(Env, safeKey, oldChannels, () => { - addPinned(Env, safeKey, channelList, ()=>{}); - }); // update in-memory cache IFF the reset was allowed. session.channels = pins; @@ -422,28 +289,8 @@ Pinning.resetUserPins = function (Env, safeKey, channelList, cb) { }); }; -Pinning.getFileSize = function (Env, channel, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } - if (channel.length === 32) { - if (typeof(Env.msgStore.getChannelSize) !== 'function') { - return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); - } - - return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) { - if (e) { - if (e.code === 'ENOENT') { return void cb(void 0, 0); } - return void cb(e.code); - } - cb(void 0, size); - }); - } - - // 'channel' refers to a file, so you need another API - Env.blobStore.size(channel, function (e, size) { - if (typeof(size) === 'undefined') { return void cb(e); } - cb(void 0, size); - }); +Pinning.getFileSize = function (Env, channel, cb) { + Env.getFileSize(channel, cb); }; /* accepts a list, and returns a sublist of channel or file ids which seem @@ -453,107 +300,11 @@ Pinning.getFileSize = function (Env, channel, _cb) { ENOENT, but for now it's simplest to just rely on getFileSize... */ Pinning.getDeletedPads = function (Env, channels, cb) { - if (!Array.isArray(channels)) { return cb('INVALID_LIST'); } - var L = channels.length; - - var sem = Saferphore.create(10); - var absentees = []; - - var job = function (channel, wait) { - return function (give) { - Pinning.getFileSize(Env, channel, wait(give(function (e, size) { - if (e) { return; } - if (size === 0) { absentees.push(channel); } - }))); - }; - }; - - nThen(function (w) { - for (var i = 0; i < L; i++) { - sem.take(job(channels[i], w)); - } - }).nThen(function () { - cb(void 0, absentees); - }); + Env.getDeletedPads(channels, cb); }; -const answerNoConclusively = function (Env) { - const pending = Env.pendingPinInquiries; - for (var channel in pending) { - answerDeferred(Env, channel, false); - } -}; - -// inform that the -Pinning.loadChannelPins = function (Env) { - const stats = { - surplus: 0, - pinned: 0, - duplicated: 0, - // in theory we could use this number for the admin panel - // but we'd have to keep updating it whenever a new pin log - // was created or deleted. In practice it's probably not worth the trouble - users: 0, - }; - - const handler = function (ref, safeKey, pinned) { - if (ref.surplus) { - stats.surplus += ref.surplus; - } - for (var channel in ref.pins) { - if (!pinned.hasOwnProperty(channel)) { - answerDeferred(Env, channel, true); - stats.pinned++; - } else { - stats.duplicated++; - } - } - stats.users++; - runUserDeferred(Env, safeKey); - }; - - Pins.list(function (err) { - if (err) { - Env.pinsLoaded = true; - Env.Log.error("LOAD_CHANNEL_PINS", err); - return; - } - - Env.pinsLoaded = true; - answerNoConclusively(Env); - runRemainingDeferred(Env); - }, { - pinPath: Env.paths.pin, - handler: handler, - pinned: Env.pinnedPads, - workers: Env.pinWorkers, - }); -}; - -/* -const deferResponse = function (Env, channel, cb) { - const pending = Env.pendingPinInquiries; - (pending[channel] = pending[channel] || []).push(cb); -}; -*/ - // FIXME this will be removed from the client Pinning.isChannelPinned = function (Env, channel, cb) { return void cb(void 0, true); -/* - // if the pins are fully loaded then you can answer yes/no definitively - if (Env.pinsLoaded) { - return void cb(void 0, !isEmpty(Env.pinnedPads[channel])); - } - - // you may already know that a channel is pinned - // even if you're still loading. answer immediately if so - if (!isEmpty(Env.pinnedPads[channel])) { return cb(void 0, true); } - - // if you're still loading them then can answer 'yes' as soon - // as you learn that one account has pinned a file. - // negative responses have to wait until the end - deferResponse(Env, channel, cb); -*/ }; diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index 13be09bd1..caa0ef462 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -43,6 +43,7 @@ module.exports.create = function (config, cb) { batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"), batchDiskUsage: BatchRead('GET_DISK_USAGE'), batchUserPins: BatchRead('LOAD_USER_PINS'), + batchTotalSize: BatchRead('GET_TOTAL_SIZE'), //historyKeeper: config.historyKeeper, intervals: config.intervals || {}, @@ -243,6 +244,8 @@ module.exports.create = function (config, cb) { })); }).nThen(function (w) { HK.initializeIndexWorkers(Env, { + blobPath: config.blobPath, + blobStagingPath: config.blobStagingPath, pinPath: pinPath, filePath: config.filePath, archivePath: config.archivePath, diff --git a/lib/hk-util.js b/lib/hk-util.js index d17cf778b..57e0afe36 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -337,10 +337,8 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { * -1 if you didn't find it */ -const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX child process +const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { const cb = Util.once(Util.mkAsync(_cb)); - const store = Env.store; - const Log = Env.Log; // lastKnownhash === -1 means we want the complete history if (lastKnownHash === -1) { return void cb(null, 0); } @@ -384,7 +382,7 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX chil offset = lkh; })); - }).nThen((waitFor) => { + }).nThen((w) => { // if offset is less than zero then presumably the channel has no messages // returning falls through to the next block and therefore returns -1 if (offset !== -1) { return; } @@ -392,18 +390,12 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX chil // do a lookup from the index // FIXME maybe we don't need this anymore? // otherwise we have a non-negative offset and we can start to read from there - store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { - // tryParse return a parsed message or undefined - const msg = tryParse(Env, msgObj.buff.toString('utf8')); - // if it was undefined then go onto the next message - if (typeof msg === "undefined") { return readMore(); } - if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4], Log)) { - return void readMore(); + Env.getHashOffset(channelName, lastKnownHash, w(function (err, _offset) { + if (err) { + w.abort(); + return void cb(err); } - offset = msgObj.offset; - abort(); - }, waitFor(function (err) { - if (err) { waitFor.abort(); return void cb(err); } + offset = _offset; })); }).nThen(() => { cb(null, offset); @@ -897,6 +889,46 @@ HK.initializeIndexWorkers = function (Env, config, _cb) { }); }; + Env.getFileSize = function (channel, cb) { + sendCommand({ + command: 'GET_FILE_SIZE', + channel: channel, + }, cb); + }; + + Env.getDeletedPads = function (channels, cb) { + sendCommand({ + command: "GET_DELETED_PADS", + channels: channels, + }, cb); + }; + + Env.getTotalSize = function (channels, cb) { + // we could take out locks for all of these channels, + // but it's OK if the size is slightly off + sendCommand({ + command: 'GET_TOTAL_SIZE', + channels: channels, + }, cb); + }; + + Env.getMultipleFileSize = function (channels, cb) { + sendCommand({ + command: "GET_MULTIPLE_FILE_SIZE", + channels: channels, + }, cb); + }; + + Env.getHashOffset = function (channel, hash, cb) { + Env.store.getWeakLock(channel, function (next) { + sendCommand({ + command: 'GET_HASH_OFFSET', + channel: channel, + hash: hash, + }, Util.both(next, cb)); + }); + }; + //console.log("index workers ready"); cb(void 0); }); diff --git a/lib/workers/compute-index.js b/lib/workers/compute-index.js index dd3798aa6..37fa9ac1a 100644 --- a/lib/workers/compute-index.js +++ b/lib/workers/compute-index.js @@ -3,16 +3,20 @@ const HK = require("../hk-util"); const Store = require("../storage/file"); +const BlobStore = require("../storage/blob"); const Util = require("../common-util"); const nThen = require("nthen"); const Meta = require("../metadata"); const Pins = require("../pins"); +const Core = require("../commands/core"); +const Saferphore = require("saferphore"); const Env = {}; var ready = false; var store; var pinStore; +var blobStore; const init = function (config, _cb) { const cb = Util.once(Util.mkAsync(_cb)); if (!config) { @@ -36,6 +40,18 @@ const init = function (config, _cb) { } pinStore = _pinStore; })); + BlobStore.create({ + blobPath: config.blobPath, + blobStagingPath: config.blobStagingPath, + archivePath: config.archivePath, + getSession: function () {}, + }, w(function (err, blob) { + if (err) { + w.abort(); + return void cb(err); + } + blobStore = blob; + })); }).nThen(function () { cb(); }); @@ -218,11 +234,124 @@ const getPinState = function (data, cb, errorHandler) { }); }; +const _getFileSize = function (channel, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } + if (channel.length === 32) { + return void store.getChannelSize(channel, function (e, size) { + if (e) { + if (e.code === 'ENOENT') { return void cb(void 0, 0); } + return void cb(e.code); + } + cb(void 0, size); + }); + } + + // 'channel' refers to a file, so you need another API + blobStore.size(channel, function (e, size) { + if (typeof(size) === 'undefined') { return void cb(e); } + cb(void 0, size); + }); +}; + +const getFileSize = function (data, cb) { + _getFileSize(data.channel, cb); +}; + +const _iterateFiles = function (channels, handler, cb) { + if (!Array.isArray(channels)) { return cb('INVALID_LIST'); } + var L = channels.length; + var sem = Saferphore.create(10); + + // (channel, next) => { ??? } + var job = function (channel, wait) { + return function (give) { + handler(channel, wait(give())); + }; + }; + + nThen(function (w) { + for (var i = 0; i < L; i++) { + sem.take(job(channels[i], w)); + } + }).nThen(function () { + cb(); + }); +}; + +const getTotalSize = function (data, cb) { + var bytes = 0; + _iterateFiles(data.channels, function (channel, next) { + _getFileSize(channel, function (err, size) { + if (!err) { bytes += size; } + next(); + }); + }, function (err) { + if (err) { return cb(err); } + cb(void 0, bytes); + }); +}; + +const getDeletedPads = function (data, cb) { + var absentees = []; + _iterateFiles(data.channels, function (channel, next) { + _getFileSize(channel, function (err, size) { + if (err) { return next(); } + if (size === 0) { absentees.push(channel); } + next(); + }); + }, function (err) { + if (err) { return void cb(err); } + cb(void 0, absentees); + }); +}; + +const getMultipleFileSize = function (data, cb) { + const counts = {}; + _iterateFiles(data.channels, function (channel, next) { + _getFileSize(channel, function (err, size) { + counts[channel] = err? 0: size; + next(); + }); + }, function (err) { + if (err) { + return void cb(err); + } + cb(void 0, counts); + }); +}; + +const getHashOffset = function (data, cb) { + const channelName = data.channel; + const lastKnownHash = data.lastKnownHash; + + var offset = -1; + store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { + // tryParse return a parsed message or undefined + const msg = tryParse(Env, msgObj.buff.toString('utf8')); + // if it was undefined then go onto the next message + if (typeof msg === "undefined") { return readMore(); } + if (typeof(msg[4]) !== 'string' || lastKnownHash !== HK.getHash(msg[4])) { + return void readMore(); + } + offset = msgObj.offset; + abort(); + }, function (err) { + if (err) { return void cb(err); } + cb(void 0, offset); + }); +}; + const COMMANDS = { COMPUTE_INDEX: computeIndex, COMPUTE_METADATA: computeMetadata, GET_OLDER_HISTORY: getOlderHistory, GET_PIN_STATE: getPinState, + GET_FILE_SIZE: getFileSize, + GET_TOTAL_SIZE: getTotalSize, + GET_DELETED_PADS: getDeletedPads, + GET_MULTIPLE_FILE_SIZE: getMultipleFileSize, + GET_HASH_OFFSET: getHashOffset, }; process.on('message', function (data) {