Merge branch 'staging' of github.com:xwiki-labs/cryptpad into staging

pull/1/head
yflory 6 years ago
commit a73e48de80

@ -66,6 +66,16 @@ const isMetadataMessage = function (parsed) {
return Boolean(parsed && parsed.channel); return Boolean(parsed && parsed.channel);
}; };
// A client-supplied validateKey is only acceptable when it is a string which
// base64-decodes to exactly Nacl.sign.publicKeyLength bytes.
// Returns true for such keys and false for everything else, including input
// that makes the decoder throw.
const isValidValidateKeyString = function (key) {
    // anything that isn't a string can't be a base64-encoded key
    if (typeof(key) !== 'string') { return false; }
    try {
        // the decode and the length check both stay inside the try so that
        // any failure along the way is reported as "invalid" instead of thrown
        const decodedKey = Nacl.util.decodeBase64(key);
        return decodedKey.length === Nacl.sign.publicKeyLength;
    } catch (e) {
        return false;
    }
};
module.exports.create = function (cfg) { module.exports.create = function (cfg) {
const rpc = cfg.rpc; const rpc = cfg.rpc;
const tasks = cfg.tasks; const tasks = cfg.tasks;
@ -95,7 +105,6 @@ module.exports.create = function (cfg) {
* offsetByHash: * offsetByHash:
* a map containing message offsets by their hash * a map containing message offsets by their hash
* this is for every message in history, so it could be very large... * this is for every message in history, so it could be very large...
* XXX OFFSET
* except we remove offsets from the map if they occur before the oldest relevant checkpoint * except we remove offsets from the map if they occur before the oldest relevant checkpoint
* size: in bytes * size: in bytes
* metadata: * metadata:
@ -219,18 +228,52 @@ module.exports.create = function (cfg) {
as an added bonus: as an added bonus:
if the channel exists but its index does not then it caches the index if the channel exists but its index does not then it caches the index
*/ */
const indexQueues = {};
const getIndex = (ctx, channelName, cb) => { const getIndex = (ctx, channelName, cb) => {
const chan = ctx.channels[channelName]; const chan = ctx.channels[channelName];
// if there is a channel in memory and it has an index cached, return it
if (chan && chan.index) { if (chan && chan.index) {
// enforce async behaviour // enforce async behaviour
return void setTimeout(function () { return void setTimeout(function () {
cb(undefined, chan.index); cb(undefined, chan.index);
}); });
} }
// if a call to computeIndex is already in progress for this channel
// then add the callback for the latest invocation to the queue
// and wait for it to complete
if (Array.isArray(indexQueues[channelName])) {
indexQueues[channelName].push(cb);
return;
}
// otherwise, make a queue for any 'getIndex' calls made before the following 'computeIndex' call completes
var queue = indexQueues[channelName] = (indexQueues[channelName] || [cb]);
computeIndex(channelName, (err, ret) => { computeIndex(channelName, (err, ret) => {
if (err) { return void cb(err); } if (!Array.isArray(queue)) {
// something is very wrong if there's no callback array
return void Log.error("E_INDEX_NO_CALLBACK", channelName);
}
// clean up the queue that you're about to handle, but keep a local copy
delete indexQueues[channelName];
// this is most likely an unrecoverable filesystem error
if (err) {
// call back every pending function with the error
return void queue.forEach(function (_cb) {
_cb(err);
});
}
// cache the computed result if possible
if (chan) { chan.index = ret; } if (chan) { chan.index = ret; }
cb(undefined, ret);
// call back every pending function with the result
queue.forEach(function (_cb) {
_cb(void 0, ret);
});
}); });
}; };
@ -258,33 +301,54 @@ module.exports.create = function (cfg) {
* because the two actions were performed like ABba... * because the two actions were performed like ABba...
* the fix is to use callbacks and implement queueing for writes * the fix is to use callbacks and implement queueing for writes
* to guarantee that offset computation is always atomic with writes * to guarantee that offset computation is always atomic with writes
TODO rename maybeMsgHash to optionalMsgHash
*/ */
const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) { const storageQueues = {};
const msgBin = new Buffer(msg + '\n', 'utf8');
const storeQueuedMessage = function (ctx, queue, id) {
if (queue.length === 0) {
delete storageQueues[id];
return;
}
const first = queue.shift();
const msgBin = first.msg;
const optionalMessageHash = first.hash;
const isCp = first.isCp;
// Store the message first, and update the index only once it's stored. // Store the message first, and update the index only once it's stored.
// store.messageBin can be async so updating the index first may // store.messageBin can be async so updating the index first may
// result in a wrong cpIndex // result in a wrong cpIndex
nThen((waitFor) => { nThen((waitFor) => {
store.messageBin(channel.id, msgBin, waitFor(function (err) { store.messageBin(id, msgBin, waitFor(function (err) {
if (err) { if (err) {
waitFor.abort(); waitFor.abort();
return void Log.error("HK_STORE_MESSAGE_ERROR", err.message); Log.error("HK_STORE_MESSAGE_ERROR", err.message);
// this error is critical, but there's not much we can do at the moment
// proceed with more messages, but they'll probably fail too
// at least you won't have a memory leak
// TODO make it possible to respond to clients with errors so they know
// their message wasn't stored
storeQueuedMessage(ctx, queue, id);
return;
} }
})); }));
}).nThen((waitFor) => { }).nThen((waitFor) => {
getIndex(ctx, channel.id, waitFor((err, index) => { getIndex(ctx, id, waitFor((err, index) => {
if (err) { if (err) {
Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
// non-critical, we'll be able to get the channel index later // non-critical, we'll be able to get the channel index later
// proceed to the next message in the queue
storeQueuedMessage(ctx, queue, id);
return; return;
} }
if (typeof (index.line) === "number") { index.line++; } if (typeof (index.line) === "number") { index.line++; }
if (isCp) { if (isCp) {
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
for (let k in index.offsetByHash) { for (let k in index.offsetByHash) {
// XXX OFFSET
if (index.offsetByHash[k] < index.cpIndex[0]) { if (index.offsetByHash[k] < index.cpIndex[0]) {
delete index.offsetByHash[k]; delete index.offsetByHash[k];
} }
@ -294,12 +358,34 @@ module.exports.create = function (cfg) {
line: ((index.line || 0) + 1) line: ((index.line || 0) + 1)
} /*:cp_index_item*/)); } /*:cp_index_item*/));
} }
if (maybeMsgHash) { index.offsetByHash[maybeMsgHash] = index.size; } if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
index.size += msgBin.length; index.size += msgBin.length;
// handle the next element in the queue
storeQueuedMessage(ctx, queue, id);
})); }));
}); });
}; };
/*  storeMessage
    * queues a message to be written to the log of the channel 'channel.id'
    * ctx: passed through untouched to storeQueuedMessage
    * channel: only its 'id' is read, it selects the per-channel queue
    * msg: the serialized message, a newline is appended before it is queued
    * isCp: whether the message is a checkpoint
    * optionalMessageHash: hash of the message (if any), queued alongside it

    If a queue already exists for the channel then the message is appended
    to it and nothing else happens. Otherwise a new queue is created and
    handed to storeQueuedMessage.

    Every queued entry has the same shape ({msg, hash, isCp}), including the
    one that creates the queue. Leaving 'isCp' off of that first entry would
    make the consumer treat a checkpoint as an ordinary message.
*/
const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) {
    const id = channel.id;

    // build the entry once so that both code paths queue identical structures
    const entry = {
        // Buffer.from replaces the deprecated 'new Buffer(string, encoding)'
        msg: Buffer.from(msg + '\n', 'utf8'),
        hash: optionalMessageHash,
        isCp: isCp,
    };

    // a queue already exists for this channel: just append to it
    if (Array.isArray(storageQueues[id])) {
        return void storageQueues[id].push(entry);
    }

    // otherwise create the queue and start processing it
    const queue = storageQueues[id] = [entry];
    storeQueuedMessage(ctx, queue, id);
};
var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
/* onChannelMessage /* onChannelMessage
@ -312,55 +398,96 @@ module.exports.create = function (cfg) {
* caches the id of the last saved checkpoint * caches the id of the last saved checkpoint
* adds timestamps to incoming messages * adds timestamps to incoming messages
* writes messages to the store * writes messages to the store
*/ */
const onChannelMessage = function (ctx, channel, msgStruct) { const onChannelMessage = function (ctx, channel, msgStruct) {
// don't store messages if the channel id indicates that it's an ephemeral message // don't store messages if the channel id indicates that it's an ephemeral message
if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; } if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; }
const isCp = /^cp\|/.test(msgStruct[4]); const isCp = /^cp\|/.test(msgStruct[4]);
if (metadata_cache[channel.id] && metadata_cache[channel.id].expire &&
metadata_cache[channel.id].expire < +new Date()) {
return; // Don't store messages on expired channel
// TODO if a channel expired a long time ago but it's still here, remove it
}
let id; let id;
if (isCp) { if (isCp) {
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/ // id becomes either null or an array of results...
id = CHECKPOINT_PATTERN.exec(msgStruct[4]); id = CHECKPOINT_PATTERN.exec(msgStruct[4]);
if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) { if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) {
// Reject duplicate checkpoints // Reject duplicate checkpoints
return; return;
} }
} }
var metadata = metadata_cache[channel.id];
if (metadata && metadata.validateKey) { let metadata;
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/ nThen(function (w) {
let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; // getIndex (and therefore the latest metadata)
signedMsg = Nacl.util.decodeBase64(signedMsg); getIndex(ctx, channel.id, w(function (err, index) {
// FIXME PERFORMANCE: cache the decoded key instead of decoding it every time if (err) {
// CPU/Memory tradeoff w.abort();
const validateKey = Nacl.util.decodeBase64(metadata.validateKey); return void Log.error('CHANNEL_MESSAGE_ERROR', err);
const validated = Nacl.sign.open(signedMsg, validateKey); }
if (!validated) {
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); if (!index.metadata) {
return; // if there's no channel metadata then it can't be an expiring channel
} // nor can we possibly validate it
} return;
if (isCp) { }
// WARNING: the fact that we only check the most recent checkpoints
// is a potential source of bugs if one editor has high latency and metadata = index.metadata;
// pushes a duplicate of an earlier checkpoint than the latest which
// has been pushed by editors with low latency if (metadata.expire && metadata.expire < +new Date()) {
// FIXME // don't store message sent to expired channels
if (Array.isArray(id) && id[2]) { w.abort();
// Store new checkpoint hash return;
channel.lastSavedCp = id[2]; // TODO if a channel expired a long time ago but it's still here, remove it
}
// if there's no validateKey present skip to the next block
if (!metadata.validateKey) { return; }
// trim the checkpoint indicator off the message if it's present
let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
// convert the message from a base64 string into a Uint8Array
// FIXME this can fail and the client won't notice
signedMsg = Nacl.util.decodeBase64(signedMsg);
// FIXME this can blow up
// TODO check that that won't cause any problems other than not being able to append...
const validateKey = Nacl.util.decodeBase64(metadata.validateKey);
// validate the message
const validated = Nacl.sign.open(signedMsg, validateKey);
if (!validated) {
// don't go any further if the message fails validation
w.abort();
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
return;
}
}));
}).nThen(function () {
// do checkpoint stuff...
// 1. get the checkpoint id
// 2. reject duplicate checkpoints
if (isCp) {
// if the message is a checkpoint we will have already validated
// that it isn't a duplicate. remember its id so that we can
// repeat this process for the next incoming checkpoint
// WARNING: the fact that we only check the most recent checkpoints
// is a potential source of bugs if one editor has high latency and
// pushes a duplicate of an earlier checkpoint than the latest which
// has been pushed by editors with low latency
// FIXME
if (Array.isArray(id) && id[2]) {
// Store new checkpoint hash
channel.lastSavedCp = id[2];
}
} }
}
msgStruct.push(now()); // add the time to the message
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4])); msgStruct.push(now());
// storeMessage
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
});
}; };
/* dropChannel /* dropChannel
@ -423,7 +550,6 @@ module.exports.create = function (cfg) {
// QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory? // QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory?
if (lastKnownHash && typeof(lkh) !== "number") { if (lastKnownHash && typeof(lkh) !== "number") {
waitFor.abort(); waitFor.abort();
// XXX this smells bad
return void cb(new Error('EINVAL')); return void cb(new Error('EINVAL'));
} }
@ -452,7 +578,7 @@ module.exports.create = function (cfg) {
if (offset !== -1) { return; } if (offset !== -1) { return; }
// do a lookup from the index // do a lookup from the index
// XXX maybe we don't need this anymore? // FIXME maybe we don't need this anymore?
// otherwise we have a non-negative offset and we can start to read from there // otherwise we have a non-negative offset and we can start to read from there
store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => {
// tryParse return a parsed message or undefined // tryParse return a parsed message or undefined
@ -588,7 +714,6 @@ module.exports.create = function (cfg) {
// If it is, remove it from memory and broadcast a message to its members // If it is, remove it from memory and broadcast a message to its members
const onChannelMetadataChanged = function (ctx, channel) { const onChannelMetadataChanged = function (ctx, channel) {
// XXX lint compliance
channel = channel; channel = channel;
}; };
@ -673,6 +798,14 @@ module.exports.create = function (cfg) {
} }
metadata.channel = channelName; metadata.channel = channelName;
// if the user sends us an invalid key, we won't be able to validate their messages
// so they'll never get written to the log anyway. Let's just drop their message
// on the floor instead of doing a bunch of extra work
// TODO send them an error message so they know something is wrong
if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) {
return void Log.error('HK_INVALID_KEY', metadata.validateKey);
}
nThen(function (waitFor) { nThen(function (waitFor) {
var w = waitFor(); var w = waitFor();
@ -734,8 +867,11 @@ module.exports.create = function (cfg) {
// otherwise maybe we need to check that the metadata log is empty as well // otherwise maybe we need to check that the metadata log is empty as well
store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
if (err) { if (err) {
// XXX tell the user that there was a channel error? // FIXME tell the user that there was a channel error?
return void Log.error('HK_WRITE_METADATA'); return void Log.error('HK_WRITE_METADATA', {
channel: channelName,
error: err,
});
} }
}); });
@ -810,8 +946,8 @@ module.exports.create = function (cfg) {
// parsed[3] is the last known hash (optional) // parsed[3] is the last known hash (optional)
sendMsg(ctx, user, [seq, 'ACK']); sendMsg(ctx, user, [seq, 'ACK']);
// XXX should we send metadata here too? // FIXME should we send metadata here too?
// my gut says yes // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => { getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => {
if (!msg) { return; } if (!msg) { return; }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore);

@ -236,9 +236,13 @@ How to proceed
// writeMetadata appends to the dedicated log of metadata amendments // writeMetadata appends to the dedicated log of metadata amendments
var writeMetadata = function (env, channelId, data, cb) { var writeMetadata = function (env, channelId, data, cb) {
var path = mkMetadataPath(env, channelId); var path = mkMetadataPath(env, channelId);
// XXX appendFile isn't great
// but this is a simple way to get things working Fse.mkdirp(Path.dirname(path), PERMISSIVE, function (err) {
Fs.appendFile(path, data + '\n', cb); if (err && err.code !== 'EEXIST') { return void cb(err); }
// TODO see if we can make this any faster by using something other than appendFile
Fs.appendFile(path, data + '\n', cb);
});
}; };
@ -290,7 +294,11 @@ const mkOffsetCounter = () => {
}); });
}; };
// XXX write some docs for this magic // readMessagesBin asynchronously iterates over the messages in a channel log
// the handler for each message must call back to read more, which should mean
// that this function has a lower memory profile than our classic method
// of reading logs line by line.
// it also allows the handler to abort reading at any time
const readMessagesBin = (env, id, start, msgHandler, cb) => { const readMessagesBin = (env, id, start, msgHandler, cb) => {
const stream = Fs.createReadStream(mkPath(env, id), { start: start }); const stream = Fs.createReadStream(mkPath(env, id), { start: start });
let keepReading = true; let keepReading = true;
@ -341,23 +349,33 @@ var removeChannel = function (env, channelName, cb) {
var CB = Once(cb); var CB = Once(cb);
var errors = 0;
nThen(function (w) { nThen(function (w) {
Fs.unlink(channelPath, w(function (err) { Fs.unlink(channelPath, w(function (err) {
if (err) { if (err) {
// XXX handle ENOENT and only return an error if (err.code === 'ENOENT') {
// if both channel and metadata did not exist... errors++;
return;
}
w.abort(); w.abort();
CB(labelError("E_CHANNEL_REMOVAL", err)); CB(labelError("E_CHANNEL_REMOVAL", err));
} }
})); }));
Fs.unlink(metadataPath, w(function (err) { Fs.unlink(metadataPath, w(function (err) {
if (err) { if (err) {
if (err.code === 'ENOENT') { return; } // proceed if there's no metadata to delete if (err.code === 'ENOENT') {
errors++;
return;
} // proceed if there's no metadata to delete
w.abort(); w.abort();
CB(labelError("E_METADATA_REMOVAL", err)); CB(labelError("E_METADATA_REMOVAL", err));
} }
})); }));
}).nThen(function () { }).nThen(function () {
if (errors === 2) {
return void CB(labelError('E_REMOVE_CHANNEL', new Error("ENOENT")));
}
CB(); CB();
}); });
}; };
@ -421,12 +439,23 @@ var listChannels = function (root, handler, cb) {
if (err) { return void handler(err); } // Is this correct? if (err) { return void handler(err); } // Is this correct?
list.forEach(function (item) { list.forEach(function (item) {
// ignore things that don't match the naming pattern // ignore hidden files
// XXX don't ignore metadata files if there is no corresponding channel if (/^\./.test(item)) { return; }
// since you probably want to clean those up // ignore anything that isn't channel or metadata
if (/^\./.test(item) || !/[0-9a-fA-F]{32}\.ndjson$/.test(item)) { return; } if (!/^[0-9a-fA-F]{32}(\.metadata?)*\.ndjson$/.test(item)) {
return;
}
if (!/^[0-9a-fA-F]{32}\.ndjson$/.test(item)) {
// this will catch metadata, which we want to ignore if
// the corresponding channel is present
if (list.indexOf(item.replace(/\.metadata/, '')) !== -1) { return; }
// otherwise fall through
}
var filepath = Path.join(nestedDirPath, item); var filepath = Path.join(nestedDirPath, item);
var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, ''); var channel = filepath
.replace(/\.ndjson$/, '')
.replace(/\.metadata/, '')
.replace(/.*\//, '');
if ([32, 34].indexOf(channel.length) === -1) { return; } if ([32, 34].indexOf(channel.length) === -1) { return; }
// otherwise throw it on the pile // otherwise throw it on the pile

Loading…
Cancel
Save