add a bunch of comments to the historyKeeper

pull/1/head
ansuz 6 years ago
parent f2a7f8b7bc
commit ddb1b7b019

@ -27,6 +27,18 @@ const tryParse = function (str) {
}
};
/* sliceCpIndex
returns a list of all checkpoints which might be relevant for a client connecting to a session
* if there are two or fewer checkpoints, return everything you have
* if there are more than two
* return at least two
* plus any more which were received within the last 100 messages
This is important because the additional history is what prevents
clients from forking on checkpoints and dropping forked history.
*/
const sliceCpIndex = function (cpIndex, line) {
// Remove "old" checkpoints (cp sent before 100 messages ago)
const minLine = Math.max(0, (line - 100));
@ -59,6 +71,25 @@ module.exports.create = function (cfg) {
sendMsg = config.sendMsg;
};
/* computeIndex
can call back with an error or a computed index which includes:
* cpIndex:
* array including any checkpoints pushed within the last 100 messages
* processed by 'sliceCpIndex(cpIndex, line)'
* offsetByHash:
* a map containing message offsets by their hash
* this is for every message in history, so it could be very large...
* size: in bytes
* metadata:
* validationKey
* expiration time
* owners
* ??? (anything else we might add in the future)
* line
* the number of messages in history
* including the initial metadata line, if it exists
*/
const computeIndex = function (channelName, cb) {
const cpIndex = [];
let messageBuf = [];
@ -107,6 +138,8 @@ module.exports.create = function (cfg) {
const msg = tryParse(msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return; }
if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
// msgObj.offset is API guaranteed by our storage module
// it should always be a valid positive integer
offsetByHash[getHash(msg[4])] = msgObj.offset;
}
// There is a trailing \n at the end of the file
@ -141,6 +174,15 @@ module.exports.create = function (cfg) {
});
};
/* getIndex
calls back with an error if anything goes wrong
or with a cached index for a channel if it exists
(along with metadata)
otherwise it calls back with the index computed by 'computeIndex'
as an added bonus:
if the channel exists but its index does not then it caches the index
*/
const getIndex = (ctx, channelName, cb) => {
const chan = ctx.channels[channelName];
if (chan && chan.index) { return void cb(undefined, chan.index); }
@ -158,7 +200,25 @@ module.exports.create = function (cfg) {
}
*/
/* storeMessage
* ctx
* channel id
* the message to store
* whether the message is a checkpoint
* optionally the hash of the message
* it's not always used, but we guard against it
* async but doesn't have a callback
* source of a race condition whereby:
* two messaages can be inserted
* two offsets can be computed using the total size of all the messages
* but the offsets don't correspond to the actual location of the newlines
* because the two actions were performed like ABba...
* the fix is to use callbacks and implement queueing for writes
* to guarantee that offset computation is always atomic with writes
*/
const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) {
const msgBin = new Buffer(msg + '\n', 'utf8');
// Store the message first, and update the index only once it's stored.
@ -241,10 +301,42 @@ module.exports.create = function (cfg) {
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
};
/* dropChannel
* exported as API
* used by chainpad-server/NetfluxWebsocketSrv.js
* cleans up memory structures which are managed entirely by the historyKeeper
* the netflux server manages other memory in ctx.channels
*/
const dropChannel = function (chanName) {
delete historyKeeperKeys[chanName];
};
/* getHistoryOffset
returns a number representing the byte offset from the start of the log
for whatever history you're seeking.
query by providing a 'lastKnownHash', which is really just a string of
the first 64 characters of an encrypted message.
this function embeds a lot of the history keeper's logic:
0. if you passed -1 as the lastKnownHash it means you want the complete history
* I'm not sure why you'd need to call this function if you know it will return 0 in this case...
* ansuz
1. if you provided a lastKnownHash and that message does not exist in the history:
* either the client has made a mistake or the history they knew about no longer exists
* call back with EINVAL
2. if you did not provide a lastKnownHash
* and there are fewer than two checkpoints:
* return 0 (read from the start of the file)
* and there are two or more checkpoints:
* return the offset of the earliest checkpoint which 'sliceCpIndex' considers relevant
3. if you did provide a lastKnownHash
* read through the log until you find the hash that you're looking for
* call back with either the bytee offset of the message that you found OR
* -1 if you didn't find it
*/
const getHistoryOffset = (ctx, channelName, lastKnownHash, cb /*:(e:?Error, os:?number)=>void*/) => {
// lastKnownhash === -1 means we want the complete history
if (lastKnownHash === -1) { return void cb(null, 0); }
@ -253,7 +345,10 @@ module.exports.create = function (cfg) {
getIndex(ctx, channelName, waitFor((err, index) => {
if (err) { waitFor.abort(); return void cb(err); }
// Check last known hash
// Check last known hash, this guards against NaN and other invalid offsets
// the offset is *the end of the message*, so if they passed a valid lkh
// it cannot be zero, so it will get past this guard
// XXX
const lkh = index.offsetByHash[lastKnownHash];
if (lastKnownHash && typeof(lkh) !== "number") {
waitFor.abort();
@ -280,9 +375,15 @@ module.exports.create = function (cfg) {
offset = lkh;
}));
}).nThen((waitFor) => {
// if offset is less than zero then presumably the channel has no messages
// returning falls through to the next block and therefore returns -1
if (offset !== -1) { return; }
// otherwise we have a non-negative offset and we can start to read from there
store.readMessagesBin(channelName, 0, (msgObj, rmcb, abort) => {
// tryParse return a parsed message or undefined
const msg = tryParse(msgObj.buff.toString('utf8'));
// if it was undefined then go onto the next message
if (typeof msg === "undefined") { return rmcb(); }
if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4])) {
return void rmcb();
@ -297,6 +398,12 @@ module.exports.create = function (cfg) {
});
};
/* getHistoryAsync
* finds the appropriate byte offset from which to begin reading using 'getHistoryOffset'
* streams through the rest of the messages, safely parsing them and returning the parsed content to the handler
* calls back when it has reached the end of the log
*/
const getHistoryAsync = (ctx, channelName, lastKnownHash, beforeHash, handler, cb) => {
let offset = -1;
nThen((waitFor) => {
@ -319,6 +426,12 @@ module.exports.create = function (cfg) {
});
};
/* getOlderHistory
* allows clients to query for all messages until a known hash is read
* stores all messages in history as they are read
* can therefore be very expensive for memory
* should probably be converted to a streaming interface
*/
const getOlderHistory = function (channelName, oldestKnownHash, cb) {
var messageBuffer = [];
var found = false;
@ -359,13 +472,20 @@ module.exports.create = function (cfg) {
};
*/
/* historyKeeperBroadcast
* uses API from the netflux server to send messages to every member of a channel
* sendMsg runs in a try-catch and drops users if sending a message fails
*/
const historyKeeperBroadcast = function (ctx, channel, msg) {
let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
chan.forEach(function (user) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
});
};
/* onChannelCleared
* broadcasts to all clients in a channel if that channel is deleted
*/
const onChannelCleared = function (ctx, channel) {
historyKeeperBroadcast(ctx, channel, {
error: 'ECLEARED',
@ -385,6 +505,18 @@ module.exports.create = function (cfg) {
};
// Check if the selected channel is expired
// If it is, remove it from memory and broadcast a message to its members
/* checkExpired
* synchronously returns true or undefined to indicate whether the channel is expired
* according to its metadata
* has some side effects:
* closes the channel via the store.closeChannel API
* and then broadcasts to all channel members that the channel has expired
* removes the channel from the netflux-server's in-memory cache
* removes the channel metadata from history keeper's in-memory cache
FIXME the boolean nature of this API should be separated from its side effects
*/
const checkExpired = function (ctx, channel) {
if (channel && channel.length === STANDARD_CHANNEL_LENGTH && historyKeeperKeys[channel] &&
historyKeeperKeys[channel].expire && historyKeeperKeys[channel].expire < +new Date()) {
@ -401,6 +533,19 @@ module.exports.create = function (cfg) {
return;
};
/* onDirectMessage
* exported for use by the netflux-server
* parses and handles all direct messages directed to the history keeper
* check if it's expired and execute all the associated side-effects
* routes queries to the appropriate handlers
* GET_HISTORY
* GET_HISTORY_RANGE
* GET_FULL_HISTORY
* RPC
* if the rpc has special hooks that the history keeper needs to be aware of...
* execute them here...
*/
const onDirectMessage = function (ctx, seq, user, json) {
let parsed;
let channelName;

Loading…
Cancel
Save