drop usage of historyKeeper.setConfig

pull/1/head
ansuz 5 years ago
parent 80c012f34d
commit b922860339

@ -4,7 +4,6 @@ const WebSocketServer = require('ws').Server;
const NetfluxSrv = require('chainpad-server/NetfluxWebsocketSrv'); const NetfluxSrv = require('chainpad-server/NetfluxWebsocketSrv');
module.exports.create = function (config) { module.exports.create = function (config) {
var historyKeeper;
var rpc; var rpc;
const log = config.log; const log = config.log;
const wsConfig = { const wsConfig = {
@ -50,11 +49,9 @@ module.exports.create = function (config) {
log: log, log: log,
}; };
// XXX historyKeeper exports a `setConfig` method // XXX historyKeeper exports a `setConfig` method
historyKeeper = HK.create(hkConfig);
}).nThen(function () {
var wsSrv = new WebSocketServer(wsConfig); var wsSrv = new WebSocketServer(wsConfig);
// XXX NetfluxSrv shares some internal functions with historyKeeper var historyKeeper = HK.create(hkConfig);
// by passing them to setConfig
NetfluxSrv.run(wsSrv, config, historyKeeper); NetfluxSrv.run(wsSrv, config, historyKeeper);
}); });
}; };

@ -10,6 +10,8 @@ const WriteQueue = require("./write-queue");
const BatchRead = require("./batch-read"); const BatchRead = require("./batch-read");
const Extras = require("./hk-util.js"); const Extras = require("./hk-util.js");
const STANDARD_CHANNEL_LENGTH = Extras.STANDARD_CHANNEL_LENGTH;
const EPHEMERAL_CHANNEL_LENGTH = Extras.EPHEMERAL_CHANNEL_LENGTH;
let Log; let Log;
const now = function () { return (new Date()).getTime(); }; const now = function () { return (new Date()).getTime(); };
@ -77,14 +79,6 @@ module.exports.create = function (cfg) {
Log.verbose('HK_ID', 'History keeper ID: ' + HISTORY_KEEPER_ID); Log.verbose('HK_ID', 'History keeper ID: ' + HISTORY_KEEPER_ID);
let sendMsg = function () {};
let STANDARD_CHANNEL_LENGTH, EPHEMERAL_CHANNEL_LENGTH;
const setConfig = function (config) {
STANDARD_CHANNEL_LENGTH = config.STANDARD_CHANNEL_LENGTH;
EPHEMERAL_CHANNEL_LENGTH = config.EPHEMERAL_CHANNEL_LENGTH;
sendMsg = config.sendMsg;
};
/* computeIndex /* computeIndex
can call back with an error or a computed index which includes: can call back with an error or a computed index which includes:
* cpIndex: * cpIndex:
@ -326,7 +320,7 @@ module.exports.create = function (cfg) {
const historyKeeperBroadcast = function (ctx, channel, msg) { const historyKeeperBroadcast = function (ctx, channel, msg) {
let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/); let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
chan.forEach(function (user) { chan.forEach(function (user) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
}); });
}; };
@ -707,7 +701,7 @@ module.exports.create = function (cfg) {
// parsed[1] is the channel id // parsed[1] is the channel id
// parsed[2] is a validation key or an object containing metadata (optionnal) // parsed[2] is a validation key or an object containing metadata (optionnal)
// parsed[3] is the last known hash (optionnal) // parsed[3] is the last known hash (optionnal)
sendMsg(ctx, user, [seq, 'ACK']); ctx.sendMsg(ctx, user, [seq, 'ACK']);
var channelName = parsed[1]; var channelName = parsed[1];
var config = parsed[2]; var config = parsed[2];
var metadata = {}; var metadata = {};
@ -758,7 +752,7 @@ module.exports.create = function (cfg) {
// FIXME this is hard to read because 'checkExpired' has side effects // FIXME this is hard to read because 'checkExpired' has side effects
if (checkExpired(ctx, channelName)) { return void waitFor.abort(); } if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
// always send metadata with GET_HISTORY requests // always send metadata with GET_HISTORY requests
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
})); }));
}).nThen(() => { }).nThen(() => {
let msgCount = 0; let msgCount = 0;
@ -769,12 +763,12 @@ module.exports.create = function (cfg) {
msgCount++; msgCount++;
// avoid sending the metadata message a second time // avoid sending the metadata message a second time
if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); } if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore);
}, (err) => { }, (err) => {
if (err && err.code !== 'ENOENT') { if (err && err.code !== 'ENOENT') {
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
const parsedMsg = {error:err.message, channel: channelName}; const parsedMsg = {error:err.message, channel: channelName};
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
return; return;
} }
@ -817,12 +811,12 @@ module.exports.create = function (cfg) {
} }
}); });
} }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]);
} }
// End of history message: // End of history message:
let parsedMsg = {state: 1, channel: channelName}; let parsedMsg = {state: 1, channel: channelName};
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
}); });
}); });
}; };
@ -831,7 +825,7 @@ module.exports.create = function (cfg) {
var channelName = parsed[1]; var channelName = parsed[1];
var map = parsed[2]; var map = parsed[2];
if (!(map && typeof(map) === 'object')) { if (!(map && typeof(map) === 'object')) {
return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]); return void ctx.sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]);
} }
var oldestKnownHash = map.from; var oldestKnownHash = map.from;
@ -839,14 +833,14 @@ module.exports.create = function (cfg) {
var desiredCheckpoint = map.cpCount; var desiredCheckpoint = map.cpCount;
var txid = map.txid; var txid = map.txid;
if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') { if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]); return void ctx.sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]);
} }
if (!txid) { if (!txid) {
return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]); return void ctx.sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]);
} }
sendMsg(ctx, user, [seq, 'ACK']); ctx.sendMsg(ctx, user, [seq, 'ACK']);
return void getOlderHistory(channelName, oldestKnownHash, function (messages) { return void getOlderHistory(channelName, oldestKnownHash, function (messages) {
var toSend = []; var toSend = [];
if (typeof (desiredMessages) === "number") { if (typeof (desiredMessages) === "number") {
@ -862,11 +856,11 @@ module.exports.create = function (cfg) {
} }
} }
toSend.forEach(function (msg) { toSend.forEach(function (msg) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
JSON.stringify(['HISTORY_RANGE', txid, msg])]); JSON.stringify(['HISTORY_RANGE', txid, msg])]);
}); });
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
JSON.stringify(['HISTORY_RANGE_END', txid, channelName]) JSON.stringify(['HISTORY_RANGE_END', txid, channelName])
]); ]);
}); });
@ -876,20 +870,20 @@ module.exports.create = function (cfg) {
// parsed[1] is the channel id // parsed[1] is the channel id
// parsed[2] is a validation key (optionnal) // parsed[2] is a validation key (optionnal)
// parsed[3] is the last known hash (optionnal) // parsed[3] is the last known hash (optionnal)
sendMsg(ctx, user, [seq, 'ACK']); ctx.sendMsg(ctx, user, [seq, 'ACK']);
// FIXME should we send metadata here too? // FIXME should we send metadata here too?
// none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22) // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
return void getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => { return void getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => {
if (!msg) { return; } if (!msg) { return; }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore);
}, (err) => { }, (err) => {
let parsedMsg = ['FULL_HISTORY_END', parsed[1]]; let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
if (err) { if (err) {
Log.error('HK_GET_FULL_HISTORY', err.stack); Log.error('HK_GET_FULL_HISTORY', err.stack);
parsedMsg = ['ERROR', parsed[1], err.message]; parsedMsg = ['ERROR', parsed[1], err.message];
} }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
}); });
}; };
@ -899,12 +893,12 @@ module.exports.create = function (cfg) {
/* RPC Calls... */ /* RPC Calls... */
var rpc_call = parsed.slice(1); var rpc_call = parsed.slice(1);
sendMsg(ctx, user, [seq, 'ACK']); ctx.sendMsg(ctx, user, [seq, 'ACK']);
try { try {
// slice off the sequence number and pass in the rest of the message // slice off the sequence number and pass in the rest of the message
rpc(ctx, rpc_call, function (err, output) { rpc(ctx, rpc_call, function (err, output) {
if (err) { if (err) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', err])]); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', err])]);
return; return;
} }
var msg = rpc_call[0].slice(); var msg = rpc_call[0].slice();
@ -930,7 +924,7 @@ module.exports.create = function (cfg) {
let chan = ctx.channels[output.channel]; let chan = ctx.channels[output.channel];
if (chan && chan.length) { if (chan && chan.length) {
chan.forEach(function (user) { chan.forEach(function (user) {
sendMsg(ctx, user, output.message); ctx.sendMsg(ctx, user, output.message);
//[0, null, 'MSG', user.id, JSON.stringify(output.message)]); //[0, null, 'MSG', user.id, JSON.stringify(output.message)]);
}); });
} }
@ -940,10 +934,10 @@ module.exports.create = function (cfg) {
} }
// finally, send a response to the client that sent the RPC // finally, send a response to the client that sent the RPC
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]);
}); });
} catch (e) { } catch (e) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]); ctx.sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]);
} }
}; };
@ -982,12 +976,25 @@ module.exports.create = function (cfg) {
command(ctx, seq, user, parsed); command(ctx, seq, user, parsed);
}; };
// XXX every one of these values is exported because
// netfluxWebsocketServer needs them to do some magic historyKeeper things
// we could have netflux emit events and let historyKeeper handle them instead
return { return {
id: HISTORY_KEEPER_ID, id: HISTORY_KEEPER_ID,
setConfig: setConfig,
onChannelMessage: onChannelMessage, // XXX dropChannel allows netflux to clear historyKeeper's cache
// maybe it should emit a 'channel_dropped' event instead
// and let historyKeeper decide what to do
dropChannel: dropChannel, dropChannel: dropChannel,
// XXX we don't need to export checkExpired if netflux allows it to be HK's responsibility
checkExpired: checkExpired, checkExpired: checkExpired,
// XXX again, if netflux emitted events then historyKeeper could handle them itself
// and netflux wouldn't need to have historyKeeper-specific code
onDirectMessage: onDirectMessage, onDirectMessage: onDirectMessage,
// XXX same
onChannelMessage: onChannelMessage,
}; };
}; };

@ -22,3 +22,12 @@ HK.getHash = function (msg, Log) {
return msg.slice(0,64); return msg.slice(0,64);
}; };
// historyKeeper should explicitly store any channel
// with a 32 character id
HK.STANDARD_CHANNEL_LENGTH = 32;
// historyKeeper should not store messages sent to any channel
// with a 34 character id
HK.EPHEMERAL_CHANNEL_LENGTH = 34;

Loading…
Cancel
Save