|
|
|
@ -220,6 +220,10 @@ module.exports.create = function (cfg) {
|
|
|
|
|
if the channel exists but its index does not then it caches the index
|
|
|
|
|
*/
|
|
|
|
|
const getIndex = (ctx, channelName, cb) => {
|
|
|
|
|
// FIXME don't allow more than one index to be computed at a time
|
|
|
|
|
// if one is in progress, the callback to a queue
|
|
|
|
|
// whenever you completed, empty the queue in order
|
|
|
|
|
|
|
|
|
|
const chan = ctx.channels[channelName];
|
|
|
|
|
if (chan && chan.index) {
|
|
|
|
|
// enforce async behaviour
|
|
|
|
@ -262,6 +266,8 @@ module.exports.create = function (cfg) {
|
|
|
|
|
TODO rename maybeMsgHash to optionalMsgHash
|
|
|
|
|
*/
|
|
|
|
|
const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) {
|
|
|
|
|
// TODO implement a queue so that we know messages are written in order
|
|
|
|
|
|
|
|
|
|
const msgBin = new Buffer(msg + '\n', 'utf8');
|
|
|
|
|
// Store the message first, and update the index only once it's stored.
|
|
|
|
|
// store.messageBin can be async so updating the index first may
|
|
|
|
@ -312,43 +318,79 @@ module.exports.create = function (cfg) {
|
|
|
|
|
* caches the id of the last saved checkpoint
|
|
|
|
|
* adds timestamps to incoming messages
|
|
|
|
|
* writes messages to the store
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
const onChannelMessage = function (ctx, channel, msgStruct) {
|
|
|
|
|
// don't store messages if the channel id indicates that it's an ephemeral message
|
|
|
|
|
if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; }
|
|
|
|
|
|
|
|
|
|
const isCp = /^cp\|/.test(msgStruct[4]);
|
|
|
|
|
if (metadata_cache[channel.id] && metadata_cache[channel.id].expire &&
|
|
|
|
|
metadata_cache[channel.id].expire < +new Date()) {
|
|
|
|
|
return; // Don't store messages on expired channel
|
|
|
|
|
// TODO if a channel expired a long time ago but it's still here, remove it
|
|
|
|
|
}
|
|
|
|
|
let id;
|
|
|
|
|
if (isCp) {
|
|
|
|
|
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
|
|
|
|
|
// id becomes either null or an array or results...
|
|
|
|
|
id = CHECKPOINT_PATTERN.exec(msgStruct[4]);
|
|
|
|
|
if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) {
|
|
|
|
|
// Reject duplicate checkpoints
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
var metadata = metadata_cache[channel.id];
|
|
|
|
|
if (metadata && metadata.validateKey) {
|
|
|
|
|
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
|
|
|
|
|
|
|
|
|
|
let metadata;
|
|
|
|
|
nThen(function (w) {
|
|
|
|
|
// getIndex (and therefore the latest metadata)
|
|
|
|
|
getIndex(ctx, channel.id, w(function (err, index) {
|
|
|
|
|
if (err) {
|
|
|
|
|
w.abort();
|
|
|
|
|
return void Log.error('CHANNEL_MESSAGE_ERROR', err);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!index.metadata) {
|
|
|
|
|
// if there's no channel metadata then it can't be an expiring channel
|
|
|
|
|
// nor can we possibly validate it
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metadata = index.metadata;
|
|
|
|
|
|
|
|
|
|
if (metadata.expire && metadata.expire < +new Date()) {
|
|
|
|
|
// don't store message sent to expired channels
|
|
|
|
|
w.abort();
|
|
|
|
|
return;
|
|
|
|
|
// TODO if a channel expired a long time ago but it's still here, remove it
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if there's no validateKey present skip to the next block
|
|
|
|
|
if (!metadata.validateKey) { return; }
|
|
|
|
|
|
|
|
|
|
// trim the checkpoint indicator off the message if it's present
|
|
|
|
|
let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
|
|
|
|
|
// convert the message from a base64 string into a Uint8Array
|
|
|
|
|
|
|
|
|
|
// FIXME this can fail and the client won't notice
|
|
|
|
|
signedMsg = Nacl.util.decodeBase64(signedMsg);
|
|
|
|
|
// FIXME PERFORMANCE: cache the decoded key instead of decoding it every time
|
|
|
|
|
// CPU/Memory tradeoff
|
|
|
|
|
|
|
|
|
|
// FIXME this can blow up
|
|
|
|
|
// TODO check that that won't cause any problems other than not being able to append...
|
|
|
|
|
const validateKey = Nacl.util.decodeBase64(metadata.validateKey);
|
|
|
|
|
// validate the message
|
|
|
|
|
const validated = Nacl.sign.open(signedMsg, validateKey);
|
|
|
|
|
if (!validated) {
|
|
|
|
|
// don't go any further if the message fails validation
|
|
|
|
|
w.abort();
|
|
|
|
|
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
// do checkpoint stuff...
|
|
|
|
|
|
|
|
|
|
// 1. get the checkpoint id
|
|
|
|
|
// 2. reject duplicate checkpoints
|
|
|
|
|
|
|
|
|
|
if (isCp) {
|
|
|
|
|
// if the message is a checkpoint we will have already validated
|
|
|
|
|
// that it isn't a duplicate. remember its id so that we can
|
|
|
|
|
// repeat this process for the next incoming checkpoint
|
|
|
|
|
|
|
|
|
|
// WARNING: the fact that we only check the most recent checkpoints
|
|
|
|
|
// is a potential source of bugs if one editor has high latency and
|
|
|
|
|
// pushes a duplicate of an earlier checkpoint than the latest which
|
|
|
|
@ -359,8 +401,13 @@ module.exports.create = function (cfg) {
|
|
|
|
|
channel.lastSavedCp = id[2];
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// add the time to the message
|
|
|
|
|
msgStruct.push(now());
|
|
|
|
|
|
|
|
|
|
// storeMessage
|
|
|
|
|
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* dropChannel
|
|
|
|
@ -673,6 +720,9 @@ module.exports.create = function (cfg) {
|
|
|
|
|
}
|
|
|
|
|
metadata.channel = channelName;
|
|
|
|
|
|
|
|
|
|
// XXX check that the validateKey is valid, otherwise send an error?
|
|
|
|
|
// don't bother putting it into storage
|
|
|
|
|
|
|
|
|
|
nThen(function (waitFor) {
|
|
|
|
|
var w = waitFor();
|
|
|
|
|
|
|
|
|
|