diff --git a/lib/check-signature.js b/lib/check-signature.js new file mode 100644 index 000000000..26839ccee --- /dev/null +++ b/lib/check-signature.js @@ -0,0 +1,49 @@ +/* jshint esversion: 6 */ +/* global process */ +const Nacl = require('tweetnacl/nacl-fast'); + +// XXX npm "os" and "child_process" + +// TODO if this process is using too much CPU, we can use "cluster" to add load balancing to this code +//console.log('New child process', process.pid); +process.on('message', function (data) { + //console.log('In process', process.pid); + //console.log(+new Date(), "Message received by subprocess"); + if (!data || !data.key || !data.msg || !data.txid) { + return void process.send({ + error:'E_INVAL' + }); + } + const txid = data.txid; + + var signedMsg; + try { + signedMsg = Nacl.util.decodeBase64(data.msg); + } catch (e) { + return void process.send({ + txid: txid, + error: 'E_BAD_MESSAGE', + }); + } + + var validateKey; + try { + validateKey = Nacl.util.decodeBase64(data.key); + } catch (e) { + return void process.send({ + txid: txid, + error:'E_BADKEY' + }); + } + // validate the message + const validated = Nacl.sign.open(signedMsg, validateKey); + if (!validated) { + return void process.send({ + txid: txid, + error:'FAILED' + }); + } + process.send({ + txid: txid, + }); +}); diff --git a/lib/commands/core.js b/lib/commands/core.js index d7add69b4..030aaf4ca 100644 --- a/lib/commands/core.js +++ b/lib/commands/core.js @@ -3,6 +3,7 @@ const Core = module.exports; const Util = require("../common-util"); const escapeKeyCharacters = Util.escapeKeyCharacters; +//const { fork } = require('child_process'); /* Use Nacl for checking signatures of messages */ const Nacl = require("tweetnacl/nacl-fast"); diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index fcd291414..a3a33c4e6 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -76,6 +76,8 @@ module.exports.create = function (config, cb) { domain: config.domain }; + HK.initializeValidationWorkers(Env); + (function () { var pes = config.premiumUploadSize; if (!isNaN(pes) && pes >= Env.maxUploadSize) { diff --git a/lib/hk-util.js b/lib/hk-util.js index cb9e9b8ef..2bcae56e3 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -6,6 +6,8 @@ const nThen = require('nthen'); const Util = require("./common-util"); const MetaRPC = require("./commands/metadata"); const Nacl = require('tweetnacl/nacl-fast'); +const { fork } = require('child_process'); +const numCPUs = require('os').cpus().length; const now = function () { return (new Date()).getTime(); }; const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds @@ -921,7 +923,68 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { * adds timestamps to incoming messages * writes messages to the store */ + +HK.initializeValidationWorkers = function (Env) { + if (typeof(Env.validateMessage) !== 'undefined') { + return void console.error("validation workers are already initialized"); + } + + // Create our workers + const workers = []; + for (let i = 0; i < numCPUs; i++) { + workers.push(fork('lib/check-signature.js')); + } + + const response = Util.response(); + + var initWorker = function (worker) { + worker.on('message', function (res) { + if (!res || !res.txid) { return; } + //console.log(+new Date(), "Received verification response"); + response.handle(res.txid, [res.error]); + }); + // Spawn a new process in one ends + worker.on('exit', function () { + // XXX make sure it's dead? + var idx = workers.indexOf(worker); + if (idx !== -1) { + workers.splice(idx, 1); + } + // Spawn a new one + var w = fork('lib/check-signature.js'); + workers.push(w); + initWorker(w); + }); + }; + workers.forEach(initWorker); + + var nextWorker = 0; + Env.validateMessage = function (signedMsg, key, _cb) { + // let's be paranoid about asynchrony and only calling back once.. + var cb = Util.once(Util.mkAsync(_cb)); + + var txid = Util.uid(); + + // expect a response within 15s + response.expect(txid, cb, 15000); + + nextWorker = (nextWorker + 1) % workers.length; + if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { + console.error(workers); + throw new Error("INVALID_WORKERS"); + } + + // Send the request + workers[nextWorker].send({ + txid: txid, + msg: signedMsg, + key: key, + }); + }; +}; + HK.onChannelMessage = function (Env, Server, channel, msgStruct) { + //console.log(+new Date(), "onChannelMessage"); const Log = Env.Log; // TODO our usage of 'channel' here looks prone to errors @@ -962,20 +1025,21 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; // convert the message from a base64 string into a Uint8Array - // FIXME this can fail and the client won't notice - signedMsg = Nacl.util.decodeBase64(signedMsg); + //const txid = Util.uid(); - // FIXME this can blow up - // TODO check that that won't cause any problems other than not being able to append... - const validateKey = Nacl.util.decodeBase64(metadata.validateKey); - // validate the message - const validated = Nacl.sign.open(signedMsg, validateKey); - if (!validated) { - // don't go any further if the message fails validation - w.abort(); - Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); - return; - } + // Listen for messages + //console.log(+new Date(), "Send verification request"); + Env.validateMessage(signedMsg, metadata.validateKey, w(function (err) { + // no errors means success + if (!err) { return; } + // validation can fail in multiple ways + if (err === 'FAILED') { + // we log this case, but not others for some reason + Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); + } + // always abort if there was an error... + return void w.abort(); + })); }).nThen(function () { // do checkpoint stuff... @@ -1002,7 +1066,9 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { msgStruct.push(now()); // storeMessage + //console.log(+new Date(), "Storing message"); storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log)); + //console.log(+new Date(), "Message stored"); }); };