diff --git a/lib/compute-index.js b/lib/compute-index.js new file mode 100644 index 000000000..7a146e060 --- /dev/null +++ b/lib/compute-index.js @@ -0,0 +1,173 @@ +/* jshint esversion: 6 */ +/* global process */ + +const HK = require("./hk-util"); +const Store = require("./storage/file"); +const Util = require("./common-util"); +const nThen = require("nthen"); + +const Env = {}; + +var ready = false; +var store; +const init = function (config, cb) { + if (!config) { + return void cb('E_INVALID_CONFIG'); + } + + Store.create(config, function (_store) { + store = _store; + cb(); + }); +}; + +const tryParse = function (Env, str) { + try { + return JSON.parse(str); + } catch (err) { + // XXX + } +}; + +/* computeIndex + can call back with an error or a computed index which includes: + * cpIndex: + * array including any checkpoints pushed within the last 100 messages + * processed by 'sliceCpIndex(cpIndex, line)' + * offsetByHash: + * a map containing message offsets by their hash + * this is for every message in history, so it could be very large... + * except we remove offsets from the map if they occur before the oldest relevant checkpoint + * size: in bytes + * metadata: + * validationKey + * expiration time + * owners + * ??? (anything else we might add in the future) + * line + * the number of messages in history + * including the initial metadata line, if it exists + +*/ +const computeIndex = function (channelName, cb) { + const cpIndex = []; + let messageBuf = []; + let i = 0; + + const CB = Util.once(cb); + + const offsetByHash = {}; + let size = 0; + nThen(function (w) { + // iterate over all messages in the channel log + // old channels can contain metadata as the first message of the log + // skip over metadata as that is handled elsewhere + // otherwise index important messages in the log + store.readMessagesBin(channelName, 0, (msgObj, readMore) => { + let msg; + // keep an eye out for the metadata line if you haven't already seen it + // but only check for metadata on the first line + if (!i && msgObj.buff.indexOf('{') === 0) { + i++; // always increment the message counter + msg = tryParse(Env, msgObj.buff.toString('utf8')); + if (typeof msg === "undefined") { return readMore(); } + + // validate that the current line really is metadata before storing it as such + // skip this, as you already have metadata... + if (HK.isMetadataMessage(msg)) { return readMore(); } + } + i++; + if (msgObj.buff.indexOf('cp|') > -1) { + msg = msg || tryParse(Env, msgObj.buff.toString('utf8')); + if (typeof msg === "undefined") { return readMore(); } + // cache the offsets of checkpoints if they can be parsed + if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) { + cpIndex.push({ + offset: msgObj.offset, + line: i + }); + // we only want to store messages since the latest checkpoint + // so clear the buffer every time you see a new one + messageBuf = []; + } + } + // if it's not metadata or a checkpoint then it should be a regular message + // store it in the buffer + messageBuf.push(msgObj); + return readMore(); + }, w((err) => { + if (err && err.code !== 'ENOENT') { + w.abort(); + return void CB(err); + } + + // once indexing is complete you should have a buffer of messages since the latest checkpoint + // map the 'hash' of each message to its byte offset in the log, to be used for reconnecting clients + messageBuf.forEach((msgObj) => { + const msg = tryParse(Env, msgObj.buff.toString('utf8')); + if (typeof msg === "undefined") { return; } + if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') { + // msgObj.offset is API guaranteed by our storage module + // it should always be a valid positive integer + offsetByHash[HK.getHash(msg[4])] = msgObj.offset; + } + // There is a trailing \n at the end of the file + size = msgObj.offset + msgObj.buff.length + 1; + }); + })); + }).nThen(function () { + // return the computed index + CB(null, { + // Only keep the checkpoints included in the last 100 messages + cpIndex: HK.sliceCpIndex(cpIndex, i), + offsetByHash: offsetByHash, + size: size, + //metadata: metadata, + line: i + }); + }); +}; + +process.on('message', function (data) { + if (!data || !data.txid) { + return void process.send({ + error:'E_INVAL' + }); + } + const txid = data.txid; + + if (!ready) { + return void init(data.config, function (err) { + if (err) { + return void process.send({ + txid: txid, + error: err, + }); + } + ready = true; + process.send({txid: txid,}); + }); + } + + const channel = data.args; + if (!channel) { + return void process.send({ + error: 'E_NO_CHANNEL', + }); + } + + // computeIndex + computeIndex(channel, function (err, index) { + if (err) { + return void process.send({ + txid: txid, + error: err, + }); + } + return void process.send({ + txid: txid, + value: index, + }); + }); +}); + diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index a3a33c4e6..23dbf5fd1 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -238,6 +238,19 @@ module.exports.create = function (config, cb) { if (err) { throw new Error(err); } Env.blobStore = blob; })); + }).nThen(function (w) { + HK.initializeIndexWorkers(Env, { + filePath: config.filePath, + archivePath: config.archivePath, + channelExpirationMs: config.channelExpirationMs, + verbose: config.verbose, + openFileLimit: config.openFileLimit, + }, w(function (err, computeIndex) { + if (err) { + throw new Error(err); + } + Env.computeIndex = computeIndex; + })); }).nThen(function (w) { // create a task store require("./storage/tasks").create(config, w(function (e, tasks) { diff --git a/lib/hk-util.js b/lib/hk-util.js index 2bcae56e3..97e5c3014 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -7,7 +7,8 @@ const Util = require("./common-util"); const MetaRPC = require("./commands/metadata"); const Nacl = require('tweetnacl/nacl-fast'); const { fork } = require('child_process'); -const numCPUs = require('os').cpus().length; +const OS = require("os"); +const numCPUs = OS.cpus().length; const now = function () { return (new Date()).getTime(); }; const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds @@ -62,7 +63,7 @@ const tryParse = function (Env, str) { clients from forking on checkpoints and dropping forked history. */ -const sliceCpIndex = function (cpIndex, line) { +const sliceCpIndex = HK.sliceCpIndex = function (cpIndex, line) { // Remove "old" checkpoints (cp sent before 100 messages ago) const minLine = Math.max(0, (line - 100)); let start = cpIndex.slice(0, -2); @@ -203,108 +204,6 @@ const getMetadata = HK.getMetadata = function (Env, channelName, _cb) { }); }; -/* computeIndex - can call back with an error or a computed index which includes: - * cpIndex: - * array including any checkpoints pushed within the last 100 messages - * processed by 'sliceCpIndex(cpIndex, line)' - * offsetByHash: - * a map containing message offsets by their hash - * this is for every message in history, so it could be very large... - * except we remove offsets from the map if they occur before the oldest relevant checkpoint - * size: in bytes - * metadata: - * validationKey - * expiration time - * owners - * ??? (anything else we might add in the future) - * line - * the number of messages in history - * including the initial metadata line, if it exists - -*/ -const computeIndex = function (Env, channelName, cb) { - const store = Env.store; - const Log = Env.Log; - - const cpIndex = []; - let messageBuf = []; - let i = 0; - - const CB = Util.once(cb); - - const offsetByHash = {}; - let size = 0; - nThen(function (w) { - // iterate over all messages in the channel log - // old channels can contain metadata as the first message of the log - // skip over metadata as that is handled elsewhere - // otherwise index important messages in the log - store.readMessagesBin(channelName, 0, (msgObj, readMore) => { - let msg; - // keep an eye out for the metadata line if you haven't already seen it - // but only check for metadata on the first line - if (!i && msgObj.buff.indexOf('{') === 0) { - i++; // always increment the message counter - msg = tryParse(Env, msgObj.buff.toString('utf8')); - if (typeof msg === "undefined") { return readMore(); } - - // validate that the current line really is metadata before storing it as such - // skip this, as you already have metadata... - if (isMetadataMessage(msg)) { return readMore(); } - } - i++; - if (msgObj.buff.indexOf('cp|') > -1) { - msg = msg || tryParse(Env, msgObj.buff.toString('utf8')); - if (typeof msg === "undefined") { return readMore(); } - // cache the offsets of checkpoints if they can be parsed - if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) { - cpIndex.push({ - offset: msgObj.offset, - line: i - }); - // we only want to store messages since the latest checkpoint - // so clear the buffer every time you see a new one - messageBuf = []; - } - } - // if it's not metadata or a checkpoint then it should be a regular message - // store it in the buffer - messageBuf.push(msgObj); - return readMore(); - }, w((err) => { - if (err && err.code !== 'ENOENT') { - w.abort(); - return void CB(err); - } - - // once indexing is complete you should have a buffer of messages since the latest checkpoint - // map the 'hash' of each message to its byte offset in the log, to be used for reconnecting clients - messageBuf.forEach((msgObj) => { - const msg = tryParse(Env, msgObj.buff.toString('utf8')); - if (typeof msg === "undefined") { return; } - if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') { - // msgObj.offset is API guaranteed by our storage module - // it should always be a valid positive integer - offsetByHash[getHash(msg[4], Log)] = msgObj.offset; - } - // There is a trailing \n at the end of the file - size = msgObj.offset + msgObj.buff.length + 1; - }); - })); - }).nThen(function () { - // return the computed index - CB(null, { - // Only keep the checkpoints included in the last 100 messages - cpIndex: sliceCpIndex(cpIndex, i), - offsetByHash: offsetByHash, - size: size, - //metadata: metadata, - line: i - }); - }); -}; - /* getIndex calls back with an error if anything goes wrong or with a cached index for a channel if it exists @@ -326,7 +225,7 @@ const getIndex = (Env, channelName, cb) => { } Env.batchIndexReads(channelName, cb, function (done) { - computeIndex(Env, channelName, (err, ret) => { + Env.computeIndex(Env, channelName, (err, ret) => { // this is most likely an unrecoverable filesystem error if (err) { return void done(err); } // cache the computed result if possible @@ -912,17 +811,77 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { }); }; -/* onChannelMessage - Determine what we should store when a message a broadcasted to a channel" +HK.initializeIndexWorkers = function (Env, config, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); - * ignores ephemeral channels - * ignores messages sent to expired channels - * rejects duplicated checkpoints - * validates messages to channels that have validation keys - * caches the id of the last saved checkpoint - * adds timestamps to incoming messages - * writes messages to the store -*/ + const workers = []; + + const response = Util.response(); + const initWorker = function (worker, cb) { + //console.log("initializing index worker"); + const txid = Util.uid(); + response.expect(txid, function (err) { + if (err) { return void cb(err); } + //console.log("worker initialized"); + workers.push(worker); + cb(); + }, 15000); + + worker.send({ + txid: txid, + config: config, + }); + + worker.on('message', function (res) { + if (!res || !res.txid) { return; } + //console.log(res); + response.handle(res.txid, [res.error, res.value]); + }); + worker.on('exit', function () { + var idx = workers.indexOf(worker); + if (idx !== -1) { + workers.splice(idx, 1); + } + var w = fork('lib/compute-index'); + initWorker(w, function (err) { + if (err) { + throw new Error(err); + } + workers.push(w); + }); + }); + }; + + var workerIndex = 0; + var sendCommand = function (Env, channel, cb) { + workerIndex = (workerIndex + 1) % workers.length; + if (workers.length === 0 || + typeof(workers[workerIndex].send) !== 'function') { + return void cb("NO_WORKERS"); + } + Env.store.getWeakLock(channel, function (next) { + const txid = Util.uid(); + response.expect(txid, Util.both(next, cb), 45000); + workers[workerIndex].send({ + txid: txid, + args: channel, + }); + }); + }; + + nThen(function (w) { + OS.cpus().forEach(function () { + initWorker(fork('lib/compute-index'), w(function (err) { + if (!err) { return; } + w.abort(); + return void cb(err); + })); + }); + }).nThen(function () { + //console.log("index workers ready"); + cb(void 0, sendCommand); + }); +}; HK.initializeValidationWorkers = function (Env) { if (typeof(Env.validateMessage) !== 'undefined') { @@ -983,6 +942,17 @@ HK.initializeValidationWorkers = function (Env) { }; }; +/* onChannelMessage + Determine what we should store when a message a broadcasted to a channel" + + * ignores ephemeral channels + * ignores messages sent to expired channels + * rejects duplicated checkpoints + * validates messages to channels that have validation keys + * caches the id of the last saved checkpoint + * adds timestamps to incoming messages + * writes messages to the store +*/ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { //console.log(+new Date(), "onChannelMessage"); const Log = Env.Log; diff --git a/lib/storage/file.js b/lib/storage/file.js index bc0830d4e..fa7e530e1 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -1024,16 +1024,17 @@ module.exports.create = function (conf, cb) { // make sure the store's directory exists Fse.mkdirp(env.root, PERMISSIVE, w(function (err) { if (err && err.code !== 'EEXIST') { - throw err; + throw err; // XXX } })); // make sure the cold storage directory exists Fse.mkdirp(env.archiveRoot, PERMISSIVE, w(function (err) { if (err && err.code !== 'EEXIST') { - throw err; + throw err; // XXX } })); }).nThen(function () { + // XXX leave a place for an error cb({ // OLDER METHODS // write a new message to a log @@ -1078,6 +1079,12 @@ module.exports.create = function (conf, cb) { }); }, + getWeakLock: function (channelName, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + schedule.unordered(channelName, cb); + }, + // METHODS for deleting data // remove a channel and its associated metadata log if present removeChannel: function (channelName, cb) {