From 65b0112dcb42332738d5eb32d9e14cf21a770d9c Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 5 Sep 2019 15:35:01 +0200 Subject: [PATCH] batch concurrent requests for expensive IO and computations --- rpc.js | 194 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 106 insertions(+), 88 deletions(-) diff --git a/rpc.js b/rpc.js index 97b317754..c920f65c3 100644 --- a/rpc.js +++ b/rpc.js @@ -19,6 +19,7 @@ const getFolderSize = require("get-folder-size"); const Pins = require("./lib/pins"); const Meta = require("./lib/metadata"); const WriteQueue = require("./lib/write-queue"); +const BatchRead = require("./lib/batch-read"); var RPC = module.exports; @@ -231,6 +232,7 @@ var checkSignature = function (signedMsg, signature, publicKey) { return Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer); }; +const batchUserPins = BatchRead(); var loadUserPins = function (Env, publicKey, cb) { var session = getSession(Env.Sessions, publicKey); @@ -238,21 +240,23 @@ var loadUserPins = function (Env, publicKey, cb) { return cb(session.channels); } - var ref = {}; - var lineHandler = Pins.createLineHandler(ref, function (label, data) { - Log.error(label, { - log: publicKey, - data: data, + batchUserPins(publicKey, cb, function (done) { + var ref = {}; + var lineHandler = Pins.createLineHandler(ref, function (label, data) { + Log.error(label, { + log: publicKey, + data: data, + }); }); - }); - // if channels aren't in memory. load them from disk - Env.pinStore.getMessages(publicKey, lineHandler, function () { - // no more messages + // if channels aren't in memory. load them from disk + Env.pinStore.getMessages(publicKey, lineHandler, function () { + // no more messages - // only put this into the cache if it completes - session.channels = ref.pins; - cb(ref.pins); + // only put this into the cache if it completes + session.channels = ref.pins; + done(ref.pins); // FIXME no error handling? + }); }); }; @@ -290,45 +294,49 @@ var getUploadSize = function (Env, channel, cb) { }); }; +const batchFileSize = BatchRead(); var getFileSize = function (Env, channel, cb) { if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } + batchFileSize(channel, cb, function (done) { + if (channel.length === 32) { + if (typeof(Env.msgStore.getChannelSize) !== 'function') { + return done('GET_CHANNEL_SIZE_UNSUPPORTED'); + } - if (channel.length === 32) { - if (typeof(Env.msgStore.getChannelSize) !== 'function') { - return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); + return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) { + if (e) { + if (e.code === 'ENOENT') { return void done(void 0, 0); } + return void done(e.code); + } + done(void 0, size); + }); } - return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) { - if (e) { - if (e.code === 'ENOENT') { return void cb(void 0, 0); } - return void cb(e.code); - } - cb(void 0, size); + // 'channel' refers to a file, so you need another API + getUploadSize(Env, channel, function (e, size) { + if (typeof(size) === 'undefined') { return void done(e); } + done(void 0, size); }); - } - - // 'channel' refers to a file, so you need another API - getUploadSize(Env, channel, function (e, size) { - if (typeof(size) === 'undefined') { return void cb(e); } - cb(void 0, size); }); }; - +const batchMetadata = BatchRead(); var getMetadata = function (Env, channel, cb) { if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } if (channel.length !== 32) { return cb("INVALID_CHAN"); } - var ref = {}; - var lineHandler = Meta.createLineHandler(ref, Log.error); + batchMetadata(channel, cb, function (done) { + var ref = {}; + var lineHandler = Meta.createLineHandler(ref, Log.error); - return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) { - if (err) { - // stream errors? - return void cb(err); - } - cb(void 0, ref.meta); + return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) { + if (err) { + // stream errors? + return void done(err); + } + done(void 0, ref.meta); + }); }); }; @@ -470,19 +478,22 @@ var getDeletedPads = function (Env, channels, cb) { }); }; +const batchTotalSize = BatchRead(); var getTotalSize = function (Env, publicKey, cb) { - var bytes = 0; - return void getChannelList(Env, publicKey, function (channels) { - if (!channels) { return cb('INVALID_PIN_LIST'); } // unexpected - - var count = channels.length; - if (!count) { cb(void 0, 0); } - - channels.forEach(function (channel) { - getFileSize(Env, channel, function (e, size) { - count--; - if (!e) { bytes += size; } - if (count === 0) { return cb(void 0, bytes); } + batchTotalSize(publicKey, cb, function (done) { + var bytes = 0; + return void getChannelList(Env, publicKey, function (channels) { + if (!channels) { return done('INVALID_PIN_LIST'); } // unexpected + + var count = channels.length; + if (!count) { return void done(void 0, 0); } + + channels.forEach(function (channel) { // FIXME this might as well be nThen + getFileSize(Env, channel, function (e, size) { + count--; + if (!e) { bytes += size; } + if (count === 0) { return done(void 0, bytes); } + }); }); }); }); @@ -1605,53 +1616,60 @@ var writePrivateMessage = function (Env, args, nfwssCtx, cb) { }); }; +const batchDiskUsage = BatchRead(); var getDiskUsage = function (Env, cb) { - var data = {}; - nThen(function (waitFor) { - getFolderSize('./', waitFor(function(err, info) { - data.total = info; - })); - getFolderSize(Env.paths.pin, waitFor(function(err, info) { - data.pin = info; - })); - getFolderSize(Env.paths.blob, waitFor(function(err, info) { - data.blob = info; - })); - getFolderSize(Env.paths.staging, waitFor(function(err, info) { - data.blobstage = info; - })); - getFolderSize(Env.paths.block, waitFor(function(err, info) { - data.block = info; - })); - getFolderSize(Env.paths.data, waitFor(function(err, info) { - data.datastore = info; - })); - }).nThen(function () { - cb (void 0, data); + batchDiskUsage('', cb, function (done) { + var data = {}; + nThen(function (waitFor) { + getFolderSize('./', waitFor(function(err, info) { + data.total = info; + })); + getFolderSize(Env.paths.pin, waitFor(function(err, info) { + data.pin = info; + })); + getFolderSize(Env.paths.blob, waitFor(function(err, info) { + data.blob = info; + })); + getFolderSize(Env.paths.staging, waitFor(function(err, info) { + data.blobstage = info; + })); + getFolderSize(Env.paths.block, waitFor(function(err, info) { + data.block = info; + })); + getFolderSize(Env.paths.data, waitFor(function(err, info) { + data.datastore = info; + })); + }).nThen(function () { + done(void 0, data); + }); }); }; + +const batchRegisteredUsers = BatchRead(); var getRegisteredUsers = function (Env, cb) { - var dir = Env.paths.pin; - var folders; - var users = 0; - nThen(function (waitFor) { - Fs.readdir(dir, waitFor(function (err, list) { - if (err) { - waitFor.abort(); - return void cb(err); - } - folders = list; - })); - }).nThen(function (waitFor) { - folders.forEach(function (f) { - var dir = Env.paths.pin + '/' + f; + batchRegisteredUsers('', cb, function (done) { + var dir = Env.paths.pin; + var folders; + var users = 0; + nThen(function (waitFor) { Fs.readdir(dir, waitFor(function (err, list) { - if (err) { return; } - users += list.length; + if (err) { + waitFor.abort(); + return void done(err); + } + folders = list; })); + }).nThen(function (waitFor) { + folders.forEach(function (f) { + var dir = Env.paths.pin + '/' + f; + Fs.readdir(dir, waitFor(function (err, list) { + if (err) { return; } + users += list.length; + })); + }); + }).nThen(function () { + done(void 0, users); }); - }).nThen(function () { - cb(void 0, users); }); }; var getActiveSessions = function (Env, ctx, cb) {