finally get around to reorganizing the messiest part of history keeper

pull/1/head
ansuz 5 years ago
parent d1b8d8668e
commit 9de073c269

@ -730,227 +730,204 @@ module.exports.create = function (cfg) {
} }
}; };
/* onDirectMessage const handleGetHistory = function (ctx, seq, user, parsed) {
* exported for use by the netflux-server // parsed[1] is the channel id
* parses and handles all direct messages directed to the history keeper // parsed[2] is a validation key or an object containing metadata (optionnal)
* check if it's expired and execute all the associated side-effects // parsed[3] is the last known hash (optionnal)
* routes queries to the appropriate handlers sendMsg(ctx, user, [seq, 'ACK']);
* GET_HISTORY var channelName = parsed[1];
* GET_HISTORY_RANGE var config = parsed[2];
* GET_FULL_HISTORY var metadata = {};
* RPC var lastKnownHash;
* if the rpc has special hooks that the history keeper needs to be aware of...
* execute them here... // clients can optionally pass a map of attributes
// if the channel already exists this map will be ignored
*/ // otherwise it will be stored as the initial metadata state for the channel
const onDirectMessage = function (ctx, seq, user, json) { if (config && typeof config === "object" && !Array.isArray(parsed[2])) {
let parsed; lastKnownHash = config.lastKnownHash;
let channelName; metadata = config.metadata || {};
if (metadata.expire) {
Log.silly('HK_MESSAGE', json); metadata.expire = +metadata.expire * 1000 + (+new Date());
try {
parsed = JSON.parse(json[2]);
} catch (err) {
Log.error("HK_PARSE_CLIENT_MESSAGE", json);
return;
}
// If the requested history is for an expired channel, abort
// Note the if we don't have the keys for that channel in metadata_cache, we'll
// have to abort later (once we know the expiration time)
if (checkExpired(ctx, parsed[1])) { return; }
if (parsed[0] === 'GET_HISTORY') {
// parsed[1] is the channel id
// parsed[2] is a validation key or an object containing metadata (optionnal)
// parsed[3] is the last known hash (optionnal)
sendMsg(ctx, user, [seq, 'ACK']);
channelName = parsed[1];
var config = parsed[2];
var metadata = {};
var lastKnownHash;
// clients can optionally pass a map of attributes
// if the channel already exists this map will be ignored
// otherwise it will be stored as the initial metadata state for the channel
if (config && typeof config === "object" && !Array.isArray(parsed[2])) {
lastKnownHash = config.lastKnownHash;
metadata = config.metadata || {};
if (metadata.expire) {
metadata.expire = +metadata.expire * 1000 + (+new Date());
}
}
metadata.channel = channelName;
metadata.created = +new Date();
// if the user sends us an invalid key, we won't be able to validate their messages
// so they'll never get written to the log anyway. Let's just drop their message
// on the floor instead of doing a bunch of extra work
// TODO send them an error message so they know something is wrong
if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) {
return void Log.error('HK_INVALID_KEY', metadata.validateKey);
} }
}
metadata.channel = channelName;
metadata.created = +new Date();
// if the user sends us an invalid key, we won't be able to validate their messages
// so they'll never get written to the log anyway. Let's just drop their message
// on the floor instead of doing a bunch of extra work
// TODO send them an error message so they know something is wrong
if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) {
return void Log.error('HK_INVALID_KEY', metadata.validateKey);
}
nThen(function (waitFor) { nThen(function (waitFor) {
var w = waitFor(); var w = waitFor();
/* unless this is a young channel, we will serve all messages from an offset /* unless this is a young channel, we will serve all messages from an offset
this will not include the channel metadata, so we need to explicitly fetch that. this will not include the channel metadata, so we need to explicitly fetch that.
unfortunately, we can't just serve it blindly, since then young channels will unfortunately, we can't just serve it blindly, since then young channels will
send the metadata twice, so let's do a quick check of what we're going to serve... send the metadata twice, so let's do a quick check of what we're going to serve...
*/
getIndex(ctx, channelName, waitFor((err, index) => {
/* if there's an error here, it should be encountered
and handled by the next nThen block.
so, let's just fall through...
*/ */
getIndex(ctx, channelName, waitFor((err, index) => { if (err) { return w(); }
/* if there's an error here, it should be encountered
and handled by the next nThen block.
so, let's just fall through... // it's possible that the channel doesn't have metadata
*/ // but in that case there's no point in checking if the channel expired
if (err) { return w(); } // or in trying to send metadata, so just skip this block
if (!index || !index.metadata) { return void w(); }
// And then check if the channel is expired. If it is, send the error and abort
// it's possible that the channel doesn't have metadata // FIXME this is hard to read because 'checkExpired' has side effects
// but in that case there's no point in checking if the channel expired if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
// or in trying to send metadata, so just skip this block // always send metadata with GET_HISTORY requests
if (!index || !index.metadata) { return void w(); } sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
// And then check if the channel is expired. If it is, send the error and abort }));
// FIXME this is hard to read because 'checkExpired' has side effects }).nThen(() => {
if (checkExpired(ctx, channelName)) { return void waitFor.abort(); } let msgCount = 0;
// always send metadata with GET_HISTORY requests
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
}));
}).nThen(() => {
let msgCount = 0;
// TODO compute lastKnownHash in a manner such that it will always skip past the metadata line?
getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => {
if (!msg) { return; }
msgCount++;
// avoid sending the metadata message a second time
if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore);
}, (err) => {
if (err && err.code !== 'ENOENT') {
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
const parsedMsg = {error:err.message, channel: channelName};
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
return;
}
const chan = ctx.channels[channelName];
if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) { // TODO compute lastKnownHash in a manner such that it will always skip past the metadata line?
metadata_cache[channelName] = metadata; getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => {
if (!msg) { return; }
msgCount++;
// avoid sending the metadata message a second time
if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore);
}, (err) => {
if (err && err.code !== 'ENOENT') {
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
const parsedMsg = {error:err.message, channel: channelName};
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
return;
}
// the index will have already been constructed and cached at this point const chan = ctx.channels[channelName];
// but it will not have detected any metadata because it hasn't been written yet
// this means that the cache starts off as invalid, so we have to correct it if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) {
if (chan && chan.index) { chan.index.metadata = metadata; } metadata_cache[channelName] = metadata;
// the index will have already been constructed and cached at this point
// but it will not have detected any metadata because it hasn't been written yet
// this means that the cache starts off as invalid, so we have to correct it
if (chan && chan.index) { chan.index.metadata = metadata; }
// new channels will always have their metadata written to a dedicated metadata log
// but any lines after the first which are not amendments in a particular format will be ignored.
// Thus we should be safe from race conditions here if just write metadata to the log as below...
// TODO validate this logic
// otherwise maybe we need to check that the metadata log is empty as well
store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
if (err) {
// FIXME tell the user that there was a channel error?
return void Log.error('HK_WRITE_METADATA', {
channel: channelName,
error: err,
});
}
});
// new channels will always have their metadata written to a dedicated metadata log // write tasks
// but any lines after the first which are not amendments in a particular format will be ignored. if(tasks && metadata.expire && typeof(metadata.expire) === 'number') {
// Thus we should be safe from race conditions here if just write metadata to the log as below... // the fun part...
// TODO validate this logic // the user has said they want this pad to expire at some point
// otherwise maybe we need to check that the metadata log is empty as well tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) {
store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
if (err) { if (err) {
// FIXME tell the user that there was a channel error? // if there is an error, we don't want to crash the whole server...
return void Log.error('HK_WRITE_METADATA', { // just log it, and if there's a problem you'll be able to fix it
channel: channelName, // at a later date with the provided information
error: err, Log.error('HK_CREATE_EXPIRE_TASK', err);
}); Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName]));
} }
}); });
// write tasks
if(tasks && metadata.expire && typeof(metadata.expire) === 'number') {
// the fun part...
// the user has said they want this pad to expire at some point
tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) {
if (err) {
// if there is an error, we don't want to crash the whole server...
// just log it, and if there's a problem you'll be able to fix it
// at a later date with the provided information
Log.error('HK_CREATE_EXPIRE_TASK', err);
Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName]));
}
});
}
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]);
} }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]);
}
// End of history message: // End of history message:
let parsedMsg = {state: 1, channel: channelName}; let parsedMsg = {state: 1, channel: channelName};
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
});
}); });
} else if (parsed[0] === 'GET_HISTORY_RANGE') { });
channelName = parsed[1]; };
var map = parsed[2];
if (!(map && typeof(map) === 'object')) {
return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]);
}
var oldestKnownHash = map.from; const handleGetHistoryRange = function (ctx, seq, user, parsed) {
var desiredMessages = map.count; var channelName = parsed[1];
var desiredCheckpoint = map.cpCount; var map = parsed[2];
var txid = map.txid; if (!(map && typeof(map) === 'object')) {
if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') { return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]);
return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]); }
}
if (!txid) { var oldestKnownHash = map.from;
return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]); var desiredMessages = map.count;
} var desiredCheckpoint = map.cpCount;
var txid = map.txid;
if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]);
}
sendMsg(ctx, user, [seq, 'ACK']); if (!txid) {
return void getOlderHistory(channelName, oldestKnownHash, function (messages) { return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]);
var toSend = []; }
if (typeof (desiredMessages) === "number") {
toSend = messages.slice(-desiredMessages); sendMsg(ctx, user, [seq, 'ACK']);
} else { return void getOlderHistory(channelName, oldestKnownHash, function (messages) {
let cpCount = 0; var toSend = [];
for (var i = messages.length - 1; i >= 0; i--) { if (typeof (desiredMessages) === "number") {
if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) { toSend = messages.slice(-desiredMessages);
cpCount++; } else {
} let cpCount = 0;
toSend.unshift(messages[i]); for (var i = messages.length - 1; i >= 0; i--) {
if (cpCount >= desiredCheckpoint) { break; } if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
cpCount++;
} }
toSend.unshift(messages[i]);
if (cpCount >= desiredCheckpoint) { break; }
} }
toSend.forEach(function (msg) { }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, toSend.forEach(function (msg) {
JSON.stringify(['HISTORY_RANGE', txid, msg])]);
});
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
JSON.stringify(['HISTORY_RANGE_END', txid, channelName]) JSON.stringify(['HISTORY_RANGE', txid, msg])]);
]);
});
} else if (parsed[0] === 'GET_FULL_HISTORY') {
// parsed[1] is the channel id
// parsed[2] is a validation key (optionnal)
// parsed[3] is the last known hash (optionnal)
sendMsg(ctx, user, [seq, 'ACK']);
// FIXME should we send metadata here too?
// none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => {
if (!msg) { return; }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore);
}, (err) => {
let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
if (err) {
Log.error('HK_GET_FULL_HISTORY', err.stack);
parsedMsg = ['ERROR', parsed[1], err.message];
}
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
}); });
} else if (rpc) {
/* RPC Calls... */
var rpc_call = parsed.slice(1);
sendMsg(ctx, user, [seq, 'ACK']); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
try { JSON.stringify(['HISTORY_RANGE_END', txid, channelName])
]);
});
};
const handleGetFullHistory = function (ctx, seq, user, parsed) {
// parsed[1] is the channel id
// parsed[2] is a validation key (optionnal)
// parsed[3] is the last known hash (optionnal)
sendMsg(ctx, user, [seq, 'ACK']);
// FIXME should we send metadata here too?
// none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
return void getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => {
if (!msg) { return; }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore);
}, (err) => {
let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
if (err) {
Log.error('HK_GET_FULL_HISTORY', err.stack);
parsedMsg = ['ERROR', parsed[1], err.message];
}
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
});
};
const handleRPC = function (ctx, seq, user, parsed) {
if (typeof(rpc) !== 'function') { return; }
/* RPC Calls... */
var rpc_call = parsed.slice(1);
sendMsg(ctx, user, [seq, 'ACK']);
try {
// slice off the sequence number and pass in the rest of the message // slice off the sequence number and pass in the rest of the message
rpc(ctx, rpc_call, function (err, output) { rpc(ctx, rpc_call, function (err, output) {
if (err) { if (err) {
@ -992,10 +969,43 @@ module.exports.create = function (cfg) {
// finally, send a response to the client that sent the RPC // finally, send a response to the client that sent the RPC
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]);
}); });
} catch (e) { } catch (e) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]);
} }
};
/* onDirectMessage
* exported for use by the netflux-server
* parses and handles all direct messages directed to the history keeper
* check if it's expired and execute all the associated side-effects
* routes queries to the appropriate handlers
*/
const onDirectMessage = function (ctx, seq, user, json) {
Log.silly('HK_MESSAGE', json);
let parsed;
try {
parsed = JSON.parse(json[2]);
} catch (err) {
Log.error("HK_PARSE_CLIENT_MESSAGE", json);
return;
}
// If the requested history is for an expired channel, abort
// Note the if we don't have the keys for that channel in metadata_cache, we'll
// have to abort later (once we know the expiration time)
if (checkExpired(ctx, parsed[1])) { return; }
if (parsed[0] === 'GET_HISTORY') {
return void handleGetHistory(ctx, seq, user, parsed);
}
if (parsed[0] === 'GET_HISTORY_RANGE') {
return void handleGetHistoryRange(ctx, seq, user, parsed);
}
if (parsed[0] === 'GET_FULL_HISTORY') {
return void handleGetFullHistory(ctx, seq, user, parsed);
} }
return void handleRPC(ctx, seq, user, parsed);
}; };
return { return {

Loading…
Cancel
Save