From 5467e1ffacd9cf4b982ffc57e1be971a12c6e31f Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 17 Mar 2020 16:52:41 -0400 Subject: [PATCH] replace ad-hoc response handler with Util.response --- lib/check-signature.js | 26 ++++++++------- lib/hk-util.js | 73 +++++++++++++++++++++++------------------- 2 files changed, 55 insertions(+), 44 deletions(-) diff --git a/lib/check-signature.js b/lib/check-signature.js index 6ffd6329e..5e6682a73 100644 --- a/lib/check-signature.js +++ b/lib/check-signature.js @@ -9,39 +9,43 @@ const Nacl = require('tweetnacl/nacl-fast'); console.log('New child process', process.pid); process.on('message', function (data) { - console.log('In process', process.pid); - console.log(+new Date(), "Message received by subprocess"); + //console.log('In process', process.pid); + //console.log(+new Date(), "Message received by subprocess"); if (!data || !data.key || !data.msg || !data.txid) { - process.send({ + return void process.send({ error:'E_INVAL' }); - return; } const txid = data.txid; - const signedMsg = Nacl.util.decodeBase64(data.msg); + var signedMsg; + try { + signedMsg = Nacl.util.decodeBase64(data.msg); + } catch (e) { + return void process.send({ + txid: txid, + error: 'E_BAD_MESSAGE', + }); + } + var validateKey; try { validateKey = Nacl.util.decodeBase64(data.key); } catch (e) { - process.send({ + return void process.send({ txid: txid, error:'E_BADKEY' }); - return; } // validate the message const validated = Nacl.sign.open(signedMsg, validateKey); if (!validated) { - process.send({ + return void process.send({ txid: txid, error:'FAILED' }); - return; } - console.log(+new Date(), "Verification done in the subprocess"); process.send({ txid: txid, - success: true }); }); diff --git a/lib/hk-util.js b/lib/hk-util.js index b3f917693..c669254ad 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -925,17 +925,20 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { */ -const onChecked = Util.mkEvent(); - // Create our workers const workers = []; for (let i = 0; i < numCPUs; i++) { workers.push(fork('lib/check-signature.js')); } var nextWorker = 0; + +const response = Util.response(); + var initWorker = function (worker) { worker.on('message', function (res) { - onChecked.fire(res); + if (!res || !res.txid) { return; } + //console.log(+new Date(), "Received verification response"); + response.handle(res.txid, [res.error]); }); // Spawn a new process in one ends worker.on('exit', function () { @@ -952,14 +955,32 @@ var initWorker = function (worker) { }; workers.forEach(initWorker); -const validateMessage = function (msg) { + +const validateMessage = function (signedMsg, key, _cb) { + // let's be paranoid about asynchrony and only calling back once.. + var cb = Util.once(Util.mkAsync(_cb)); + + var txid = Util.uid(); + + // expect a response within 15s + response.expect(txid, cb, 15000); + nextWorker = (nextWorker + 1) % workers.length; + if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { + console.error(workers); + throw new Error("INVALID_WORKERS"); + } + // Send the request - workers[nextWorker].send(msg); + workers[nextWorker].send({ + txid: txid, + msg: signedMsg, + key: key, + }); }; HK.onChannelMessage = function (Env, Server, channel, msgStruct) { - console.log(+new Date(), "onChannelMessage"); + //console.log(+new Date(), "onChannelMessage"); const Log = Env.Log; // TODO our usage of 'channel' here looks prone to errors @@ -1000,36 +1021,22 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; // convert the message from a base64 string into a Uint8Array - const txid = Util.uid(); - const next = w(); + //const txid = Util.uid(); // Listen for messages - const onCheck = function (res) { - if (!res) { return; } - if (res.txid !== txid) { return; } - // Wev'e received an answer, remove this handler - console.log(+new Date(), "Received verification response"); - onChecked.unreg(onCheck); - if (res.error) { - // don't go any further if the message fails validation - if (res.error === 'FAILED') { - // Signature doesn't match the public key + //console.log(+new Date(), "Send verification request"); + validateMessage(signedMsg, metadata.validateKey, w(function (err) { + if (err) { + // validation can fail in multiple ways + if (err === 'FAILED') { + // we log this case, but not others for some reason Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); } - w.abort(); - return; + // always abort if there was an error... + return void w.abort(); } - // Success, continue to next block - next(); - }; - onChecked.reg(onCheck); - - console.log(+new Date(), "Send verification request"); - validateMessage( { - txid: txid, - msg: signedMsg, - key: metadata.validateKey - }); + // otherwise it was successful! + })); }).nThen(function () { // do checkpoint stuff... @@ -1056,9 +1063,9 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { msgStruct.push(now()); // storeMessage - console.log(+new Date(), "Storing message"); + //console.log(+new Date(), "Storing message"); storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log)); - console.log(+new Date(), "Message stored"); + //console.log(+new Date(), "Message stored"); }); };