initialize Env from server and deduplicate several attributes

pull/1/head
ansuz 4 years ago
parent 6a51e4e88e
commit 369c92c01d

@ -1,13 +1,28 @@
/* jshint esversion: 6 */
const WebSocketServer = require('ws').Server;
const NetfluxSrv = require('chainpad-server');
const Decrees = require("./decrees");
module.exports.create = function (config) {
const nThen = require("nthen");
module.exports.create = function (Env) {
var log = Env.Log;
nThen(function (w) {
Decrees.load(Env, w(function (err) {
if (err && err.code !== "ENOENT") {
log.error('DECREES_LOADING', {
error: err.code || err,
message: err.message,
});
console.error(err);
}
}));
}).nThen(function () {
// asynchronously create a historyKeeper and RPC together
require('./historyKeeper.js').create(config, function (err, historyKeeper) {
require('./historyKeeper.js').create(Env, function (err, historyKeeper) {
if (err) { throw err; }
var log = config.log;
var noop = function () {};
@ -21,7 +36,7 @@ module.exports.create = function (config) {
};
// spawn ws server and attach netflux event handlers
NetfluxSrv.create(new WebSocketServer({ server: config.httpServer}))
NetfluxSrv.create(new WebSocketServer({ server: Env.httpServer}))
.on('channelClose', historyKeeper.channelClose)
.on('channelMessage', historyKeeper.channelMessage)
.on('channelOpen', historyKeeper.channelOpen)
@ -50,4 +65,6 @@ module.exports.create = function (config) {
})
.register(historyKeeper.id, historyKeeper.directMessage);
});
});
};

@ -0,0 +1,200 @@
/* jshint esversion: 6 */
/* globals process */
const Crypto = require('crypto');
const WriteQueue = require("./write-queue");
const BatchRead = require("./batch-read");
const Keys = require("./keys");
const Core = require("./commands/core");
const Quota = require("./commands/quota");
const Util = require("./common-util");
module.exports.create = function (config) {
// mode can be FRESH (default), DEV, or PACKAGE
const Env = {
FRESH_KEY: '',
FRESH_MODE: true,
DEV_MODE: false,
configCache: {},
flushCache: function () {
Env.configCache = {};
Env.FRESH_KEY = +new Date();
if (!(Env.DEV_MODE || Env.FRESH_MODE)) { Env.FRESH_MODE = true; }
if (!Env.Log) { return; }
Env.Log.info("UPDATING_FRESH_KEY", Env.FRESH_KEY);
},
Log: undefined,
// store
id: Crypto.randomBytes(8).toString('hex'),
launchTime: +new Date(),
inactiveTime: config.inactiveTime,
archiveRetentionTime: config.archiveRetentionTime,
accountRetentionTime: config.accountRetentionTime,
// TODO implement mutability
adminEmail: config.adminEmail,
supportMailbox: config.supportMailboxPublicKey,
metadata_cache: {},
channel_cache: {},
queueStorage: WriteQueue(),
queueDeletes: WriteQueue(),
queueValidation: WriteQueue(),
batchIndexReads: BatchRead("HK_GET_INDEX"),
batchMetadata: BatchRead('GET_METADATA'),
batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"),
batchDiskUsage: BatchRead('GET_DISK_USAGE'),
batchUserPins: BatchRead('LOAD_USER_PINS'),
batchTotalSize: BatchRead('GET_TOTAL_SIZE'),
batchAccountQuery: BatchRead("QUERY_ACCOUNT_SERVER"),
intervals: {},
maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024),
premiumUploadSize: false, // overridden below...
Sessions: {},
paths: {},
//msgStore: config.store,
netfluxUsers: {},
pinStore: undefined,
limits: {},
admins: [],
WARN: function (e, output) { // TODO deprecate this
if (!Env.Log) { return; }
if (e && output) {
Env.Log.warn(e, {
output: output,
message: String(e),
stack: new Error(e).stack,
});
}
},
allowSubscriptions: config.allowSubscriptions === true,
blockDailyCheck: config.blockDailyCheck === true,
myDomain: config.myDomain,
mySubdomain: config.mySubdomain, // only exists for the accounts integration
customLimits: {},
// FIXME this attribute isn't in the default conf
// but it is referenced in Quota
domain: config.domain,
maxWorkers: config.maxWorkers,
disableIntegratedTasks: config.disableIntegratedTasks || false,
disableIntegratedEviction: config.disableIntegratedEviction || false,
lastEviction: +new Date(),
};
(function () {
if (process.env.PACKAGE) {
// `PACKAGE=1 node server` uses the version string from package.json as the cache string
console.log("PACKAGE MODE ENABLED");
Env.FRESH_MODE = false;
Env.DEV_MODE = false;
} else if (process.env.DEV) {
// `DEV=1 node server` will use a random cache string on every page reload
console.log("DEV MODE ENABLED");
Env.FRESH_MODE = false;
Env.DEV_MODE = true;
} else {
// `FRESH=1 node server` will set a random cache string when the server is launched
// and use it for the process lifetime or until it is reset from the admin panel
console.log("FRESH MODE ENABLED");
Env.FRESH_KEY = +new Date();
}
}());
(function () {
var custom = config.customLimits;
if (!custom) { return; }
var stored = Env.customLimits;
Object.keys(custom).forEach(function (k) {
var unsafeKey = Keys.canonicalize(k);
if (!unsafeKey) {
console.log("INVALID_CUSTOM_LIMIT_ID", {
message: "A custom quota upgrade was provided via your config with an invalid identifier. It will be ignored.",
key: k,
value: custom[k],
});
return;
}
if (stored[unsafeKey]) {
console.log("INVALID_CUSTOM_LIMIT_DUPLICATED", {
message: "A duplicated custom quota upgrade was provided via your config which would have overridden an existing value. It will be ignored.",
key: k,
value: custom[k],
});
return;
}
if (!Quota.isValidLimit(custom[k])) {
console.log("INVALID_CUSTOM_LIMIT_VALUE", {
message: "A custom quota upgrade was provided via your config with an invalid value. It will be ignored.",
key: k,
value: custom[k],
});
return;
}
var limit = stored[unsafeKey] = Util.clone(custom[k]);
limit.origin = 'config';
});
}());
(function () {
var pes = config.premiumUploadSize;
if (!isNaN(pes) && pes >= Env.maxUploadSize) {
Env.premiumUploadSize = pes;
}
}());
var paths = Env.paths;
var keyOrDefaultString = function (key, def) {
return typeof(config[key]) === 'string'? config[key]: def;
};
paths.pin = keyOrDefaultString('pinPath', './pins');
paths.block = keyOrDefaultString('blockPath', './block');
paths.data = keyOrDefaultString('filePath', './datastore');
paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
paths.blob = keyOrDefaultString('blobPath', './blob');
paths.decree = keyOrDefaultString('decreePath', './data/');
paths.archive = keyOrDefaultString('archivePath', './data/archive');
paths.task = keyOrDefaultString('taskPath', './tasks');
Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit >= 0?
config.defaultStorageLimit:
Core.DEFAULT_LIMIT;
try {
Env.admins = (config.adminKeys || []).map(function (k) {
try {
return Keys.canonicalize(k);
} catch (err) {
return;
}
}).filter(Boolean);
} catch (e) {
console.error("Can't parse admin keys. Please update or fix your config.js file!");
}
return Env;
};

@ -1,168 +1,18 @@
/* jshint esversion: 6 */
const nThen = require('nthen');
const Crypto = require('crypto');
const WriteQueue = require("./write-queue");
const BatchRead = require("./batch-read");
const RPC = require("./rpc");
const HK = require("./hk-util.js");
const Core = require("./commands/core");
const Keys = require("./keys");
const Quota = require("./commands/quota");
const Util = require("./common-util");
const Store = require("./storage/file");
const BlobStore = require("./storage/blob");
const Workers = require("./workers/index");
const Core = require("./commands/core");
module.exports.create = function (config, cb) {
const Log = config.log;
var WARN = function (e, output) {
if (e && output) {
Log.warn(e, {
output: output,
message: String(e),
stack: new Error(e).stack,
});
}
};
module.exports.create = function (Env, cb) {
const Log = Env.Log;
Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
const Env = {
Log: Log,
// store
id: Crypto.randomBytes(8).toString('hex'),
launchTime: +new Date(),
inactiveTime: config.inactiveTime,
archiveRetentionTime: config.archiveRetentionTime,
accountRetentionTime: config.accountRetentionTime,
metadata_cache: {},
channel_cache: {},
queueStorage: WriteQueue(),
queueDeletes: WriteQueue(),
queueValidation: WriteQueue(),
batchIndexReads: BatchRead("HK_GET_INDEX"),
batchMetadata: BatchRead('GET_METADATA'),
batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"),
batchDiskUsage: BatchRead('GET_DISK_USAGE'),
batchUserPins: BatchRead('LOAD_USER_PINS'),
batchTotalSize: BatchRead('GET_TOTAL_SIZE'),
batchAccountQuery: BatchRead("QUERY_ACCOUNT_SERVER"),
//historyKeeper: config.historyKeeper,
intervals: config.intervals || {},
maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024),
premiumUploadSize: false, // overridden below...
Sessions: {},
paths: {},
//msgStore: config.store,
netfluxUsers: {},
pinStore: undefined,
limits: {},
admins: [],
WARN: WARN,
flushCache: config.flushCache,
adminEmail: config.adminEmail,
allowSubscriptions: config.allowSubscriptions === true,
blockDailyCheck: config.blockDailyCheck === true,
myDomain: config.myDomain,
mySubdomain: config.mySubdomain, // only exists for the accounts integration
customLimits: {},
// FIXME this attribute isn't in the default conf
// but it is referenced in Quota
domain: config.domain
};
(function () {
var custom = config.customLimits;
if (!custom) { return; }
var stored = Env.customLimits;
Object.keys(custom).forEach(function (k) {
var unsafeKey = Keys.canonicalize(k);
if (!unsafeKey) {
Log.warn("INVALID_CUSTOM_LIMIT_ID", {
message: "A custom quota upgrade was provided via your config with an invalid identifier. It will be ignored.",
key: k,
value: custom[k],
});
return;
}
if (stored[unsafeKey]) {
Log.warn("INVALID_CUSTOM_LIMIT_DUPLICATED", {
message: "A duplicated custom quota upgrade was provided via your config which would have overridden an existing value. It will be ignored.",
key: k,
value: custom[k],
});
return;
}
if (!Quota.isValidLimit(custom[k])) {
Log.warn("INVALID_CUSTOM_LIMIT_VALUE", {
message: "A custom quota upgrade was provided via your config with an invalid value. It will be ignored.",
key: k,
value: custom[k],
});
return;
}
var limit = stored[unsafeKey] = Util.clone(custom[k]);
limit.origin = 'config';
});
}());
(function () {
var pes = config.premiumUploadSize;
if (!isNaN(pes) && pes >= Env.maxUploadSize) {
Env.premiumUploadSize = pes;
}
}());
var paths = Env.paths;
var keyOrDefaultString = function (key, def) {
return typeof(config[key]) === 'string'? config[key]: def;
};
var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins');
paths.block = keyOrDefaultString('blockPath', './block');
paths.data = keyOrDefaultString('filePath', './datastore');
paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
paths.blob = keyOrDefaultString('blobPath', './blob');
paths.decree = keyOrDefaultString('decreePath', './data/');
Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit >= 0?
config.defaultStorageLimit:
Core.DEFAULT_LIMIT;
try {
// XXX this should be the same as is exposed in server.js
// /api/config.adminKeys
Env.admins = (config.adminKeys || []).map(function (k) {
try {
return Keys.canonicalize(k);
} catch (err) {
return;
}
}).filter(Boolean);
} catch (e) {
console.error("Can't parse admin keys. Please update or fix your config.js file!");
}
config.historyKeeper = Env.historyKeeper = {
Env.historyKeeper = {
metadata_cache: Env.metadata_cache,
channel_cache: Env.channel_cache,
@ -262,6 +112,8 @@ module.exports.create = function (config, cb) {
Log.verbose('HK_ID', 'History keeper ID: ' + Env.id);
var pinPath = Env.paths.pin;
nThen(function (w) {
// create a pin store
Store.create({
@ -272,18 +124,20 @@ module.exports.create = function (config, cb) {
}));
// create a channel store
Store.create(config, w(function (err, _store) {
Store.create({
filePath: Env.paths.data,
archivepath: Env.paths.archive,
}, w(function (err, _store) {
if (err) { throw err; }
config.store = _store;
Env.msgStore = _store; // API used by rpc
Env.store = _store; // API used by historyKeeper
}));
// create a blob store
BlobStore.create({
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
blobPath: Env.paths.blob,
blobStagingPath: Env.paths.staging,
archivePath: Env.paths.archive,
getSession: function (safeKey) {
return Core.getSession(Env.Sessions, safeKey);
},
@ -293,32 +147,28 @@ module.exports.create = function (config, cb) {
}));
}).nThen(function (w) {
Workers.initialize(Env, {
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
taskPath: config.taskPath,
pinPath: pinPath,
filePath: config.filePath,
archivePath: config.archivePath,
channelExpirationMs: config.channelExpirationMs,
verbose: config.verbose,
openFileLimit: config.openFileLimit,
inactiveTime: config.inactiveTime,
archiveRetentionTime: config.archiveRetentionTime,
accountRetentionTime: config.accountRetentionTime,
maxWorkers: config.maxWorkers,
blobPath: Env.paths.blob,
blobStagingPath: Env.paths.staging,
taskPath: Env.paths.task,
pinPath: Env.paths.pin,
filePath: Env.paths.data,
archivePath: Env.paths.archive,
inactiveTime: Env.inactiveTime,
archiveRetentionTime: Env.archiveRetentionTime,
accountRetentionTime: Env.accountRetentionTime,
maxWorkers: Env.maxWorkers,
}, w(function (err) {
if (err) {
throw new Error(err);
}
}));
}).nThen(function () {
config.intervals = config.intervals || {};
if (config.disableIntegratedTasks) { return; }
if (Env.disableIntegratedTasks) { return; }
var tasks_running;
config.intervals.taskExpiration = setInterval(function () {
Env.intervals.taskExpiration = setInterval(function () {
if (tasks_running) { return; }
tasks_running = true;
Env.runTasks(function (err) {
@ -329,19 +179,18 @@ module.exports.create = function (config, cb) {
});
}, 1000 * 60 * 5); // run every five minutes
}).nThen(function () {
if (config.disableIntegratedEviction) { return; }
if (Env.disableIntegratedEviction) { return; }
const ONE_DAY = 24 * 1000 * 60 * 60;
// setting the time of the last eviction to "now"
// effectively makes it so that we'll start evicting after the server
// has been up for at least one day
var last_eviction = +new Date();
var active = false;
config.intervals.eviction = setInterval(function () {
Env.intervals.eviction = setInterval(function () {
if (active) { return; }
var now = +new Date();
// evict inactive data once per day
if (last_eviction && (now - ONE_DAY) < last_eviction) { return; }
if ((now - ONE_DAY) < Env.lastEviction) { return; }
active = true;
Env.evictInactive(function (err) {
if (err) {
@ -349,28 +198,16 @@ module.exports.create = function (config, cb) {
Log.error('EVICT_INACTIVE_MAIN_ERROR', err);
}
active = false;
last_eviction = now;
Env.lastEviction = now;
});
}, 60 * 1000);
}).nThen(function () {
var Decrees = require("./decrees");
Decrees.load(Env, function (err) {
if (err && err.code !== "ENOENT") {
Log.error('DECREES_LOADING', {
error: err.code || err,
message: err.message,
});
console.error(err);
}
});
}).nThen(function () {
RPC.create(Env, function (err, _rpc) {
if (err) { throw err; }
Env.rpc = _rpc;
cb(void 0, config.historyKeeper);
cb(void 0, Env.historyKeeper);
});
});
};

@ -12,31 +12,10 @@ var Default = require("./lib/defaults");
var Keys = require("./lib/keys");
var config = require("./lib/load-config");
var Env = require("./lib/env").create(config);
var app = Express();
// mode can be FRESH (default), DEV, or PACKAGE
var FRESH_KEY = '';
var FRESH_MODE = true;
var DEV_MODE = false;
if (process.env.PACKAGE) {
// `PACKAGE=1 node server` uses the version string from package.json as the cache string
console.log("PACKAGE MODE ENABLED");
FRESH_MODE = false;
DEV_MODE = false;
} else if (process.env.DEV) {
// `DEV=1 node server` will use a random cache string on every page reload
console.log("DEV MODE ENABLED");
FRESH_MODE = false;
DEV_MODE = true;
} else {
// `FRESH=1 node server` will set a random cache string when the server is launched
// and use it for the process lifetime or until it is reset from the admin panel
console.log("FRESH MODE ENABLED");
FRESH_KEY = +new Date();
}
(function () {
// you absolutely must provide an 'httpUnsafeOrigin'
if (typeof(config.httpUnsafeOrigin) !== 'string') {
@ -64,7 +43,7 @@ if (process.env.PACKAGE) {
config.httpSafePort = config.httpPort + 1;
}
if (DEV_MODE) { return; }
if (Env.DEV_MODE) { return; }
console.log(`
m m mm mmmmm mm m mmmmm mm m mmm m
# # # ## # "# #"m # # #"m # m" " #
@ -81,15 +60,6 @@ if (process.env.PACKAGE) {
}
}());
var configCache = {};
config.flushCache = function () {
configCache = {};
FRESH_KEY = +new Date();
if (!(DEV_MODE || FRESH_MODE)) { FRESH_MODE = true; }
if (!config.log) { return; }
config.log.info("UPDATING_FRESH_KEY", FRESH_KEY);
};
var setHeaders = (function () {
// load the default http headers unless the admin has provided their own via the config file
var headers;
@ -144,6 +114,7 @@ if (!config.logFeedback) { return; }
const logFeedback = function (url) {
url.replace(/\?(.*?)=/, function (all, fb) {
if (!config.log) { return; }
config.log.feedback(fb, '');
});
};
@ -182,7 +153,7 @@ app.get(mainPagePattern, Express.static(__dirname + '/customize'));
app.get(mainPagePattern, Express.static(__dirname + '/customize.dist'));
app.use("/blob", Express.static(Path.join(__dirname, (config.blobPath || './blob')), {
maxAge: DEV_MODE? "0d": "365d"
maxAge: Env.DEV_MODE? "0d": "365d"
}));
app.use("/datastore", Express.static(Path.join(__dirname, (config.filePath || './datastore')), {
maxAge: "0d"
@ -197,23 +168,10 @@ app.use("/customize.dist", Express.static(__dirname + '/customize.dist'));
app.use(/^\/[^\/]*$/, Express.static('customize'));
app.use(/^\/[^\/]*$/, Express.static('customize.dist'));
var admins = [];
try {
admins = (config.adminKeys || []).map(function (k) {
var unsafeKey = Keys.canonicalize(k);
// return each admin's "unsafeKey"
// this might throw and invalidate all the other admin's keys
// but we want to get the admin's attention anyway.
// breaking everything is a good way to accomplish that.
if (!unsafeKey) { throw new Error(); }
return unsafeKey;
});
} catch (e) { console.error("Can't parse admin keys"); }
var serveConfig = (function () {
// if dev mode: never cache
var cacheString = function () {
return (FRESH_KEY? '-' + FRESH_KEY: '') + (DEV_MODE? '-' + (+new Date()): '');
return (Env.FRESH_KEY? '-' + Env.FRESH_KEY: '') + (Env.DEV_MODE? '-' + (+new Date()): '');
};
var template = function (host) {
@ -228,12 +186,12 @@ var serveConfig = (function () {
allowSubscriptions: (config.allowSubscriptions === true),
websocketPath: config.externalWebsocketURL,
httpUnsafeOrigin: config.httpUnsafeOrigin,
adminEmail: config.adminEmail, // XXX mutable
adminKeys: admins,
inactiveTime: config.inactiveTime, // XXX mutable
supportMailbox: config.supportMailboxPublicKey,
maxUploadSize: config.maxUploadSize, // XXX mutable
premiumUploadSize: config.premiumUploadSize, // XXX mutable
adminEmail: Env.adminEmail,
adminKeys: Env.admins,
inactiveTime: Env.inactiveTime,
supportMailbox: Env.supportMailboxPublicKey,
maxUploadSize: Env.maxUploadSize,
premiumUploadSize: Env.premiumUploadSize,
}, null, '\t'),
'obj.httpSafeOrigin = ' + (function () {
if (config.httpSafeOrigin) { return '"' + config.httpSafeOrigin + '"'; }
@ -254,28 +212,29 @@ var serveConfig = (function () {
var host = req.headers.host.replace(/\:[0-9]+/, '');
res.setHeader('Content-Type', 'text/javascript');
// don't cache anything if you're in dev mode
if (DEV_MODE) {
if (Env.DEV_MODE) {
return void res.send(template(host));
}
// generate a lookup key for the cache
var cacheKey = host + ':' + cacheString();
// XXX we must be able to clear the cache when updating any mutable key
// FIXME mutable
// we must be able to clear the cache when updating any mutable key
// if there's nothing cached for that key...
if (!configCache[cacheKey]) {
if (!Env.configCache[cacheKey]) {
// generate the response and cache it in memory
configCache[cacheKey] = template(host);
Env.configCache[cacheKey] = template(host);
// and create a function to conditionally evict cache entries
// which have not been accessed in the last 20 seconds
cleanUp[cacheKey] = Util.throttle(function () {
delete cleanUp[cacheKey];
delete configCache[cacheKey];
delete Env.configCache[cacheKey];
}, 20000);
}
// successive calls to this function
cleanUp[cacheKey]();
return void res.send(configCache[cacheKey]);
return void res.send(Env.configCache[cacheKey]);
};
}());
@ -298,7 +257,7 @@ app.use(function (req, res, next) {
send404(res, custom_four04_path);
});
var httpServer = Http.createServer(app);
var httpServer = Env.httpServer = Http.createServer(app);
nThen(function (w) {
Fs.exists(__dirname + "/customize", w(function (e) {
@ -324,11 +283,12 @@ nThen(function (w) {
// Initialize logging then start the API server
require("./lib/log").create(config, function (_log) {
Env.Log = _log;
config.log = _log;
config.httpServer = httpServer;
if (config.externalWebsocketURL) { return; }
require("./lib/api").create(config);
require("./lib/api").create(Env);
});
});

Loading…
Cancel
Save