diff --git a/historyKeeper.js b/historyKeeper.js index 2c71fae02..9fbc4ac39 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -27,6 +27,18 @@ const tryParse = function (str) { } }; +/* sliceCpIndex + returns a list of all checkpoints which might be relevant for a client connecting to a session + + * if there are two or fewer checkpoints, return everything you have + * if there are more than two + * return at least two + * plus any more which were received within the last 100 messages + + This is important because the additional history is what prevents + clients from forking on checkpoints and dropping forked history. + +*/ const sliceCpIndex = function (cpIndex, line) { // Remove "old" checkpoints (cp sent before 100 messages ago) const minLine = Math.max(0, (line - 100)); @@ -59,6 +71,25 @@ module.exports.create = function (cfg) { sendMsg = config.sendMsg; }; + /* computeIndex + can call back with an error or a computed index which includes: + * cpIndex: + * array including any checkpoints pushed within the last 100 messages + * processed by 'sliceCpIndex(cpIndex, line)' + * offsetByHash: + * a map containing message offsets by their hash + * this is for every message in history, so it could be very large... + * size: in bytes + * metadata: + * validationKey + * expiration time + * owners + * ??? (anything else we might add in the future) + * line + * the number of messages in history + * including the initial metadata line, if it exists + + */ const computeIndex = function (channelName, cb) { const cpIndex = []; let messageBuf = []; @@ -107,6 +138,8 @@ module.exports.create = function (cfg) { const msg = tryParse(msgObj.buff.toString('utf8')); if (typeof msg === "undefined") { return; } if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') { + // msgObj.offset is API guaranteed by our storage module + // it should always be a valid positive integer offsetByHash[getHash(msg[4])] = msgObj.offset; } // There is a trailing \n at the end of the file @@ -141,6 +174,15 @@ module.exports.create = function (cfg) { }); }; + /* getIndex + calls back with an error if anything goes wrong + or with a cached index for a channel if it exists + (along with metadata) + otherwise it calls back with the index computed by 'computeIndex' + + as an added bonus: + if the channel exists but its index does not then it caches the index + */ const getIndex = (ctx, channelName, cb) => { const chan = ctx.channels[channelName]; if (chan && chan.index) { return void cb(undefined, chan.index); } @@ -158,7 +200,25 @@ module.exports.create = function (cfg) { } */ + /* storeMessage + * ctx + * channel id + * the message to store + * whether the message is a checkpoint + * optionally the hash of the message + * it's not always used, but we guard against it + + * async but doesn't have a callback + * source of a race condition whereby: + * two messaages can be inserted + * two offsets can be computed using the total size of all the messages + * but the offsets don't correspond to the actual location of the newlines + * because the two actions were performed like ABba... + * the fix is to use callbacks and implement queueing for writes + * to guarantee that offset computation is always atomic with writes + + */ const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) { const msgBin = new Buffer(msg + '\n', 'utf8'); // Store the message first, and update the index only once it's stored. @@ -241,10 +301,42 @@ module.exports.create = function (cfg) { storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4])); }; + /* dropChannel + * exported as API + * used by chainpad-server/NetfluxWebsocketSrv.js + * cleans up memory structures which are managed entirely by the historyKeeper + * the netflux server manages other memory in ctx.channels + */ const dropChannel = function (chanName) { delete historyKeeperKeys[chanName]; }; + /* getHistoryOffset + returns a number representing the byte offset from the start of the log + for whatever history you're seeking. + + query by providing a 'lastKnownHash', which is really just a string of + the first 64 characters of an encrypted message. + + this function embeds a lot of the history keeper's logic: + + 0. if you passed -1 as the lastKnownHash it means you want the complete history + * I'm not sure why you'd need to call this function if you know it will return 0 in this case... + * ansuz + 1. if you provided a lastKnownHash and that message does not exist in the history: + * either the client has made a mistake or the history they knew about no longer exists + * call back with EINVAL + 2. if you did not provide a lastKnownHash + * and there are fewer than two checkpoints: + * return 0 (read from the start of the file) + * and there are two or more checkpoints: + * return the offset of the earliest checkpoint which 'sliceCpIndex' considers relevant + 3. if you did provide a lastKnownHash + * read through the log until you find the hash that you're looking for + * call back with either the bytee offset of the message that you found OR + * -1 if you didn't find it + + */ const getHistoryOffset = (ctx, channelName, lastKnownHash, cb /*:(e:?Error, os:?number)=>void*/) => { // lastKnownhash === -1 means we want the complete history if (lastKnownHash === -1) { return void cb(null, 0); } @@ -253,7 +345,10 @@ module.exports.create = function (cfg) { getIndex(ctx, channelName, waitFor((err, index) => { if (err) { waitFor.abort(); return void cb(err); } - // Check last known hash + // Check last known hash, this guards against NaN and other invalid offsets + // the offset is *the end of the message*, so if they passed a valid lkh + // it cannot be zero, so it will get past this guard + // XXX const lkh = index.offsetByHash[lastKnownHash]; if (lastKnownHash && typeof(lkh) !== "number") { waitFor.abort(); @@ -280,9 +375,15 @@ module.exports.create = function (cfg) { offset = lkh; })); }).nThen((waitFor) => { + // if offset is less than zero then presumably the channel has no messages + // returning falls through to the next block and therefore returns -1 if (offset !== -1) { return; } + + // otherwise we have a non-negative offset and we can start to read from there store.readMessagesBin(channelName, 0, (msgObj, rmcb, abort) => { + // tryParse return a parsed message or undefined const msg = tryParse(msgObj.buff.toString('utf8')); + // if it was undefined then go onto the next message if (typeof msg === "undefined") { return rmcb(); } if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4])) { return void rmcb(); @@ -297,6 +398,12 @@ module.exports.create = function (cfg) { }); }; + /* getHistoryAsync + * finds the appropriate byte offset from which to begin reading using 'getHistoryOffset' + * streams through the rest of the messages, safely parsing them and returning the parsed content to the handler + * calls back when it has reached the end of the log + + */ const getHistoryAsync = (ctx, channelName, lastKnownHash, beforeHash, handler, cb) => { let offset = -1; nThen((waitFor) => { @@ -319,6 +426,12 @@ module.exports.create = function (cfg) { }); }; + /* getOlderHistory + * allows clients to query for all messages until a known hash is read + * stores all messages in history as they are read + * can therefore be very expensive for memory + * should probably be converted to a streaming interface + */ const getOlderHistory = function (channelName, oldestKnownHash, cb) { var messageBuffer = []; var found = false; @@ -359,13 +472,20 @@ module.exports.create = function (cfg) { }; */ - + /* historyKeeperBroadcast + * uses API from the netflux server to send messages to every member of a channel + * sendMsg runs in a try-catch and drops users if sending a message fails + */ const historyKeeperBroadcast = function (ctx, channel, msg) { let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/); chan.forEach(function (user) { sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]); }); }; + + /* onChannelCleared + * broadcasts to all clients in a channel if that channel is deleted + */ const onChannelCleared = function (ctx, channel) { historyKeeperBroadcast(ctx, channel, { error: 'ECLEARED', @@ -385,6 +505,18 @@ module.exports.create = function (cfg) { }; // Check if the selected channel is expired // If it is, remove it from memory and broadcast a message to its members + + /* checkExpired + * synchronously returns true or undefined to indicate whether the channel is expired + * according to its metadata + * has some side effects: + * closes the channel via the store.closeChannel API + * and then broadcasts to all channel members that the channel has expired + * removes the channel from the netflux-server's in-memory cache + * removes the channel metadata from history keeper's in-memory cache + + FIXME the boolean nature of this API should be separated from its side effects + */ const checkExpired = function (ctx, channel) { if (channel && channel.length === STANDARD_CHANNEL_LENGTH && historyKeeperKeys[channel] && historyKeeperKeys[channel].expire && historyKeeperKeys[channel].expire < +new Date()) { @@ -401,6 +533,19 @@ module.exports.create = function (cfg) { return; }; + /* onDirectMessage + * exported for use by the netflux-server + * parses and handles all direct messages directed to the history keeper + * check if it's expired and execute all the associated side-effects + * routes queries to the appropriate handlers + * GET_HISTORY + * GET_HISTORY_RANGE + * GET_FULL_HISTORY + * RPC + * if the rpc has special hooks that the history keeper needs to be aware of... + * execute them here... + + */ const onDirectMessage = function (ctx, seq, user, json) { let parsed; let channelName;