diff --git a/lib/commands/admin-rpc.js b/lib/commands/admin-rpc.js new file mode 100644 index 000000000..9e32fe2a8 --- /dev/null +++ b/lib/commands/admin-rpc.js @@ -0,0 +1,121 @@ +/*jshint esversion: 6 */ +const BatchRead = require("../batch-read"); +const nThen = require("nthen"); +const getFolderSize = require("get-folder-size"); +var Fs = require("fs"); + +var Admin = module.exports; + +var getActiveSessions = function (Env, ctx, cb) { + var total = ctx.users ? Object.keys(ctx.users).length : '?'; + + var ips = []; + Object.keys(ctx.users).forEach(function (u) { + var user = ctx.users[u]; + var socket = user.socket; + var req = socket.upgradeReq; + var conn = req && req.connection; + var ip = (req && req.headers && req.headers['x-forwarded-for']) || (conn && conn.remoteAddress); + if (ip && ips.indexOf(ip) === -1) { + ips.push(ip); + } + }); + + cb (void 0, [total, ips.length]); +}; + +var shutdown = function (Env, ctx, cb) { + return void cb('E_NOT_IMPLEMENTED'); + //clearInterval(Env.sessionExpirationInterval); + // XXX set a flag to prevent incoming database writes + // XXX disconnect all users and reject new connections + // XXX wait until all pending writes are complete + // then process.exit(0); + // and allow system functionality to restart the server +}; + +const batchRegisteredUsers = BatchRead("GET_REGISTERED_USERS"); +var getRegisteredUsers = function (Env, cb) { + batchRegisteredUsers('', cb, function (done) { + var dir = Env.paths.pin; + var folders; + var users = 0; + nThen(function (waitFor) { + Fs.readdir(dir, waitFor(function (err, list) { + if (err) { + waitFor.abort(); + return void done(err); + } + folders = list; + })); + }).nThen(function (waitFor) { + folders.forEach(function (f) { + var dir = Env.paths.pin + '/' + f; + Fs.readdir(dir, waitFor(function (err, list) { + if (err) { return; } + users += list.length; + })); + }); + }).nThen(function () { + done(void 0, users); + }); + }); +}; + +const batchDiskUsage = BatchRead("GET_DISK_USAGE"); +var getDiskUsage = function (Env, cb) { + batchDiskUsage('', cb, function (done) { + var data = {}; + nThen(function (waitFor) { + getFolderSize('./', waitFor(function(err, info) { + data.total = info; + })); + getFolderSize(Env.paths.pin, waitFor(function(err, info) { + data.pin = info; + })); + getFolderSize(Env.paths.blob, waitFor(function(err, info) { + data.blob = info; + })); + getFolderSize(Env.paths.staging, waitFor(function(err, info) { + data.blobstage = info; + })); + getFolderSize(Env.paths.block, waitFor(function(err, info) { + data.block = info; + })); + getFolderSize(Env.paths.data, waitFor(function(err, info) { + data.datastore = info; + })); + }).nThen(function () { + done(void 0, data); + }); + }); +}; + + + +Admin.command = function (Env, ctx, publicKey, config, data, cb) { + var admins = Env.admins; + if (admins.indexOf(publicKey) === -1) { + return void cb("FORBIDDEN"); + } + // Handle commands here + switch (data[0]) { + case 'ACTIVE_SESSIONS': + return getActiveSessions(Env, ctx, cb); + case 'ACTIVE_PADS': + return cb(void 0, ctx.channels ? Object.keys(ctx.channels).length : '?'); + case 'REGISTERED_USERS': + return getRegisteredUsers(Env, cb); + case 'DISK_USAGE': + return getDiskUsage(Env, cb); + case 'FLUSH_CACHE': + config.flushCache(); + return cb(void 0, true); + case 'SHUTDOWN': + return shutdown(Env, ctx, cb); + default: + return cb('UNHANDLED_ADMIN_COMMAND'); + } +}; + + diff --git a/lib/rpc.js b/lib/rpc.js index 867cfd283..f96e8a717 100644 --- a/lib/rpc.js +++ b/lib/rpc.js @@ -15,7 +15,6 @@ const Package = require('../package.json'); const Pinned = require('../scripts/pinned'); const Saferphore = require("saferphore"); const nThen = require("nthen"); -const getFolderSize = require("get-folder-size"); const Pins = require("./pins"); const Meta = require("./metadata"); const WriteQueue = require("./write-queue"); @@ -26,6 +25,8 @@ const escapeKeyCharacters = Util.escapeKeyCharacters; const unescapeKeyCharacters = Util.unescapeKeyCharacters; const mkEvent = Util.mkEvent; +const Admin = require("./commands/admin-rpc"); + var RPC = module.exports; var Store = require("../storage/file"); @@ -1207,115 +1208,6 @@ var writePrivateMessage = function (Env, args, nfwssCtx, cb) { }); }; -const batchDiskUsage = BatchRead("GET_DISK_USAGE"); -var getDiskUsage = function (Env, cb) { - batchDiskUsage('', cb, function (done) { - var data = {}; - nThen(function (waitFor) { - getFolderSize('./', waitFor(function(err, info) { - data.total = info; - })); - getFolderSize(Env.paths.pin, waitFor(function(err, info) { - data.pin = info; - })); - getFolderSize(Env.paths.blob, waitFor(function(err, info) { - data.blob = info; - })); - getFolderSize(Env.paths.staging, waitFor(function(err, info) { - data.blobstage = info; - })); - getFolderSize(Env.paths.block, waitFor(function(err, info) { - data.block = info; - })); - getFolderSize(Env.paths.data, waitFor(function(err, info) { - data.datastore = info; - })); - }).nThen(function () { - done(void 0, data); - }); - }); -}; - -const batchRegisteredUsers = BatchRead("GET_REGISTERED_USERS"); -var getRegisteredUsers = function (Env, cb) { - batchRegisteredUsers('', cb, function (done) { - var dir = Env.paths.pin; - var folders; - var users = 0; - nThen(function (waitFor) { - Fs.readdir(dir, waitFor(function (err, list) { - if (err) { - waitFor.abort(); - return void done(err); - } - folders = list; - })); - }).nThen(function (waitFor) { - folders.forEach(function (f) { - var dir = Env.paths.pin + '/' + f; - Fs.readdir(dir, waitFor(function (err, list) { - if (err) { return; } - users += list.length; - })); - }); - }).nThen(function () { - done(void 0, users); - }); - }); -}; -var getActiveSessions = function (Env, ctx, cb) { - var total = ctx.users ? Object.keys(ctx.users).length : '?'; - - var ips = []; - Object.keys(ctx.users).forEach(function (u) { - var user = ctx.users[u]; - var socket = user.socket; - var req = socket.upgradeReq; - var conn = req && req.connection; - var ip = (req && req.headers && req.headers['x-forwarded-for']) || (conn && conn.remoteAddress); - if (ip && ips.indexOf(ip) === -1) { - ips.push(ip); - } - }); - - cb (void 0, [total, ips.length]); -}; - -var shutdown = function (Env, ctx, cb) { - return void cb('E_NOT_IMPLEMENTED'); - //clearInterval(Env.sessionExpirationInterval); - // XXX set a flag to prevent incoming database writes - // XXX disconnect all users and reject new connections - // XXX wait until all pending writes are complete - // then process.exit(0); - // and allow system functionality to restart the server -}; - -var adminCommand = function (Env, ctx, publicKey, config, data, cb) { - var admins = Env.admins; - if (admins.indexOf(publicKey) === -1) { - return void cb("FORBIDDEN"); - } - // Handle commands here - switch (data[0]) { - case 'ACTIVE_SESSIONS': - return getActiveSessions(Env, ctx, cb); - case 'ACTIVE_PADS': - return cb(void 0, ctx.channels ? Object.keys(ctx.channels).length : '?'); - case 'REGISTERED_USERS': - return getRegisteredUsers(Env, cb); - case 'DISK_USAGE': - return getDiskUsage(Env, cb); - case 'FLUSH_CACHE': - config.flushCache(); - return cb(void 0, true); - case 'SHUTDOWN': - return shutdown(Env, ctx, cb); - default: - return cb('UNHANDLED_ADMIN_COMMAND'); - } -}; - var isUnauthenticatedCall = function (call) { return [ 'GET_FILE_SIZE', @@ -1717,7 +1609,7 @@ RPC.create = function (config, cb) { Respond(e); }); case 'ADMIN': - return void adminCommand(Env, ctx, safeKey, config, msg[1], function (e, result) { + return void Admin.command(Env, ctx, safeKey, config, msg[1], function (e, result) { if (e) { WARN(e, result); return void Respond(e); @@ -1754,9 +1646,9 @@ RPC.create = function (config, cb) { if (e) { WARN('limitUpdate', e); } - applyCustomLimits(Env, config); }); }; + applyCustomLimits(Env, config); updateLimitDaily(); setInterval(updateLimitDaily, 24*3600*1000);