From 369c92c01daa1d80daf4abf7bfe5a5bd0cb00793 Mon Sep 17 00:00:00 2001 From: ansuz Date: Mon, 12 Oct 2020 17:39:53 +0530 Subject: [PATCH] initialize Env from server and deduplicate several attributes --- lib/api.js | 25 ++++- lib/env.js | 200 ++++++++++++++++++++++++++++++++++++++ lib/historyKeeper.js | 227 ++++++------------------------------------- server.js | 84 +++++----------- 4 files changed, 275 insertions(+), 261 deletions(-) create mode 100644 lib/env.js diff --git a/lib/api.js b/lib/api.js index cc88aaed7..07ec77d0b 100644 --- a/lib/api.js +++ b/lib/api.js @@ -1,13 +1,28 @@ /* jshint esversion: 6 */ const WebSocketServer = require('ws').Server; const NetfluxSrv = require('chainpad-server'); +const Decrees = require("./decrees"); -module.exports.create = function (config) { +const nThen = require("nthen"); + +module.exports.create = function (Env) { + var log = Env.Log; + +nThen(function (w) { + Decrees.load(Env, w(function (err) { + if (err && err.code !== "ENOENT") { + log.error('DECREES_LOADING', { + error: err.code || err, + message: err.message, + }); + console.error(err); + } + })); +}).nThen(function () { // asynchronously create a historyKeeper and RPC together - require('./historyKeeper.js').create(config, function (err, historyKeeper) { + require('./historyKeeper.js').create(Env, function (err, historyKeeper) { if (err) { throw err; } - var log = config.log; var noop = function () {}; @@ -21,7 +36,7 @@ module.exports.create = function (config) { }; // spawn ws server and attach netflux event handlers - NetfluxSrv.create(new WebSocketServer({ server: config.httpServer})) + NetfluxSrv.create(new WebSocketServer({ server: Env.httpServer})) .on('channelClose', historyKeeper.channelClose) .on('channelMessage', historyKeeper.channelMessage) .on('channelOpen', historyKeeper.channelOpen) @@ -50,4 +65,6 @@ module.exports.create = function (config) { }) .register(historyKeeper.id, historyKeeper.directMessage); }); +}); + }; diff --git a/lib/env.js b/lib/env.js new file mode 100644 index 000000000..53771bbcb --- /dev/null +++ b/lib/env.js @@ -0,0 +1,200 @@ +/* jshint esversion: 6 */ +/* globals process */ + +const Crypto = require('crypto'); +const WriteQueue = require("./write-queue"); +const BatchRead = require("./batch-read"); + +const Keys = require("./keys"); +const Core = require("./commands/core"); + +const Quota = require("./commands/quota"); +const Util = require("./common-util"); + +module.exports.create = function (config) { + // mode can be FRESH (default), DEV, or PACKAGE + + const Env = { + FRESH_KEY: '', + FRESH_MODE: true, + DEV_MODE: false, + configCache: {}, + flushCache: function () { + Env.configCache = {}; + Env.FRESH_KEY = +new Date(); + if (!(Env.DEV_MODE || Env.FRESH_MODE)) { Env.FRESH_MODE = true; } + if (!Env.Log) { return; } + Env.Log.info("UPDATING_FRESH_KEY", Env.FRESH_KEY); + }, + + Log: undefined, + // store + id: Crypto.randomBytes(8).toString('hex'), + + launchTime: +new Date(), + + inactiveTime: config.inactiveTime, + archiveRetentionTime: config.archiveRetentionTime, + accountRetentionTime: config.accountRetentionTime, + + // TODO implement mutability + adminEmail: config.adminEmail, + supportMailbox: config.supportMailboxPublicKey, + + metadata_cache: {}, + channel_cache: {}, + queueStorage: WriteQueue(), + queueDeletes: WriteQueue(), + queueValidation: WriteQueue(), + + batchIndexReads: BatchRead("HK_GET_INDEX"), + batchMetadata: BatchRead('GET_METADATA'), + batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"), + batchDiskUsage: BatchRead('GET_DISK_USAGE'), + batchUserPins: BatchRead('LOAD_USER_PINS'), + batchTotalSize: BatchRead('GET_TOTAL_SIZE'), + batchAccountQuery: BatchRead("QUERY_ACCOUNT_SERVER"), + + intervals: {}, + maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024), + premiumUploadSize: false, // overridden below... + Sessions: {}, + paths: {}, + //msgStore: config.store, + + netfluxUsers: {}, + + pinStore: undefined, + + limits: {}, + admins: [], + WARN: function (e, output) { // TODO deprecate this + if (!Env.Log) { return; } + if (e && output) { + Env.Log.warn(e, { + output: output, + message: String(e), + stack: new Error(e).stack, + }); + } + }, + + allowSubscriptions: config.allowSubscriptions === true, + blockDailyCheck: config.blockDailyCheck === true, + + myDomain: config.myDomain, + mySubdomain: config.mySubdomain, // only exists for the accounts integration + customLimits: {}, + // FIXME this attribute isn't in the default conf + // but it is referenced in Quota + domain: config.domain, + + maxWorkers: config.maxWorkers, + disableIntegratedTasks: config.disableIntegratedTasks || false, + disableIntegratedEviction: config.disableIntegratedEviction || false, + lastEviction: +new Date(), + }; + + (function () { + if (process.env.PACKAGE) { + // `PACKAGE=1 node server` uses the version string from package.json as the cache string + console.log("PACKAGE MODE ENABLED"); + Env.FRESH_MODE = false; + Env.DEV_MODE = false; + } else if (process.env.DEV) { + // `DEV=1 node server` will use a random cache string on every page reload + console.log("DEV MODE ENABLED"); + Env.FRESH_MODE = false; + Env.DEV_MODE = true; + } else { + // `FRESH=1 node server` will set a random cache string when the server is launched + // and use it for the process lifetime or until it is reset from the admin panel + console.log("FRESH MODE ENABLED"); + Env.FRESH_KEY = +new Date(); + } + }()); + + + + + (function () { + var custom = config.customLimits; + if (!custom) { return; } + + var stored = Env.customLimits; + + Object.keys(custom).forEach(function (k) { + var unsafeKey = Keys.canonicalize(k); + + if (!unsafeKey) { + console.log("INVALID_CUSTOM_LIMIT_ID", { + message: "A custom quota upgrade was provided via your config with an invalid identifier. It will be ignored.", + key: k, + value: custom[k], + }); + return; + } + + if (stored[unsafeKey]) { + console.log("INVALID_CUSTOM_LIMIT_DUPLICATED", { + message: "A duplicated custom quota upgrade was provided via your config which would have overridden an existing value. It will be ignored.", + key: k, + value: custom[k], + }); + return; + } + + if (!Quota.isValidLimit(custom[k])) { + console.log("INVALID_CUSTOM_LIMIT_VALUE", { + message: "A custom quota upgrade was provided via your config with an invalid value. It will be ignored.", + key: k, + value: custom[k], + }); + return; + } + + var limit = stored[unsafeKey] = Util.clone(custom[k]); + limit.origin = 'config'; + }); + }()); + + (function () { + var pes = config.premiumUploadSize; + if (!isNaN(pes) && pes >= Env.maxUploadSize) { + Env.premiumUploadSize = pes; + } + }()); + + var paths = Env.paths; + + var keyOrDefaultString = function (key, def) { + return typeof(config[key]) === 'string'? config[key]: def; + }; + + paths.pin = keyOrDefaultString('pinPath', './pins'); + paths.block = keyOrDefaultString('blockPath', './block'); + paths.data = keyOrDefaultString('filePath', './datastore'); + paths.staging = keyOrDefaultString('blobStagingPath', './blobstage'); + paths.blob = keyOrDefaultString('blobPath', './blob'); + paths.decree = keyOrDefaultString('decreePath', './data/'); + paths.archive = keyOrDefaultString('archivePath', './data/archive'); + paths.task = keyOrDefaultString('taskPath', './tasks'); + + Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit >= 0? + config.defaultStorageLimit: + Core.DEFAULT_LIMIT; + + try { + Env.admins = (config.adminKeys || []).map(function (k) { + try { + return Keys.canonicalize(k); + } catch (err) { + return; + } + }).filter(Boolean); + } catch (e) { + console.error("Can't parse admin keys. Please update or fix your config.js file!"); + } + + return Env; +}; diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index 5278a771c..299756cac 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -1,168 +1,18 @@ /* jshint esversion: 6 */ const nThen = require('nthen'); -const Crypto = require('crypto'); -const WriteQueue = require("./write-queue"); -const BatchRead = require("./batch-read"); const RPC = require("./rpc"); const HK = require("./hk-util.js"); -const Core = require("./commands/core"); - -const Keys = require("./keys"); -const Quota = require("./commands/quota"); -const Util = require("./common-util"); - const Store = require("./storage/file"); const BlobStore = require("./storage/blob"); const Workers = require("./workers/index"); +const Core = require("./commands/core"); -module.exports.create = function (config, cb) { - const Log = config.log; - var WARN = function (e, output) { - if (e && output) { - Log.warn(e, { - output: output, - message: String(e), - stack: new Error(e).stack, - }); - } - }; - +module.exports.create = function (Env, cb) { + const Log = Env.Log; Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE'); - const Env = { - Log: Log, - // store - id: Crypto.randomBytes(8).toString('hex'), - - launchTime: +new Date(), - - inactiveTime: config.inactiveTime, - archiveRetentionTime: config.archiveRetentionTime, - accountRetentionTime: config.accountRetentionTime, - - metadata_cache: {}, - channel_cache: {}, - queueStorage: WriteQueue(), - queueDeletes: WriteQueue(), - queueValidation: WriteQueue(), - - batchIndexReads: BatchRead("HK_GET_INDEX"), - batchMetadata: BatchRead('GET_METADATA'), - batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"), - batchDiskUsage: BatchRead('GET_DISK_USAGE'), - batchUserPins: BatchRead('LOAD_USER_PINS'), - batchTotalSize: BatchRead('GET_TOTAL_SIZE'), - batchAccountQuery: BatchRead("QUERY_ACCOUNT_SERVER"), - - //historyKeeper: config.historyKeeper, - intervals: config.intervals || {}, - maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024), - premiumUploadSize: false, // overridden below... - Sessions: {}, - paths: {}, - //msgStore: config.store, - - netfluxUsers: {}, - - pinStore: undefined, - - limits: {}, - admins: [], - WARN: WARN, - flushCache: config.flushCache, - adminEmail: config.adminEmail, - allowSubscriptions: config.allowSubscriptions === true, - blockDailyCheck: config.blockDailyCheck === true, - - myDomain: config.myDomain, - mySubdomain: config.mySubdomain, // only exists for the accounts integration - customLimits: {}, - // FIXME this attribute isn't in the default conf - // but it is referenced in Quota - domain: config.domain - }; - - (function () { - var custom = config.customLimits; - if (!custom) { return; } - - var stored = Env.customLimits; - - Object.keys(custom).forEach(function (k) { - var unsafeKey = Keys.canonicalize(k); - - if (!unsafeKey) { - Log.warn("INVALID_CUSTOM_LIMIT_ID", { - message: "A custom quota upgrade was provided via your config with an invalid identifier. It will be ignored.", - key: k, - value: custom[k], - }); - return; - } - - if (stored[unsafeKey]) { - Log.warn("INVALID_CUSTOM_LIMIT_DUPLICATED", { - message: "A duplicated custom quota upgrade was provided via your config which would have overridden an existing value. It will be ignored.", - key: k, - value: custom[k], - }); - return; - } - - if (!Quota.isValidLimit(custom[k])) { - Log.warn("INVALID_CUSTOM_LIMIT_VALUE", { - message: "A custom quota upgrade was provided via your config with an invalid value. It will be ignored.", - key: k, - value: custom[k], - }); - return; - } - - var limit = stored[unsafeKey] = Util.clone(custom[k]); - limit.origin = 'config'; - }); - }()); - - (function () { - var pes = config.premiumUploadSize; - if (!isNaN(pes) && pes >= Env.maxUploadSize) { - Env.premiumUploadSize = pes; - } - }()); - - var paths = Env.paths; - - var keyOrDefaultString = function (key, def) { - return typeof(config[key]) === 'string'? config[key]: def; - }; - - var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins'); - paths.block = keyOrDefaultString('blockPath', './block'); - paths.data = keyOrDefaultString('filePath', './datastore'); - paths.staging = keyOrDefaultString('blobStagingPath', './blobstage'); - paths.blob = keyOrDefaultString('blobPath', './blob'); - paths.decree = keyOrDefaultString('decreePath', './data/'); - - Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit >= 0? - config.defaultStorageLimit: - Core.DEFAULT_LIMIT; - - try { - // XXX this should be the same as is exposed in server.js - // /api/config.adminKeys - Env.admins = (config.adminKeys || []).map(function (k) { - try { - return Keys.canonicalize(k); - } catch (err) { - return; - } - }).filter(Boolean); - } catch (e) { - console.error("Can't parse admin keys. Please update or fix your config.js file!"); - } - - config.historyKeeper = Env.historyKeeper = { + Env.historyKeeper = { metadata_cache: Env.metadata_cache, channel_cache: Env.channel_cache, @@ -262,6 +112,8 @@ module.exports.create = function (config, cb) { Log.verbose('HK_ID', 'History keeper ID: ' + Env.id); + var pinPath = Env.paths.pin; + nThen(function (w) { // create a pin store Store.create({ @@ -272,18 +124,20 @@ module.exports.create = function (config, cb) { })); // create a channel store - Store.create(config, w(function (err, _store) { + Store.create({ + filePath: Env.paths.data, + archivepath: Env.paths.archive, + }, w(function (err, _store) { if (err) { throw err; } - config.store = _store; Env.msgStore = _store; // API used by rpc Env.store = _store; // API used by historyKeeper })); // create a blob store BlobStore.create({ - blobPath: config.blobPath, - blobStagingPath: config.blobStagingPath, - archivePath: config.archivePath, + blobPath: Env.paths.blob, + blobStagingPath: Env.paths.staging, + archivePath: Env.paths.archive, getSession: function (safeKey) { return Core.getSession(Env.Sessions, safeKey); }, @@ -293,32 +147,28 @@ module.exports.create = function (config, cb) { })); }).nThen(function (w) { Workers.initialize(Env, { - blobPath: config.blobPath, - blobStagingPath: config.blobStagingPath, - taskPath: config.taskPath, - pinPath: pinPath, - filePath: config.filePath, - archivePath: config.archivePath, - channelExpirationMs: config.channelExpirationMs, - verbose: config.verbose, - openFileLimit: config.openFileLimit, - - inactiveTime: config.inactiveTime, - archiveRetentionTime: config.archiveRetentionTime, - accountRetentionTime: config.accountRetentionTime, - - maxWorkers: config.maxWorkers, + blobPath: Env.paths.blob, + blobStagingPath: Env.paths.staging, + taskPath: Env.paths.task, + pinPath: Env.paths.pin, + filePath: Env.paths.data, + archivePath: Env.paths.archive, + + inactiveTime: Env.inactiveTime, + archiveRetentionTime: Env.archiveRetentionTime, + accountRetentionTime: Env.accountRetentionTime, + + maxWorkers: Env.maxWorkers, }, w(function (err) { if (err) { throw new Error(err); } })); }).nThen(function () { - config.intervals = config.intervals || {}; - if (config.disableIntegratedTasks) { return; } + if (Env.disableIntegratedTasks) { return; } var tasks_running; - config.intervals.taskExpiration = setInterval(function () { + Env.intervals.taskExpiration = setInterval(function () { if (tasks_running) { return; } tasks_running = true; Env.runTasks(function (err) { @@ -329,19 +179,18 @@ module.exports.create = function (config, cb) { }); }, 1000 * 60 * 5); // run every five minutes }).nThen(function () { - if (config.disableIntegratedEviction) { return; } + if (Env.disableIntegratedEviction) { return; } const ONE_DAY = 24 * 1000 * 60 * 60; // setting the time of the last eviction to "now" // effectively makes it so that we'll start evicting after the server // has been up for at least one day - var last_eviction = +new Date(); var active = false; - config.intervals.eviction = setInterval(function () { + Env.intervals.eviction = setInterval(function () { if (active) { return; } var now = +new Date(); // evict inactive data once per day - if (last_eviction && (now - ONE_DAY) < last_eviction) { return; } + if ((now - ONE_DAY) < Env.lastEviction) { return; } active = true; Env.evictInactive(function (err) { if (err) { @@ -349,28 +198,16 @@ module.exports.create = function (config, cb) { Log.error('EVICT_INACTIVE_MAIN_ERROR', err); } active = false; - last_eviction = now; + Env.lastEviction = now; }); }, 60 * 1000); }).nThen(function () { - var Decrees = require("./decrees"); - - Decrees.load(Env, function (err) { - if (err && err.code !== "ENOENT") { - Log.error('DECREES_LOADING', { - error: err.code || err, - message: err.message, - }); - console.error(err); - } - }); - }).nThen(function () { RPC.create(Env, function (err, _rpc) { if (err) { throw err; } Env.rpc = _rpc; - cb(void 0, config.historyKeeper); + cb(void 0, Env.historyKeeper); }); }); }; diff --git a/server.js b/server.js index ba0e9270f..0d048d3a5 100644 --- a/server.js +++ b/server.js @@ -12,31 +12,10 @@ var Default = require("./lib/defaults"); var Keys = require("./lib/keys"); var config = require("./lib/load-config"); +var Env = require("./lib/env").create(config); var app = Express(); -// mode can be FRESH (default), DEV, or PACKAGE - -var FRESH_KEY = ''; -var FRESH_MODE = true; -var DEV_MODE = false; -if (process.env.PACKAGE) { -// `PACKAGE=1 node server` uses the version string from package.json as the cache string - console.log("PACKAGE MODE ENABLED"); - FRESH_MODE = false; - DEV_MODE = false; -} else if (process.env.DEV) { -// `DEV=1 node server` will use a random cache string on every page reload - console.log("DEV MODE ENABLED"); - FRESH_MODE = false; - DEV_MODE = true; -} else { -// `FRESH=1 node server` will set a random cache string when the server is launched -// and use it for the process lifetime or until it is reset from the admin panel - console.log("FRESH MODE ENABLED"); - FRESH_KEY = +new Date(); -} - (function () { // you absolutely must provide an 'httpUnsafeOrigin' if (typeof(config.httpUnsafeOrigin) !== 'string') { @@ -64,7 +43,7 @@ if (process.env.PACKAGE) { config.httpSafePort = config.httpPort + 1; } - if (DEV_MODE) { return; } + if (Env.DEV_MODE) { return; } console.log(` m m mm mmmmm mm m mmmmm mm m mmm m # # # ## # "# #"m # # #"m # m" " # @@ -81,15 +60,6 @@ if (process.env.PACKAGE) { } }()); -var configCache = {}; -config.flushCache = function () { - configCache = {}; - FRESH_KEY = +new Date(); - if (!(DEV_MODE || FRESH_MODE)) { FRESH_MODE = true; } - if (!config.log) { return; } - config.log.info("UPDATING_FRESH_KEY", FRESH_KEY); -}; - var setHeaders = (function () { // load the default http headers unless the admin has provided their own via the config file var headers; @@ -144,6 +114,7 @@ if (!config.logFeedback) { return; } const logFeedback = function (url) { url.replace(/\?(.*?)=/, function (all, fb) { + if (!config.log) { return; } config.log.feedback(fb, ''); }); }; @@ -182,7 +153,7 @@ app.get(mainPagePattern, Express.static(__dirname + '/customize')); app.get(mainPagePattern, Express.static(__dirname + '/customize.dist')); app.use("/blob", Express.static(Path.join(__dirname, (config.blobPath || './blob')), { - maxAge: DEV_MODE? "0d": "365d" + maxAge: Env.DEV_MODE? "0d": "365d" })); app.use("/datastore", Express.static(Path.join(__dirname, (config.filePath || './datastore')), { maxAge: "0d" @@ -197,23 +168,10 @@ app.use("/customize.dist", Express.static(__dirname + '/customize.dist')); app.use(/^\/[^\/]*$/, Express.static('customize')); app.use(/^\/[^\/]*$/, Express.static('customize.dist')); -var admins = []; -try { - admins = (config.adminKeys || []).map(function (k) { - var unsafeKey = Keys.canonicalize(k); - // return each admin's "unsafeKey" - // this might throw and invalidate all the other admin's keys - // but we want to get the admin's attention anyway. - // breaking everything is a good way to accomplish that. - if (!unsafeKey) { throw new Error(); } - return unsafeKey; - }); -} catch (e) { console.error("Can't parse admin keys"); } - var serveConfig = (function () { // if dev mode: never cache var cacheString = function () { - return (FRESH_KEY? '-' + FRESH_KEY: '') + (DEV_MODE? '-' + (+new Date()): ''); + return (Env.FRESH_KEY? '-' + Env.FRESH_KEY: '') + (Env.DEV_MODE? '-' + (+new Date()): ''); }; var template = function (host) { @@ -228,12 +186,12 @@ var serveConfig = (function () { allowSubscriptions: (config.allowSubscriptions === true), websocketPath: config.externalWebsocketURL, httpUnsafeOrigin: config.httpUnsafeOrigin, - adminEmail: config.adminEmail, // XXX mutable - adminKeys: admins, - inactiveTime: config.inactiveTime, // XXX mutable - supportMailbox: config.supportMailboxPublicKey, - maxUploadSize: config.maxUploadSize, // XXX mutable - premiumUploadSize: config.premiumUploadSize, // XXX mutable + adminEmail: Env.adminEmail, + adminKeys: Env.admins, + inactiveTime: Env.inactiveTime, + supportMailbox: Env.supportMailboxPublicKey, + maxUploadSize: Env.maxUploadSize, + premiumUploadSize: Env.premiumUploadSize, }, null, '\t'), 'obj.httpSafeOrigin = ' + (function () { if (config.httpSafeOrigin) { return '"' + config.httpSafeOrigin + '"'; } @@ -254,28 +212,29 @@ var serveConfig = (function () { var host = req.headers.host.replace(/\:[0-9]+/, ''); res.setHeader('Content-Type', 'text/javascript'); // don't cache anything if you're in dev mode - if (DEV_MODE) { + if (Env.DEV_MODE) { return void res.send(template(host)); } // generate a lookup key for the cache var cacheKey = host + ':' + cacheString(); - // XXX we must be able to clear the cache when updating any mutable key + // FIXME mutable + // we must be able to clear the cache when updating any mutable key // if there's nothing cached for that key... - if (!configCache[cacheKey]) { + if (!Env.configCache[cacheKey]) { // generate the response and cache it in memory - configCache[cacheKey] = template(host); + Env.configCache[cacheKey] = template(host); // and create a function to conditionally evict cache entries // which have not been accessed in the last 20 seconds cleanUp[cacheKey] = Util.throttle(function () { delete cleanUp[cacheKey]; - delete configCache[cacheKey]; + delete Env.configCache[cacheKey]; }, 20000); } // successive calls to this function cleanUp[cacheKey](); - return void res.send(configCache[cacheKey]); + return void res.send(Env.configCache[cacheKey]); }; }()); @@ -298,7 +257,7 @@ app.use(function (req, res, next) { send404(res, custom_four04_path); }); -var httpServer = Http.createServer(app); +var httpServer = Env.httpServer = Http.createServer(app); nThen(function (w) { Fs.exists(__dirname + "/customize", w(function (e) { @@ -324,11 +283,12 @@ nThen(function (w) { // Initialize logging then start the API server require("./lib/log").create(config, function (_log) { + Env.Log = _log; config.log = _log; - config.httpServer = httpServer; if (config.externalWebsocketURL) { return; } - require("./lib/api").create(config); + + require("./lib/api").create(Env); }); });