diff --git a/lib/workers/crypto-worker.js b/lib/workers/crypto-worker.js deleted file mode 100644 index 5ed58ac7c..000000000 --- a/lib/workers/crypto-worker.js +++ /dev/null @@ -1,113 +0,0 @@ -/* jshint esversion: 6 */ -/* global process */ -const Nacl = require('tweetnacl/nacl-fast'); - -const COMMANDS = {}; - -COMMANDS.INLINE = function (data, cb) { - var signedMsg; - try { - signedMsg = Nacl.util.decodeBase64(data.msg); - } catch (e) { - return void cb('E_BAD_MESSAGE'); - } - - var validateKey; - try { - validateKey = Nacl.util.decodeBase64(data.key); - } catch (e) { - return void cb("E_BADKEY"); - } - // validate the message - const validated = Nacl.sign.open(signedMsg, validateKey); - if (!validated) { - return void cb("FAILED"); - } - cb(); -}; - -const checkDetachedSignature = function (signedMsg, signature, publicKey) { - if (!(signedMsg && publicKey)) { return false; } - - var signedBuffer; - var pubBuffer; - var signatureBuffer; - - try { - signedBuffer = Nacl.util.decodeUTF8(signedMsg); - } catch (e) { - throw new Error("INVALID_SIGNED_BUFFER"); - } - - try { - pubBuffer = Nacl.util.decodeBase64(publicKey); - } catch (e) { - throw new Error("INVALID_PUBLIC_KEY"); - } - - try { - signatureBuffer = Nacl.util.decodeBase64(signature); - } catch (e) { - throw new Error("INVALID_SIGNATURE"); - } - - if (pubBuffer.length !== 32) { - throw new Error("INVALID_PUBLIC_KEY_LENGTH"); - } - - if (signatureBuffer.length !== 64) { - throw new Error("INVALID_SIGNATURE_LENGTH"); - } - - if (Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer) !== true) { - throw new Error("FAILED"); - } -}; - -COMMANDS.DETACHED = function (data, cb) { - try { - checkDetachedSignature(data.msg, data.sig, data.key); - } catch (err) { - return void cb(err && err.message); - } - cb(); -}; - -COMMANDS.HASH_CHANNEL_LIST = function (data, cb) { - var channels = data.channels; - if (!Array.isArray(channels)) { return void cb('INVALID_CHANNEL_LIST'); } - var uniques = []; - - channels.forEach(function (a) { - if (uniques.indexOf(a) === -1) { uniques.push(a); } - }); - uniques.sort(); - - var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl - .util.decodeUTF8(JSON.stringify(uniques)))); - - cb(void 0, hash); -}; - -process.on('message', function (data) { - if (!data || !data.txid) { - return void process.send({ - error:'E_INVAL' - }); - } - - const cb = function (err, value) { - process.send({ - txid: data.txid, - error: err, - value: value, - }); - }; - - const command = COMMANDS[data.command]; - if (typeof(command) !== 'function') { - return void cb("E_BAD_COMMAND"); - } - - command(data, cb); -}); diff --git a/lib/workers/db-worker.js b/lib/workers/db-worker.js index 0b9de4f53..d76ce61dd 100644 --- a/lib/workers/db-worker.js +++ b/lib/workers/db-worker.js @@ -12,6 +12,7 @@ const Core = require("../commands/core"); const Saferphore = require("saferphore"); const Logger = require("../log"); const Tasks = require("../storage/tasks"); +const Nacl = require('tweetnacl/nacl-fast'); const Env = { Log: {}, @@ -432,6 +433,91 @@ const COMMANDS = { RUN_TASKS: runTasks, }; +COMMANDS.INLINE = function (data, cb) { + var signedMsg; + try { + signedMsg = Nacl.util.decodeBase64(data.msg); + } catch (e) { + return void cb('E_BAD_MESSAGE'); + } + + var validateKey; + try { + validateKey = Nacl.util.decodeBase64(data.key); + } catch (e) { + return void cb("E_BADKEY"); + } + // validate the message + const validated = Nacl.sign.open(signedMsg, validateKey); + if (!validated) { + return void cb("FAILED"); + } + cb(); +}; + +const checkDetachedSignature = function (signedMsg, signature, publicKey) { + if (!(signedMsg && publicKey)) { return false; } + + var signedBuffer; + var pubBuffer; + var signatureBuffer; + + try { + signedBuffer = Nacl.util.decodeUTF8(signedMsg); + } catch (e) { + throw new Error("INVALID_SIGNED_BUFFER"); + } + + try { + pubBuffer = Nacl.util.decodeBase64(publicKey); + } catch (e) { + throw new Error("INVALID_PUBLIC_KEY"); + } + + try { + signatureBuffer = Nacl.util.decodeBase64(signature); + } catch (e) { + throw new Error("INVALID_SIGNATURE"); + } + + if (pubBuffer.length !== 32) { + throw new Error("INVALID_PUBLIC_KEY_LENGTH"); + } + + if (signatureBuffer.length !== 64) { + throw new Error("INVALID_SIGNATURE_LENGTH"); + } + + if (Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer) !== true) { + throw new Error("FAILED"); + } +}; + +COMMANDS.DETACHED = function (data, cb) { + try { + checkDetachedSignature(data.msg, data.sig, data.key); + } catch (err) { + return void cb(err && err.message); + } + cb(); +}; + +COMMANDS.HASH_CHANNEL_LIST = function (data, cb) { + var channels = data.channels; + if (!Array.isArray(channels)) { return void cb('INVALID_CHANNEL_LIST'); } + var uniques = []; + + channels.forEach(function (a) { + if (uniques.indexOf(a) === -1) { uniques.push(a); } + }); + uniques.sort(); + + var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl + .util.decodeUTF8(JSON.stringify(uniques)))); + + cb(void 0, hash); +}; + process.on('message', function (data) { if (!data || !data.txid || !data.pid) { return void process.send({ diff --git a/lib/workers/index.js b/lib/workers/index.js index d2944225c..02b3fb868 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -3,103 +3,14 @@ const Util = require("../common-util"); const nThen = require('nthen'); const OS = require("os"); -const numCPUs = OS.cpus().length; const { fork } = require('child_process'); const Workers = module.exports; const PID = process.pid; -const CRYPTO_PATH = 'lib/workers/crypto-worker'; const DB_PATH = 'lib/workers/db-worker'; +const MAX_JOBS = 16; -Workers.initializeValidationWorkers = function (Env) { - if (typeof(Env.validateMessage) !== 'undefined') { - return void console.error("validation workers are already initialized"); - } - - // Create our workers - const workers = []; - for (let i = 0; i < numCPUs; i++) { - workers.push(fork(CRYPTO_PATH)); - } - - const response = Util.response(function (errLabel, info) { - Env.Log.error('HK_VALIDATE_WORKER__' + errLabel, info); - }); - - var initWorker = function (worker) { - worker.on('message', function (res) { - if (!res || !res.txid) { return; } - response.handle(res.txid, [res.error, res.value]); - }); - - var substituteWorker = Util.once( function () { - Env.Log.info("SUBSTITUTE_VALIDATION_WORKER", ''); - var idx = workers.indexOf(worker); - if (idx !== -1) { - workers.splice(idx, 1); - } - // Spawn a new one - var w = fork(CRYPTO_PATH); - workers.push(w); - initWorker(w); - }); - - // Spawn a new process in one ends - worker.on('exit', substituteWorker); - worker.on('close', substituteWorker); - worker.on('error', function (err) { - substituteWorker(); - Env.Log.error('VALIDATION_WORKER_ERROR', { - error: err, - }); - }); - }; - workers.forEach(initWorker); - - var nextWorker = 0; - const send = function (msg, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - // let's be paranoid about asynchrony and only calling back once.. - nextWorker = (nextWorker + 1) % workers.length; - if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { - return void cb("INVALID_WORKERS"); - } - - var txid = msg.txid = Util.uid(); - - // expect a response within 45s - response.expect(txid, cb, 60000); - - // Send the request - workers[nextWorker].send(msg); - }; - - Env.validateMessage = function (signedMsg, key, cb) { - send({ - msg: signedMsg, - key: key, - command: 'INLINE', - }, cb); - }; - - Env.checkSignature = function (signedMsg, signature, publicKey, cb) { - send({ - command: 'DETACHED', - sig: signature, - msg: signedMsg, - key: publicKey, - }, cb); - }; - - Env.hashChannelList = function (channels, cb) { - send({ - command: 'HASH_CHANNEL_LIST', - channels: channels, - }, cb); - }; -}; - -Workers.initializeIndexWorkers = function (Env, config, _cb) { +Workers.initialize = function (Env, config, _cb) { var cb = Util.once(Util.mkAsync(_cb)); const workers = []; @@ -124,12 +35,18 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { return response.expected(id)? guid(): id; }; - const MAX_JOBS = 32; var workerOffset = -1; + var queue = []; var getAvailableWorkerIndex = function () { +// If there is already a backlog of tasks you can avoid some work +// by going to the end of the line + if (queue.length) { return -1; } + var L = workers.length; if (L === 0) { - console.log("no workers available"); // XXX + Log.error('NO_WORKERS_AVAILABLE', { + queue: queue.length, + }); return -1; } @@ -157,8 +74,6 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { return -1; }; - var queue = []; - var sendCommand = function (msg, _cb) { var index = getAvailableWorkerIndex(); @@ -383,11 +298,33 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) { }, cb); }; + // Synchronous crypto functions + Env.validateMessage = function (signedMsg, key, cb) { + sendCommand({ + msg: signedMsg, + key: key, + command: 'INLINE', + }, cb); + }; + + Env.checkSignature = function (signedMsg, signature, publicKey, cb) { + sendCommand({ + command: 'DETACHED', + sig: signature, + msg: signedMsg, + key: publicKey, + }, cb); + }; + + Env.hashChannelList = function (channels, cb) { + sendCommand({ + command: 'HASH_CHANNEL_LIST', + channels: channels, + }, cb); + }; + cb(void 0); }); }; -Workers.initialize = function (Env, config, cb) { - Workers.initializeValidationWorkers(Env); - Workers.initializeIndexWorkers(Env, config, cb); -}; +