From 0d636dabc99276c37d9980ab6458e0f632278a71 Mon Sep 17 00:00:00 2001 From: yflory Date: Tue, 17 Mar 2020 13:29:53 +0100 Subject: [PATCH 1/8] Check signature for history keeper in a different process --- lib/check-signature.js | 40 ++++++++++++++++++++++++++++++ lib/commands/core.js | 1 + lib/hk-util.js | 55 +++++++++++++++++++++++++++++++----------- 3 files changed, 82 insertions(+), 14 deletions(-) create mode 100644 lib/check-signature.js diff --git a/lib/check-signature.js b/lib/check-signature.js new file mode 100644 index 000000000..c81ffbb2d --- /dev/null +++ b/lib/check-signature.js @@ -0,0 +1,40 @@ +const Nacl = require('tweetnacl/nacl-fast'); + +// TODO if this process is using too much CPU, we can use "cluster" to add load balancing to this code + +process.on('message', function (data) { + console.log(+new Date(), "Message received by subprocess"); + if (!data || !data.key || !data.msg || !data.txid) { + process.send({ + error:'E_INVAL' + }); + return; + } + const txid = data.txid; + + const signedMsg = Nacl.util.decodeBase64(data.msg); + var validateKey; + try { + validateKey = Nacl.util.decodeBase64(data.key); + } catch (e) { + process.send({ + txid: txid, + error:'E_BADKEY' + }); + return; + } + // validate the message + const validated = Nacl.sign.open(signedMsg, validateKey); + if (!validated) { + process.send({ + txid: txid, + error:'FAILED' + }); + return; + } + console.log(+new Date(), "Verification done in the subprocess"); + process.send({ + txid: txid, + success: true + }); +}); diff --git a/lib/commands/core.js b/lib/commands/core.js index d7add69b4..bb5156749 100644 --- a/lib/commands/core.js +++ b/lib/commands/core.js @@ -3,6 +3,7 @@ const Core = module.exports; const Util = require("../common-util"); const escapeKeyCharacters = Util.escapeKeyCharacters; +const { fork } = require('child_process'); /* Use Nacl for checking signatures of messages */ const Nacl = require("tweetnacl/nacl-fast"); diff --git a/lib/hk-util.js b/lib/hk-util.js index cb9e9b8ef..6f8c1811a 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -6,6 +6,7 @@ const nThen = require('nthen'); const Util = require("./common-util"); const MetaRPC = require("./commands/metadata"); const Nacl = require('tweetnacl/nacl-fast'); +const { fork } = require('child_process'); const now = function () { return (new Date()).getTime(); }; const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds @@ -921,7 +922,14 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { * adds timestamps to incoming messages * writes messages to the store */ +const check = fork('lib/check-signature.js'); +const onChecked = Util.mkEvent(); +check.on('message', function (res) { + onChecked.fire(res); +}); + HK.onChannelMessage = function (Env, Server, channel, msgStruct) { + console.log(+new Date(), "onChannelMessage"); const Log = Env.Log; // TODO our usage of 'channel' here looks prone to errors @@ -962,20 +970,37 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; // convert the message from a base64 string into a Uint8Array - // FIXME this can fail and the client won't notice - signedMsg = Nacl.util.decodeBase64(signedMsg); - - // FIXME this can blow up - // TODO check that that won't cause any problems other than not being able to append... - const validateKey = Nacl.util.decodeBase64(metadata.validateKey); - // validate the message - const validated = Nacl.sign.open(signedMsg, validateKey); - if (!validated) { - // don't go any further if the message fails validation - w.abort(); - Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); - return; - } + const txid = Util.uid(); + const next = w(); + + // Listen for messages + const onCheck = function (res) { + if (!res) { return; } + if (res.txid !== txid) { return; } + // Wev'e received an answer, remove this handler + console.log(+new Date(), "Received verification response"); + onChecked.unreg(onCheck); + if (res.error) { + // don't go any further if the message fails validation + if (res.error === 'FAILED') { + // Signature doesn't match the public key + Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); + } + w.abort(); + return; + } + // Success, continue to next block + next(); + }; + onChecked.reg(onCheck); + + console.log(+new Date(), "Send verification request"); + // Send the request + check.send({ + txid: txid, + msg: signedMsg, + key: metadata.validateKey + }); }).nThen(function () { // do checkpoint stuff... @@ -1002,7 +1027,9 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { msgStruct.push(now()); // storeMessage + console.log(+new Date(), "Storing message"); storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log)); + console.log(+new Date(), "Message stored"); }); }; From fb0eb1b20cfbd229c0fbd82c831dce7bd50366d5 Mon Sep 17 00:00:00 2001 From: yflory Date: Tue, 17 Mar 2020 14:18:41 +0100 Subject: [PATCH 2/8] Use more subprocesses --- lib/check-signature.js | 5 +++++ lib/hk-util.js | 33 ++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/lib/check-signature.js b/lib/check-signature.js index c81ffbb2d..8f3d5edad 100644 --- a/lib/check-signature.js +++ b/lib/check-signature.js @@ -1,8 +1,13 @@ const Nacl = require('tweetnacl/nacl-fast'); +// XXX npm "os" and "child_process" + // TODO if this process is using too much CPU, we can use "cluster" to add load balancing to this code +console.log('New child process', process.pid); + process.on('message', function (data) { + console.log('In process', process.pid) console.log(+new Date(), "Message received by subprocess"); if (!data || !data.key || !data.msg || !data.txid) { process.send({ diff --git a/lib/hk-util.js b/lib/hk-util.js index 6f8c1811a..889d5d0b0 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -7,6 +7,7 @@ const Util = require("./common-util"); const MetaRPC = require("./commands/metadata"); const Nacl = require('tweetnacl/nacl-fast'); const { fork } = require('child_process'); +const numCPUs = require('os').cpus().length; const now = function () { return (new Date()).getTime(); }; const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds @@ -922,11 +923,32 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { * adds timestamps to incoming messages * writes messages to the store */ -const check = fork('lib/check-signature.js'); + + const onChecked = Util.mkEvent(); -check.on('message', function (res) { - onChecked.fire(res); -}); +// Create our workers +const workers = []; +for (let i = 0; i < numCPUs; i++) { + workers.push(fork('lib/check-signature.js')); +} +var initWorker = function (worker) { + worker.on('message', function (res) { + onChecked.fire(res); + }); + // Spawn a new process in one ends + worker.on('exit', function () { + // XXX make sure it's dead? + var idx = workers.indexOf(worker); + if (idx !== -1) { + workers.splice(idx, 1); + } + // Spawn a new one + var w = fork('lib/check-signature.js'); + workers.push(w); + initWorker(w); + }); +}; +workers.forEach(initWorker); HK.onChannelMessage = function (Env, Server, channel, msgStruct) { console.log(+new Date(), "onChannelMessage"); @@ -996,7 +1018,8 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { console.log(+new Date(), "Send verification request"); // Send the request - check.send({ + const random = Math.floor(Math.random() * 4); + workers[random].send({ txid: txid, msg: signedMsg, key: metadata.validateKey From 923616aef05114c7b24cfdeaf01b25566617eddf Mon Sep 17 00:00:00 2001 From: yflory Date: Tue, 17 Mar 2020 15:10:23 +0100 Subject: [PATCH 3/8] Use each process in order instead of using a random one --- lib/hk-util.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/hk-util.js b/lib/hk-util.js index 889d5d0b0..a9b9f0eb1 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -931,6 +931,7 @@ const workers = []; for (let i = 0; i < numCPUs; i++) { workers.push(fork('lib/check-signature.js')); } +var nextWorker = 0; var initWorker = function (worker) { worker.on('message', function (res) { onChecked.fire(res); @@ -1018,12 +1019,12 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { console.log(+new Date(), "Send verification request"); // Send the request - const random = Math.floor(Math.random() * 4); - workers[random].send({ + workers[nextWorker].send({ txid: txid, msg: signedMsg, key: metadata.validateKey }); + if (++nextWorker > numCPUs) { nextWorker = 0; } }).nThen(function () { // do checkpoint stuff... From 631ea54b49f0f9accb4967135f3d0ec1fb7167c8 Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 17 Mar 2020 15:40:06 -0400 Subject: [PATCH 4/8] lint compliance --- lib/check-signature.js | 4 +++- lib/commands/core.js | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/check-signature.js b/lib/check-signature.js index 8f3d5edad..6ffd6329e 100644 --- a/lib/check-signature.js +++ b/lib/check-signature.js @@ -1,3 +1,5 @@ +/* jshint esversion: 6 */ +/* global process */ const Nacl = require('tweetnacl/nacl-fast'); // XXX npm "os" and "child_process" @@ -7,7 +9,7 @@ const Nacl = require('tweetnacl/nacl-fast'); console.log('New child process', process.pid); process.on('message', function (data) { - console.log('In process', process.pid) + console.log('In process', process.pid); console.log(+new Date(), "Message received by subprocess"); if (!data || !data.key || !data.msg || !data.txid) { process.send({ diff --git a/lib/commands/core.js b/lib/commands/core.js index bb5156749..030aaf4ca 100644 --- a/lib/commands/core.js +++ b/lib/commands/core.js @@ -3,7 +3,7 @@ const Core = module.exports; const Util = require("../common-util"); const escapeKeyCharacters = Util.escapeKeyCharacters; -const { fork } = require('child_process'); +//const { fork } = require('child_process'); /* Use Nacl for checking signatures of messages */ const Nacl = require("tweetnacl/nacl-fast"); From 9e85a1411e8507b99e8dc7593bfc8acba39ced62 Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 17 Mar 2020 16:06:01 -0400 Subject: [PATCH 5/8] abstract the logic around worker choice out of message handling --- lib/hk-util.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/hk-util.js b/lib/hk-util.js index a9b9f0eb1..b3f917693 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -926,6 +926,7 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { const onChecked = Util.mkEvent(); + // Create our workers const workers = []; for (let i = 0; i < numCPUs; i++) { @@ -951,6 +952,12 @@ var initWorker = function (worker) { }; workers.forEach(initWorker); +const validateMessage = function (msg) { + nextWorker = (nextWorker + 1) % workers.length; + // Send the request + workers[nextWorker].send(msg); +}; + HK.onChannelMessage = function (Env, Server, channel, msgStruct) { console.log(+new Date(), "onChannelMessage"); const Log = Env.Log; @@ -1018,13 +1025,11 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { onChecked.reg(onCheck); console.log(+new Date(), "Send verification request"); - // Send the request - workers[nextWorker].send({ + validateMessage( { txid: txid, msg: signedMsg, key: metadata.validateKey }); - if (++nextWorker > numCPUs) { nextWorker = 0; } }).nThen(function () { // do checkpoint stuff... From 5467e1ffacd9cf4b982ffc57e1be971a12c6e31f Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 17 Mar 2020 16:52:41 -0400 Subject: [PATCH 6/8] replace ad-hoc response handler with Util.response --- lib/check-signature.js | 26 ++++++++------- lib/hk-util.js | 73 +++++++++++++++++++++++------------------- 2 files changed, 55 insertions(+), 44 deletions(-) diff --git a/lib/check-signature.js b/lib/check-signature.js index 6ffd6329e..5e6682a73 100644 --- a/lib/check-signature.js +++ b/lib/check-signature.js @@ -9,39 +9,43 @@ const Nacl = require('tweetnacl/nacl-fast'); console.log('New child process', process.pid); process.on('message', function (data) { - console.log('In process', process.pid); - console.log(+new Date(), "Message received by subprocess"); + //console.log('In process', process.pid); + //console.log(+new Date(), "Message received by subprocess"); if (!data || !data.key || !data.msg || !data.txid) { - process.send({ + return void process.send({ error:'E_INVAL' }); - return; } const txid = data.txid; - const signedMsg = Nacl.util.decodeBase64(data.msg); + var signedMsg; + try { + signedMsg = Nacl.util.decodeBase64(data.msg); + } catch (e) { + return void process.send({ + txid: txid, + error: 'E_BAD_MESSAGE', + }); + } + var validateKey; try { validateKey = Nacl.util.decodeBase64(data.key); } catch (e) { - process.send({ + return void process.send({ txid: txid, error:'E_BADKEY' }); - return; } // validate the message const validated = Nacl.sign.open(signedMsg, validateKey); if (!validated) { - process.send({ + return void process.send({ txid: txid, error:'FAILED' }); - return; } - console.log(+new Date(), "Verification done in the subprocess"); process.send({ txid: txid, - success: true }); }); diff --git a/lib/hk-util.js b/lib/hk-util.js index b3f917693..c669254ad 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -925,17 +925,20 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { */ -const onChecked = Util.mkEvent(); - // Create our workers const workers = []; for (let i = 0; i < numCPUs; i++) { workers.push(fork('lib/check-signature.js')); } var nextWorker = 0; + +const response = Util.response(); + var initWorker = function (worker) { worker.on('message', function (res) { - onChecked.fire(res); + if (!res || !res.txid) { return; } + //console.log(+new Date(), "Received verification response"); + response.handle(res.txid, [res.error]); }); // Spawn a new process in one ends worker.on('exit', function () { @@ -952,14 +955,32 @@ var initWorker = function (worker) { }; workers.forEach(initWorker); -const validateMessage = function (msg) { + +const validateMessage = function (signedMsg, key, _cb) { + // let's be paranoid about asynchrony and only calling back once.. + var cb = Util.once(Util.mkAsync(_cb)); + + var txid = Util.uid(); + + // expect a response within 15s + response.expect(txid, cb, 15000); + nextWorker = (nextWorker + 1) % workers.length; + if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { + console.error(workers); + throw new Error("INVALID_WORKERS"); + } + // Send the request - workers[nextWorker].send(msg); + workers[nextWorker].send({ + txid: txid, + msg: signedMsg, + key: key, + }); }; HK.onChannelMessage = function (Env, Server, channel, msgStruct) { - console.log(+new Date(), "onChannelMessage"); + //console.log(+new Date(), "onChannelMessage"); const Log = Env.Log; // TODO our usage of 'channel' here looks prone to errors @@ -1000,36 +1021,22 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; // convert the message from a base64 string into a Uint8Array - const txid = Util.uid(); - const next = w(); + //const txid = Util.uid(); // Listen for messages - const onCheck = function (res) { - if (!res) { return; } - if (res.txid !== txid) { return; } - // Wev'e received an answer, remove this handler - console.log(+new Date(), "Received verification response"); - onChecked.unreg(onCheck); - if (res.error) { - // don't go any further if the message fails validation - if (res.error === 'FAILED') { - // Signature doesn't match the public key + //console.log(+new Date(), "Send verification request"); + validateMessage(signedMsg, metadata.validateKey, w(function (err) { + if (err) { + // validation can fail in multiple ways + if (err === 'FAILED') { + // we log this case, but not others for some reason Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); } - w.abort(); - return; + // always abort if there was an error... + return void w.abort(); } - // Success, continue to next block - next(); - }; - onChecked.reg(onCheck); - - console.log(+new Date(), "Send verification request"); - validateMessage( { - txid: txid, - msg: signedMsg, - key: metadata.validateKey - }); + // otherwise it was successful! + })); }).nThen(function () { // do checkpoint stuff... @@ -1056,9 +1063,9 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { msgStruct.push(now()); // storeMessage - console.log(+new Date(), "Storing message"); + //console.log(+new Date(), "Storing message"); storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log)); - console.log(+new Date(), "Message stored"); + //console.log(+new Date(), "Message stored"); }); }; From 019e5e708bb7f631a2b71e1245b40a2eb9145e8b Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 17 Mar 2020 17:23:35 -0400 Subject: [PATCH 7/8] wrap workers in a function scope and add a validateMessage method to HK's Env --- lib/check-signature.js | 4 +- lib/historyKeeper.js | 2 + lib/hk-util.js | 96 ++++++++++++++++++++++-------------------- 3 files changed, 53 insertions(+), 49 deletions(-) diff --git a/lib/check-signature.js b/lib/check-signature.js index 5e6682a73..26839ccee 100644 --- a/lib/check-signature.js +++ b/lib/check-signature.js @@ -5,9 +5,7 @@ const Nacl = require('tweetnacl/nacl-fast'); // XXX npm "os" and "child_process" // TODO if this process is using too much CPU, we can use "cluster" to add load balancing to this code - -console.log('New child process', process.pid); - +//console.log('New child process', process.pid); process.on('message', function (data) { //console.log('In process', process.pid); //console.log(+new Date(), "Message received by subprocess"); diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index fcd291414..a3a33c4e6 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -76,6 +76,8 @@ module.exports.create = function (config, cb) { domain: config.domain }; + HK.initializeValidationWorkers(Env); + (function () { var pes = config.premiumUploadSize; if (!isNaN(pes) && pes >= Env.maxUploadSize) { diff --git a/lib/hk-util.js b/lib/hk-util.js index c669254ad..915d5772e 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -924,59 +924,63 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { * writes messages to the store */ +HK.initializeValidationWorkers = function (Env) { + if (typeof(Env.validateMessage) !== 'undefined') { + return void console.error("validation workers are already initialized"); + } -// Create our workers -const workers = []; -for (let i = 0; i < numCPUs; i++) { - workers.push(fork('lib/check-signature.js')); -} -var nextWorker = 0; - -const response = Util.response(); + // Create our workers + const workers = []; + for (let i = 0; i < numCPUs; i++) { + workers.push(fork('lib/check-signature.js')); + } -var initWorker = function (worker) { - worker.on('message', function (res) { - if (!res || !res.txid) { return; } - //console.log(+new Date(), "Received verification response"); - response.handle(res.txid, [res.error]); - }); - // Spawn a new process in one ends - worker.on('exit', function () { - // XXX make sure it's dead? - var idx = workers.indexOf(worker); - if (idx !== -1) { - workers.splice(idx, 1); - } - // Spawn a new one - var w = fork('lib/check-signature.js'); - workers.push(w); - initWorker(w); - }); -}; -workers.forEach(initWorker); + const response = Util.response(); + var initWorker = function (worker) { + worker.on('message', function (res) { + if (!res || !res.txid) { return; } + //console.log(+new Date(), "Received verification response"); + response.handle(res.txid, [res.error]); + }); + // Spawn a new process in one ends + worker.on('exit', function () { + // XXX make sure it's dead? + var idx = workers.indexOf(worker); + if (idx !== -1) { + workers.splice(idx, 1); + } + // Spawn a new one + var w = fork('lib/check-signature.js'); + workers.push(w); + initWorker(w); + }); + }; + workers.forEach(initWorker); -const validateMessage = function (signedMsg, key, _cb) { - // let's be paranoid about asynchrony and only calling back once.. - var cb = Util.once(Util.mkAsync(_cb)); + var nextWorker = 0; + Env.validateMessage = function (signedMsg, key, _cb) { + // let's be paranoid about asynchrony and only calling back once.. + var cb = Util.once(Util.mkAsync(_cb)); - var txid = Util.uid(); + var txid = Util.uid(); - // expect a response within 15s - response.expect(txid, cb, 15000); + // expect a response within 15s + response.expect(txid, cb, 15000); - nextWorker = (nextWorker + 1) % workers.length; - if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { - console.error(workers); - throw new Error("INVALID_WORKERS"); - } + nextWorker = (nextWorker + 1) % workers.length; + if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { + console.error(workers); + throw new Error("INVALID_WORKERS"); + } - // Send the request - workers[nextWorker].send({ - txid: txid, - msg: signedMsg, - key: key, - }); + // Send the request + workers[nextWorker].send({ + txid: txid, + msg: signedMsg, + key: key, + }); + }; }; HK.onChannelMessage = function (Env, Server, channel, msgStruct) { @@ -1025,7 +1029,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { // Listen for messages //console.log(+new Date(), "Send verification request"); - validateMessage(signedMsg, metadata.validateKey, w(function (err) { + Env.validateMessage(signedMsg, metadata.validateKey, w(function (err) { if (err) { // validation can fail in multiple ways if (err === 'FAILED') { From f8ad649b4524d4b7b42f263c607c2389807503e2 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 18 Mar 2020 10:30:42 -0400 Subject: [PATCH 8/8] [style] bail out early to avoid nesting --- lib/hk-util.js | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/hk-util.js b/lib/hk-util.js index 915d5772e..2bcae56e3 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -1030,16 +1030,15 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { // Listen for messages //console.log(+new Date(), "Send verification request"); Env.validateMessage(signedMsg, metadata.validateKey, w(function (err) { - if (err) { - // validation can fail in multiple ways - if (err === 'FAILED') { - // we log this case, but not others for some reason - Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); - } - // always abort if there was an error... - return void w.abort(); + // no errors means success + if (!err) { return; } + // validation can fail in multiple ways + if (err === 'FAILED') { + // we log this case, but not others for some reason + Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); } - // otherwise it was successful! + // always abort if there was an error... + return void w.abort(); })); }).nThen(function () { // do checkpoint stuff...