batch concurrent requests for expensive IO and computations

pull/1/head
ansuz 6 years ago
parent ed82936610
commit 65b0112dcb

@ -19,6 +19,7 @@ const getFolderSize = require("get-folder-size");
const Pins = require("./lib/pins"); const Pins = require("./lib/pins");
const Meta = require("./lib/metadata"); const Meta = require("./lib/metadata");
const WriteQueue = require("./lib/write-queue"); const WriteQueue = require("./lib/write-queue");
const BatchRead = require("./lib/batch-read");
var RPC = module.exports; var RPC = module.exports;
@ -231,6 +232,7 @@ var checkSignature = function (signedMsg, signature, publicKey) {
return Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer); return Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer);
}; };
const batchUserPins = BatchRead();
var loadUserPins = function (Env, publicKey, cb) { var loadUserPins = function (Env, publicKey, cb) {
var session = getSession(Env.Sessions, publicKey); var session = getSession(Env.Sessions, publicKey);
@ -238,6 +240,7 @@ var loadUserPins = function (Env, publicKey, cb) {
return cb(session.channels); return cb(session.channels);
} }
batchUserPins(publicKey, cb, function (done) {
var ref = {}; var ref = {};
var lineHandler = Pins.createLineHandler(ref, function (label, data) { var lineHandler = Pins.createLineHandler(ref, function (label, data) {
Log.error(label, { Log.error(label, {
@ -252,7 +255,8 @@ var loadUserPins = function (Env, publicKey, cb) {
// only put this into the cache if it completes // only put this into the cache if it completes
session.channels = ref.pins; session.channels = ref.pins;
cb(ref.pins); done(ref.pins); // FIXME no error handling?
});
}); });
}; };
@ -290,45 +294,49 @@ var getUploadSize = function (Env, channel, cb) {
}); });
}; };
const batchFileSize = BatchRead();
var getFileSize = function (Env, channel, cb) { var getFileSize = function (Env, channel, cb) {
if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } if (!isValidId(channel)) { return void cb('INVALID_CHAN'); }
batchFileSize(channel, cb, function (done) {
if (channel.length === 32) { if (channel.length === 32) {
if (typeof(Env.msgStore.getChannelSize) !== 'function') { if (typeof(Env.msgStore.getChannelSize) !== 'function') {
return cb('GET_CHANNEL_SIZE_UNSUPPORTED'); return done('GET_CHANNEL_SIZE_UNSUPPORTED');
} }
return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) { return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) {
if (e) { if (e) {
if (e.code === 'ENOENT') { return void cb(void 0, 0); } if (e.code === 'ENOENT') { return void done(void 0, 0); }
return void cb(e.code); return void done(e.code);
} }
cb(void 0, size); done(void 0, size);
}); });
} }
// 'channel' refers to a file, so you need another API // 'channel' refers to a file, so you need another API
getUploadSize(Env, channel, function (e, size) { getUploadSize(Env, channel, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); } if (typeof(size) === 'undefined') { return void done(e); }
cb(void 0, size); done(void 0, size);
});
}); });
}; };
const batchMetadata = BatchRead();
var getMetadata = function (Env, channel, cb) { var getMetadata = function (Env, channel, cb) {
if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } if (!isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return cb("INVALID_CHAN"); } if (channel.length !== 32) { return cb("INVALID_CHAN"); }
batchMetadata(channel, cb, function (done) {
var ref = {}; var ref = {};
var lineHandler = Meta.createLineHandler(ref, Log.error); var lineHandler = Meta.createLineHandler(ref, Log.error);
return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) { return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) {
if (err) { if (err) {
// stream errors? // stream errors?
return void cb(err); return void done(err);
} }
cb(void 0, ref.meta); done(void 0, ref.meta);
});
}); });
}; };
@ -470,19 +478,22 @@ var getDeletedPads = function (Env, channels, cb) {
}); });
}; };
const batchTotalSize = BatchRead();
var getTotalSize = function (Env, publicKey, cb) { var getTotalSize = function (Env, publicKey, cb) {
batchTotalSize(publicKey, cb, function (done) {
var bytes = 0; var bytes = 0;
return void getChannelList(Env, publicKey, function (channels) { return void getChannelList(Env, publicKey, function (channels) {
if (!channels) { return cb('INVALID_PIN_LIST'); } // unexpected if (!channels) { return done('INVALID_PIN_LIST'); } // unexpected
var count = channels.length; var count = channels.length;
if (!count) { cb(void 0, 0); } if (!count) { return void done(void 0, 0); }
channels.forEach(function (channel) { channels.forEach(function (channel) { // FIXME this might as well be nThen
getFileSize(Env, channel, function (e, size) { getFileSize(Env, channel, function (e, size) {
count--; count--;
if (!e) { bytes += size; } if (!e) { bytes += size; }
if (count === 0) { return cb(void 0, bytes); } if (count === 0) { return done(void 0, bytes); }
});
}); });
}); });
}); });
@ -1605,7 +1616,9 @@ var writePrivateMessage = function (Env, args, nfwssCtx, cb) {
}); });
}; };
const batchDiskUsage = BatchRead();
var getDiskUsage = function (Env, cb) { var getDiskUsage = function (Env, cb) {
batchDiskUsage('', cb, function (done) {
var data = {}; var data = {};
nThen(function (waitFor) { nThen(function (waitFor) {
getFolderSize('./', waitFor(function(err, info) { getFolderSize('./', waitFor(function(err, info) {
@ -1627,10 +1640,14 @@ var getDiskUsage = function (Env, cb) {
data.datastore = info; data.datastore = info;
})); }));
}).nThen(function () { }).nThen(function () {
cb (void 0, data); done(void 0, data);
});
}); });
}; };
const batchRegisteredUsers = BatchRead();
var getRegisteredUsers = function (Env, cb) { var getRegisteredUsers = function (Env, cb) {
batchRegisteredUsers('', cb, function (done) {
var dir = Env.paths.pin; var dir = Env.paths.pin;
var folders; var folders;
var users = 0; var users = 0;
@ -1638,7 +1655,7 @@ var getRegisteredUsers = function (Env, cb) {
Fs.readdir(dir, waitFor(function (err, list) { Fs.readdir(dir, waitFor(function (err, list) {
if (err) { if (err) {
waitFor.abort(); waitFor.abort();
return void cb(err); return void done(err);
} }
folders = list; folders = list;
})); }));
@ -1651,7 +1668,8 @@ var getRegisteredUsers = function (Env, cb) {
})); }));
}); });
}).nThen(function () { }).nThen(function () {
cb(void 0, users); done(void 0, users);
});
}); });
}; };
var getActiveSessions = function (Env, ctx, cb) { var getActiveSessions = function (Env, ctx, cb) {

Loading…
Cancel
Save