|
|
|
@ -66,6 +66,16 @@ const isMetadataMessage = function (parsed) {
|
|
|
|
|
return Boolean(parsed && parsed.channel);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// validateKeyStrings supplied by clients must decode to 32-byte Uint8Arrays
|
|
|
|
|
const isValidValidateKeyString = function (key) {
|
|
|
|
|
try {
|
|
|
|
|
return typeof(key) === 'string' &&
|
|
|
|
|
Nacl.util.decodeBase64(key).length === Nacl.sign.publicKeyLength;
|
|
|
|
|
} catch (e) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
module.exports.create = function (cfg) {
|
|
|
|
|
const rpc = cfg.rpc;
|
|
|
|
|
const tasks = cfg.tasks;
|
|
|
|
@ -95,7 +105,6 @@ module.exports.create = function (cfg) {
|
|
|
|
|
* offsetByHash:
|
|
|
|
|
* a map containing message offsets by their hash
|
|
|
|
|
* this is for every message in history, so it could be very large...
|
|
|
|
|
* XXX OFFSET
|
|
|
|
|
* except we remove offsets from the map if they occur before the oldest relevant checkpoint
|
|
|
|
|
* size: in bytes
|
|
|
|
|
* metadata:
|
|
|
|
@ -219,18 +228,52 @@ module.exports.create = function (cfg) {
|
|
|
|
|
as an added bonus:
|
|
|
|
|
if the channel exists but its index does not then it caches the index
|
|
|
|
|
*/
|
|
|
|
|
const indexQueues = {};
|
|
|
|
|
const getIndex = (ctx, channelName, cb) => {
|
|
|
|
|
const chan = ctx.channels[channelName];
|
|
|
|
|
// if there is a channel in memory and it has an index cached, return it
|
|
|
|
|
if (chan && chan.index) {
|
|
|
|
|
// enforce async behaviour
|
|
|
|
|
return void setTimeout(function () {
|
|
|
|
|
cb(undefined, chan.index);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if a call to computeIndex is already in progress for this channel
|
|
|
|
|
// then add the callback for the latest invocation to the queue
|
|
|
|
|
// and wait for it to complete
|
|
|
|
|
if (Array.isArray(indexQueues[channelName])) {
|
|
|
|
|
indexQueues[channelName].push(cb);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// otherwise, make a queue for any 'getIndex' calls made before the following 'computeIndex' call completes
|
|
|
|
|
var queue = indexQueues[channelName] = (indexQueues[channelName] || [cb]);
|
|
|
|
|
|
|
|
|
|
computeIndex(channelName, (err, ret) => {
|
|
|
|
|
if (err) { return void cb(err); }
|
|
|
|
|
if (!Array.isArray(queue)) {
|
|
|
|
|
// something is very wrong if there's no callback array
|
|
|
|
|
return void Log.error("E_INDEX_NO_CALLBACK", channelName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// clean up the queue that you're about to handle, but keep a local copy
|
|
|
|
|
delete indexQueues[channelName];
|
|
|
|
|
|
|
|
|
|
// this is most likely an unrecoverable filesystem error
|
|
|
|
|
if (err) {
|
|
|
|
|
// call back every pending function with the error
|
|
|
|
|
return void queue.forEach(function (_cb) {
|
|
|
|
|
_cb(err);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
// cache the computed result if possible
|
|
|
|
|
if (chan) { chan.index = ret; }
|
|
|
|
|
cb(undefined, ret);
|
|
|
|
|
|
|
|
|
|
// call back every pending function with the result
|
|
|
|
|
queue.forEach(function (_cb) {
|
|
|
|
|
_cb(void 0, ret);
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -258,33 +301,54 @@ module.exports.create = function (cfg) {
|
|
|
|
|
* because the two actions were performed like ABba...
|
|
|
|
|
* the fix is to use callbacks and implement queueing for writes
|
|
|
|
|
* to guarantee that offset computation is always atomic with writes
|
|
|
|
|
|
|
|
|
|
TODO rename maybeMsgHash to optionalMsgHash
|
|
|
|
|
*/
|
|
|
|
|
const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) {
|
|
|
|
|
const msgBin = new Buffer(msg + '\n', 'utf8');
|
|
|
|
|
const storageQueues = {};
|
|
|
|
|
|
|
|
|
|
const storeQueuedMessage = function (ctx, queue, id) {
|
|
|
|
|
if (queue.length === 0) {
|
|
|
|
|
delete storageQueues[id];
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const first = queue.shift();
|
|
|
|
|
|
|
|
|
|
const msgBin = first.msg;
|
|
|
|
|
const optionalMessageHash = first.hash;
|
|
|
|
|
const isCp = first.isCp;
|
|
|
|
|
|
|
|
|
|
// Store the message first, and update the index only once it's stored.
|
|
|
|
|
// store.messageBin can be async so updating the index first may
|
|
|
|
|
// result in a wrong cpIndex
|
|
|
|
|
nThen((waitFor) => {
|
|
|
|
|
store.messageBin(channel.id, msgBin, waitFor(function (err) {
|
|
|
|
|
store.messageBin(id, msgBin, waitFor(function (err) {
|
|
|
|
|
if (err) {
|
|
|
|
|
waitFor.abort();
|
|
|
|
|
return void Log.error("HK_STORE_MESSAGE_ERROR", err.message);
|
|
|
|
|
Log.error("HK_STORE_MESSAGE_ERROR", err.message);
|
|
|
|
|
|
|
|
|
|
// this error is critical, but there's not much we can do at the moment
|
|
|
|
|
// proceed with more messages, but they'll probably fail too
|
|
|
|
|
// at least you won't have a memory leak
|
|
|
|
|
|
|
|
|
|
// TODO make it possible to respond to clients with errors so they know
|
|
|
|
|
// their message wasn't stored
|
|
|
|
|
storeQueuedMessage(ctx, queue, id);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
}).nThen((waitFor) => {
|
|
|
|
|
getIndex(ctx, channel.id, waitFor((err, index) => {
|
|
|
|
|
getIndex(ctx, id, waitFor((err, index) => {
|
|
|
|
|
if (err) {
|
|
|
|
|
Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
|
|
|
|
|
// non-critical, we'll be able to get the channel index later
|
|
|
|
|
|
|
|
|
|
// proceed to the next message in the queue
|
|
|
|
|
storeQueuedMessage(ctx, queue, id);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (typeof (index.line) === "number") { index.line++; }
|
|
|
|
|
if (isCp) {
|
|
|
|
|
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
|
|
|
|
|
for (let k in index.offsetByHash) {
|
|
|
|
|
// XXX OFFSET
|
|
|
|
|
if (index.offsetByHash[k] < index.cpIndex[0]) {
|
|
|
|
|
delete index.offsetByHash[k];
|
|
|
|
|
}
|
|
|
|
@ -294,12 +358,34 @@ module.exports.create = function (cfg) {
|
|
|
|
|
line: ((index.line || 0) + 1)
|
|
|
|
|
} /*:cp_index_item*/));
|
|
|
|
|
}
|
|
|
|
|
if (maybeMsgHash) { index.offsetByHash[maybeMsgHash] = index.size; }
|
|
|
|
|
if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
|
|
|
|
|
index.size += msgBin.length;
|
|
|
|
|
|
|
|
|
|
// handle the next element in the queue
|
|
|
|
|
storeQueuedMessage(ctx, queue, id);
|
|
|
|
|
}));
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) {
|
|
|
|
|
const id = channel.id;
|
|
|
|
|
|
|
|
|
|
const msgBin = new Buffer(msg + '\n', 'utf8');
|
|
|
|
|
if (Array.isArray(storageQueues[id])) {
|
|
|
|
|
return void storageQueues[id].push({
|
|
|
|
|
msg: msgBin,
|
|
|
|
|
hash: optionalMessageHash,
|
|
|
|
|
isCp: isCp,
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const queue = storageQueues[id] = (storageQueues[id] || [{
|
|
|
|
|
msg: msgBin,
|
|
|
|
|
hash: optionalMessageHash,
|
|
|
|
|
}]);
|
|
|
|
|
storeQueuedMessage(ctx, queue, id);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
|
|
|
|
|
|
|
|
|
|
/* onChannelMessage
|
|
|
|
@ -312,55 +398,96 @@ module.exports.create = function (cfg) {
|
|
|
|
|
* caches the id of the last saved checkpoint
|
|
|
|
|
* adds timestamps to incoming messages
|
|
|
|
|
* writes messages to the store
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
const onChannelMessage = function (ctx, channel, msgStruct) {
|
|
|
|
|
// don't store messages if the channel id indicates that it's an ephemeral message
|
|
|
|
|
if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; }
|
|
|
|
|
|
|
|
|
|
const isCp = /^cp\|/.test(msgStruct[4]);
|
|
|
|
|
if (metadata_cache[channel.id] && metadata_cache[channel.id].expire &&
|
|
|
|
|
metadata_cache[channel.id].expire < +new Date()) {
|
|
|
|
|
return; // Don't store messages on expired channel
|
|
|
|
|
// TODO if a channel expired a long time ago but it's still here, remove it
|
|
|
|
|
}
|
|
|
|
|
let id;
|
|
|
|
|
if (isCp) {
|
|
|
|
|
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
|
|
|
|
|
// id becomes either null or an array or results...
|
|
|
|
|
id = CHECKPOINT_PATTERN.exec(msgStruct[4]);
|
|
|
|
|
if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) {
|
|
|
|
|
// Reject duplicate checkpoints
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
var metadata = metadata_cache[channel.id];
|
|
|
|
|
if (metadata && metadata.validateKey) {
|
|
|
|
|
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
|
|
|
|
|
let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
|
|
|
|
|
signedMsg = Nacl.util.decodeBase64(signedMsg);
|
|
|
|
|
// FIXME PERFORMANCE: cache the decoded key instead of decoding it every time
|
|
|
|
|
// CPU/Memory tradeoff
|
|
|
|
|
const validateKey = Nacl.util.decodeBase64(metadata.validateKey);
|
|
|
|
|
const validated = Nacl.sign.open(signedMsg, validateKey);
|
|
|
|
|
if (!validated) {
|
|
|
|
|
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (isCp) {
|
|
|
|
|
// WARNING: the fact that we only check the most recent checkpoints
|
|
|
|
|
// is a potential source of bugs if one editor has high latency and
|
|
|
|
|
// pushes a duplicate of an earlier checkpoint than the latest which
|
|
|
|
|
// has been pushed by editors with low latency
|
|
|
|
|
// FIXME
|
|
|
|
|
if (Array.isArray(id) && id[2]) {
|
|
|
|
|
// Store new checkpoint hash
|
|
|
|
|
channel.lastSavedCp = id[2];
|
|
|
|
|
|
|
|
|
|
let metadata;
|
|
|
|
|
nThen(function (w) {
|
|
|
|
|
// getIndex (and therefore the latest metadata)
|
|
|
|
|
getIndex(ctx, channel.id, w(function (err, index) {
|
|
|
|
|
if (err) {
|
|
|
|
|
w.abort();
|
|
|
|
|
return void Log.error('CHANNEL_MESSAGE_ERROR', err);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!index.metadata) {
|
|
|
|
|
// if there's no channel metadata then it can't be an expiring channel
|
|
|
|
|
// nor can we possibly validate it
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metadata = index.metadata;
|
|
|
|
|
|
|
|
|
|
if (metadata.expire && metadata.expire < +new Date()) {
|
|
|
|
|
// don't store message sent to expired channels
|
|
|
|
|
w.abort();
|
|
|
|
|
return;
|
|
|
|
|
// TODO if a channel expired a long time ago but it's still here, remove it
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if there's no validateKey present skip to the next block
|
|
|
|
|
if (!metadata.validateKey) { return; }
|
|
|
|
|
|
|
|
|
|
// trim the checkpoint indicator off the message if it's present
|
|
|
|
|
let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
|
|
|
|
|
// convert the message from a base64 string into a Uint8Array
|
|
|
|
|
|
|
|
|
|
// FIXME this can fail and the client won't notice
|
|
|
|
|
signedMsg = Nacl.util.decodeBase64(signedMsg);
|
|
|
|
|
|
|
|
|
|
// FIXME this can blow up
|
|
|
|
|
// TODO check that that won't cause any problems other than not being able to append...
|
|
|
|
|
const validateKey = Nacl.util.decodeBase64(metadata.validateKey);
|
|
|
|
|
// validate the message
|
|
|
|
|
const validated = Nacl.sign.open(signedMsg, validateKey);
|
|
|
|
|
if (!validated) {
|
|
|
|
|
// don't go any further if the message fails validation
|
|
|
|
|
w.abort();
|
|
|
|
|
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
// do checkpoint stuff...
|
|
|
|
|
|
|
|
|
|
// 1. get the checkpoint id
|
|
|
|
|
// 2. reject duplicate checkpoints
|
|
|
|
|
|
|
|
|
|
if (isCp) {
|
|
|
|
|
// if the message is a checkpoint we will have already validated
|
|
|
|
|
// that it isn't a duplicate. remember its id so that we can
|
|
|
|
|
// repeat this process for the next incoming checkpoint
|
|
|
|
|
|
|
|
|
|
// WARNING: the fact that we only check the most recent checkpoints
|
|
|
|
|
// is a potential source of bugs if one editor has high latency and
|
|
|
|
|
// pushes a duplicate of an earlier checkpoint than the latest which
|
|
|
|
|
// has been pushed by editors with low latency
|
|
|
|
|
// FIXME
|
|
|
|
|
if (Array.isArray(id) && id[2]) {
|
|
|
|
|
// Store new checkpoint hash
|
|
|
|
|
channel.lastSavedCp = id[2];
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
msgStruct.push(now());
|
|
|
|
|
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
|
|
|
|
|
|
|
|
|
|
// add the time to the message
|
|
|
|
|
msgStruct.push(now());
|
|
|
|
|
|
|
|
|
|
// storeMessage
|
|
|
|
|
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* dropChannel
|
|
|
|
@ -423,7 +550,6 @@ module.exports.create = function (cfg) {
|
|
|
|
|
// QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory?
|
|
|
|
|
if (lastKnownHash && typeof(lkh) !== "number") {
|
|
|
|
|
waitFor.abort();
|
|
|
|
|
// XXX this smells bad
|
|
|
|
|
return void cb(new Error('EINVAL'));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -452,7 +578,7 @@ module.exports.create = function (cfg) {
|
|
|
|
|
if (offset !== -1) { return; }
|
|
|
|
|
|
|
|
|
|
// do a lookup from the index
|
|
|
|
|
// XXX maybe we don't need this anymore?
|
|
|
|
|
// FIXME maybe we don't need this anymore?
|
|
|
|
|
// otherwise we have a non-negative offset and we can start to read from there
|
|
|
|
|
store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => {
|
|
|
|
|
// tryParse return a parsed message or undefined
|
|
|
|
@ -588,7 +714,6 @@ module.exports.create = function (cfg) {
|
|
|
|
|
// If it is, remove it from memory and broadcast a message to its members
|
|
|
|
|
|
|
|
|
|
const onChannelMetadataChanged = function (ctx, channel) {
|
|
|
|
|
// XXX lint compliance
|
|
|
|
|
channel = channel;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -673,6 +798,14 @@ module.exports.create = function (cfg) {
|
|
|
|
|
}
|
|
|
|
|
metadata.channel = channelName;
|
|
|
|
|
|
|
|
|
|
// if the user sends us an invalid key, we won't be able to validate their messages
|
|
|
|
|
// so they'll never get written to the log anyway. Let's just drop their message
|
|
|
|
|
// on the floor instead of doing a bunch of extra work
|
|
|
|
|
// TODO send them an error message so they know something is wrong
|
|
|
|
|
if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) {
|
|
|
|
|
return void Log.error('HK_INVALID_KEY', metadata.validateKey);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nThen(function (waitFor) {
|
|
|
|
|
var w = waitFor();
|
|
|
|
|
|
|
|
|
@ -734,8 +867,11 @@ module.exports.create = function (cfg) {
|
|
|
|
|
// otherwise maybe we need to check that the metadata log is empty as well
|
|
|
|
|
store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
|
|
|
|
|
if (err) {
|
|
|
|
|
// XXX tell the user that there was a channel error?
|
|
|
|
|
return void Log.error('HK_WRITE_METADATA');
|
|
|
|
|
// FIXME tell the user that there was a channel error?
|
|
|
|
|
return void Log.error('HK_WRITE_METADATA', {
|
|
|
|
|
channel: channelName,
|
|
|
|
|
error: err,
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
@ -810,8 +946,8 @@ module.exports.create = function (cfg) {
|
|
|
|
|
// parsed[3] is the last known hash (optionnal)
|
|
|
|
|
sendMsg(ctx, user, [seq, 'ACK']);
|
|
|
|
|
|
|
|
|
|
// XXX should we send metadata here too?
|
|
|
|
|
// my gut says yes
|
|
|
|
|
// FIXME should we send metadata here too?
|
|
|
|
|
// none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
|
|
|
|
|
getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => {
|
|
|
|
|
if (!msg) { return; }
|
|
|
|
|
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore);
|
|
|
|
|