diff --git a/lib/check-signature.js b/lib/check-signature.js index 5e6682a73..26839ccee 100644 --- a/lib/check-signature.js +++ b/lib/check-signature.js @@ -5,9 +5,7 @@ const Nacl = require('tweetnacl/nacl-fast'); // XXX npm "os" and "child_process" // TODO if this process is using too much CPU, we can use "cluster" to add load balancing to this code - -console.log('New child process', process.pid); - +//console.log('New child process', process.pid); process.on('message', function (data) { //console.log('In process', process.pid); //console.log(+new Date(), "Message received by subprocess"); diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index fcd291414..a3a33c4e6 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -76,6 +76,8 @@ module.exports.create = function (config, cb) { domain: config.domain }; + HK.initializeValidationWorkers(Env); + (function () { var pes = config.premiumUploadSize; if (!isNaN(pes) && pes >= Env.maxUploadSize) { diff --git a/lib/hk-util.js b/lib/hk-util.js index c669254ad..915d5772e 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -924,59 +924,63 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { * writes messages to the store */ +HK.initializeValidationWorkers = function (Env) { + if (typeof(Env.validateMessage) !== 'undefined') { + return void console.error("validation workers are already initialized"); + } -// Create our workers -const workers = []; -for (let i = 0; i < numCPUs; i++) { - workers.push(fork('lib/check-signature.js')); -} -var nextWorker = 0; - -const response = Util.response(); + // Create our workers + const workers = []; + for (let i = 0; i < numCPUs; i++) { + workers.push(fork('lib/check-signature.js')); + } -var initWorker = function (worker) { - worker.on('message', function (res) { - if (!res || !res.txid) { return; } - //console.log(+new Date(), "Received verification response"); - response.handle(res.txid, [res.error]); - }); - // Spawn a new process in one ends - worker.on('exit', function () { - // XXX make sure it's dead? - var idx = workers.indexOf(worker); - if (idx !== -1) { - workers.splice(idx, 1); - } - // Spawn a new one - var w = fork('lib/check-signature.js'); - workers.push(w); - initWorker(w); - }); -}; -workers.forEach(initWorker); + const response = Util.response(); + var initWorker = function (worker) { + worker.on('message', function (res) { + if (!res || !res.txid) { return; } + //console.log(+new Date(), "Received verification response"); + response.handle(res.txid, [res.error]); + }); + // Spawn a new process in one ends + worker.on('exit', function () { + // XXX make sure it's dead? + var idx = workers.indexOf(worker); + if (idx !== -1) { + workers.splice(idx, 1); + } + // Spawn a new one + var w = fork('lib/check-signature.js'); + workers.push(w); + initWorker(w); + }); + }; + workers.forEach(initWorker); -const validateMessage = function (signedMsg, key, _cb) { - // let's be paranoid about asynchrony and only calling back once.. - var cb = Util.once(Util.mkAsync(_cb)); + var nextWorker = 0; + Env.validateMessage = function (signedMsg, key, _cb) { + // let's be paranoid about asynchrony and only calling back once.. + var cb = Util.once(Util.mkAsync(_cb)); - var txid = Util.uid(); + var txid = Util.uid(); - // expect a response within 15s - response.expect(txid, cb, 15000); + // expect a response within 15s + response.expect(txid, cb, 15000); - nextWorker = (nextWorker + 1) % workers.length; - if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { - console.error(workers); - throw new Error("INVALID_WORKERS"); - } + nextWorker = (nextWorker + 1) % workers.length; + if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { + console.error(workers); + throw new Error("INVALID_WORKERS"); + } - // Send the request - workers[nextWorker].send({ - txid: txid, - msg: signedMsg, - key: key, - }); + // Send the request + workers[nextWorker].send({ + txid: txid, + msg: signedMsg, + key: key, + }); + }; }; HK.onChannelMessage = function (Env, Server, channel, msgStruct) { @@ -1025,7 +1029,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { // Listen for messages //console.log(+new Date(), "Send verification request"); - validateMessage(signedMsg, metadata.validateKey, w(function (err) { + Env.validateMessage(signedMsg, metadata.validateKey, w(function (err) { if (err) { // validation can fail in multiple ways if (err === 'FAILED') {