diff --git a/lib/api.js b/lib/api.js index 60296d9b5..d633db59a 100644 --- a/lib/api.js +++ b/lib/api.js @@ -4,7 +4,6 @@ const WebSocketServer = require('ws').Server; const NetfluxSrv = require('chainpad-server/NetfluxWebsocketSrv'); module.exports.create = function (config) { - var historyKeeper; var rpc; const log = config.log; const wsConfig = { @@ -50,11 +49,9 @@ module.exports.create = function (config) { log: log, }; // XXX historyKeeper exports a `setConfig` method - historyKeeper = HK.create(hkConfig); - }).nThen(function () { + var wsSrv = new WebSocketServer(wsConfig); - // XXX NetfluxSrv shares some internal functions with historyKeeper - // by passing them to setConfig + var historyKeeper = HK.create(hkConfig); NetfluxSrv.run(wsSrv, config, historyKeeper); }); }; diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index 55d6b1780..f8d3c8a15 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -10,6 +10,8 @@ const WriteQueue = require("./write-queue"); const BatchRead = require("./batch-read"); const Extras = require("./hk-util.js"); +const STANDARD_CHANNEL_LENGTH = Extras.STANDARD_CHANNEL_LENGTH; +const EPHEMERAL_CHANNEL_LENGTH = Extras.EPHEMERAL_CHANNEL_LENGTH; let Log; const now = function () { return (new Date()).getTime(); }; @@ -77,14 +79,6 @@ module.exports.create = function (cfg) { Log.verbose('HK_ID', 'History keeper ID: ' + HISTORY_KEEPER_ID); - let sendMsg = function () {}; - let STANDARD_CHANNEL_LENGTH, EPHEMERAL_CHANNEL_LENGTH; - const setConfig = function (config) { - STANDARD_CHANNEL_LENGTH = config.STANDARD_CHANNEL_LENGTH; - EPHEMERAL_CHANNEL_LENGTH = config.EPHEMERAL_CHANNEL_LENGTH; - sendMsg = config.sendMsg; - }; - /* computeIndex can call back with an error or a computed index which includes: * cpIndex: @@ -326,7 +320,7 @@ module.exports.create = function (cfg) { const historyKeeperBroadcast = function (ctx, channel, msg) { let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/); chan.forEach(function (user) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]); }); }; @@ -707,7 +701,7 @@ module.exports.create = function (cfg) { // parsed[1] is the channel id // parsed[2] is a validation key or an object containing metadata (optionnal) // parsed[3] is the last known hash (optionnal) - sendMsg(ctx, user, [seq, 'ACK']); + ctx.sendMsg(ctx, user, [seq, 'ACK']); var channelName = parsed[1]; var config = parsed[2]; var metadata = {}; @@ -758,7 +752,7 @@ module.exports.create = function (cfg) { // FIXME this is hard to read because 'checkExpired' has side effects if (checkExpired(ctx, channelName)) { return void waitFor.abort(); } // always send metadata with GET_HISTORY requests - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); })); }).nThen(() => { let msgCount = 0; @@ -769,12 +763,12 @@ module.exports.create = function (cfg) { msgCount++; // avoid sending the metadata message a second time if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); } - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore); }, (err) => { if (err && err.code !== 'ENOENT') { if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } const parsedMsg = {error:err.message, channel: channelName}; - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); return; } @@ -817,12 +811,12 @@ module.exports.create = function (cfg) { } }); } - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); } // End of history message: let parsedMsg = {state: 1, channel: channelName}; - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); }); }); }; @@ -831,7 +825,7 @@ module.exports.create = function (cfg) { var channelName = parsed[1]; var map = parsed[2]; if (!(map && typeof(map) === 'object')) { - return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]); + return void ctx.sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]); } var oldestKnownHash = map.from; @@ -839,14 +833,14 @@ module.exports.create = function (cfg) { var desiredCheckpoint = map.cpCount; var txid = map.txid; if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') { - return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]); + return void ctx.sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]); } if (!txid) { - return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]); + return void ctx.sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]); } - sendMsg(ctx, user, [seq, 'ACK']); + ctx.sendMsg(ctx, user, [seq, 'ACK']); return void getOlderHistory(channelName, oldestKnownHash, function (messages) { var toSend = []; if (typeof (desiredMessages) === "number") { @@ -862,11 +856,11 @@ module.exports.create = function (cfg) { } } toSend.forEach(function (msg) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['HISTORY_RANGE', txid, msg])]); }); - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['HISTORY_RANGE_END', txid, channelName]) ]); }); @@ -876,20 +870,20 @@ module.exports.create = function (cfg) { // parsed[1] is the channel id // parsed[2] is a validation key (optionnal) // parsed[3] is the last known hash (optionnal) - sendMsg(ctx, user, [seq, 'ACK']); + ctx.sendMsg(ctx, user, [seq, 'ACK']); // FIXME should we send metadata here too? // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22) return void getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => { if (!msg) { return; } - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); }, (err) => { let parsedMsg = ['FULL_HISTORY_END', parsed[1]]; if (err) { Log.error('HK_GET_FULL_HISTORY', err.stack); parsedMsg = ['ERROR', parsed[1], err.message]; } - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); }); }; @@ -899,12 +893,12 @@ module.exports.create = function (cfg) { /* RPC Calls... */ var rpc_call = parsed.slice(1); - sendMsg(ctx, user, [seq, 'ACK']); + ctx.sendMsg(ctx, user, [seq, 'ACK']); try { // slice off the sequence number and pass in the rest of the message rpc(ctx, rpc_call, function (err, output) { if (err) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', err])]); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', err])]); return; } var msg = rpc_call[0].slice(); @@ -930,7 +924,7 @@ module.exports.create = function (cfg) { let chan = ctx.channels[output.channel]; if (chan && chan.length) { chan.forEach(function (user) { - sendMsg(ctx, user, output.message); + ctx.sendMsg(ctx, user, output.message); //[0, null, 'MSG', user.id, JSON.stringify(output.message)]); }); } @@ -940,10 +934,10 @@ module.exports.create = function (cfg) { } // finally, send a response to the client that sent the RPC - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]); }); } catch (e) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]); + ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]); } }; @@ -982,12 +976,25 @@ module.exports.create = function (cfg) { command(ctx, seq, user, parsed); }; + // XXX every one of these values is exported because + // netfluxWebsocketServer needs them to do some magic historyKeeper things + // we could have netflux emit events and let historyKeeper handle them instead return { id: HISTORY_KEEPER_ID, - setConfig: setConfig, - onChannelMessage: onChannelMessage, + + // XXX dropChannel allows netflux to clear historyKeeper's cache + // maybe it should emit a 'channel_dropped' event instead + // and let historyKeeper decide what to do dropChannel: dropChannel, + + // XXX we don't need to export checkExpired if netflux allows it to be HK's responsibility checkExpired: checkExpired, + + // XXX again, if netflux emitted events then historyKeeper could handle them itself + // and netflux wouldn't need to have historyKeeper-specific code onDirectMessage: onDirectMessage, + + // XXX same + onChannelMessage: onChannelMessage, }; }; diff --git a/lib/hk-util.js b/lib/hk-util.js index cea39c4e1..aaa861054 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -22,3 +22,12 @@ HK.getHash = function (msg, Log) { return msg.slice(0,64); }; +// historyKeeper should explicitly store any channel +// with a 32 character id +HK.STANDARD_CHANNEL_LENGTH = 32; + +// historyKeeper should not store messages sent to any channel +// with a 34 character id +HK.EPHEMERAL_CHANNEL_LENGTH = 34; + +