diff --git a/historyKeeper.js b/historyKeeper.js index 5eaafffcc..564de6f5f 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -91,6 +91,8 @@ module.exports.create = function (cfg) { * offsetByHash: * a map containing message offsets by their hash * this is for every message in history, so it could be very large... + * XXX OFFSET + * except we remove offsets from the map if they occur before the oldest relevant checkpoint * size: in bytes * metadata: * validationKey @@ -248,6 +250,7 @@ module.exports.create = function (cfg) { * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes + TODO rename maybeMsgHash to optionalMsgHash */ const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) { const msgBin = new Buffer(msg + '\n', 'utf8'); @@ -272,6 +275,7 @@ module.exports.create = function (cfg) { if (isCp) { index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); for (let k in index.offsetByHash) { + // XXX OFFSET if (index.offsetByHash[k] < index.cpIndex[0]) { delete index.offsetByHash[k]; } @@ -325,6 +329,7 @@ module.exports.create = function (cfg) { let signedMsg = (isCp) ? 
msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; signedMsg = Nacl.util.decodeBase64(signedMsg); // FIXME PERFORMANCE: cache the decoded key instead of decoding it every time + // CPU/Memory tradeoff const validateKey = Nacl.util.decodeBase64(metadata_cache[channel.id].validateKey); const validated = Nacl.sign.open(signedMsg, validateKey); if (!validated) { @@ -337,6 +342,7 @@ module.exports.create = function (cfg) { // is a potential source of bugs if one editor has high latency and // pushes a duplicate of an earlier checkpoint than the latest which // has been pushed by editors with low latency + // FIXME if (Array.isArray(id) && id[2]) { // Store new checkpoint hash channel.lastSavedCp = id[2]; @@ -360,14 +366,17 @@ module.exports.create = function (cfg) { returns a number representing the byte offset from the start of the log for whatever history you're seeking. - query by providing a 'lastKnownHash', which is really just a string of - the first 64 characters of an encrypted message. + query by providing a 'lastKnownHash', + which is really just a string of the first 64 characters of an encrypted message. + OR by -1 which indicates that we want the full history (byte offset 0) + OR nothing, which indicates that you want whatever messages the historyKeeper deems relevant + (typically the last few checkpoints) this function embeds a lot of the history keeper's logic: 0. if you passed -1 as the lastKnownHash it means you want the complete history * I'm not sure why you'd need to call this function if you know it will return 0 in this case... - * ansuz + * it has a side-effect of filling the index cache if it's empty 1. 
if you provided a lastKnownHash and that message does not exist in the history: * either the client has made a mistake or the history they knew about no longer exists * call back with EINVAL @@ -378,7 +387,7 @@ module.exports.create = function (cfg) { * return the offset of the earliest checkpoint which 'sliceCpIndex' considers relevant 3. if you did provide a lastKnownHash * read through the log until you find the hash that you're looking for - * call back with either the bytee offset of the message that you found OR + * call back with either the byte offset of the message that you found OR * -1 if you didn't find it */ @@ -390,9 +399,20 @@ module.exports.create = function (cfg) { getIndex(ctx, channelName, waitFor((err, index) => { if (err) { waitFor.abort(); return void cb(err); } + // check if the "hash" the client is requesting exists in the index const lkh = index.offsetByHash[lastKnownHash]; + // we evict old hashes from the index as new checkpoints are discovered. + // if someone connects and asks for a hash that is no longer relevant, + // we tell them it's an invalid request. This is because of the semantics of "GET_HISTORY" + // which is only ever used when connecting or reconnecting in typical uses of history... + // this assumption should hold for uses by chainpad, but perhaps not for other use cases. + // EXCEPT: other cases don't use checkpoints! + // clients that are told that their request is invalid should just make another request + // without specifying the hash, and just trust the server to give them the relevant data. + // QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory? 
if (lastKnownHash && typeof(lkh) !== "number") { waitFor.abort(); + // XXX this smells bad return void cb(new Error('EINVAL')); } @@ -420,6 +440,8 @@ module.exports.create = function (cfg) { // returning falls through to the next block and therefore returns -1 if (offset !== -1) { return; } + // do a lookup from the index + // XXX maybe we don't need this anymore? // otherwise we have a non-negative offset and we can start to read from there store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { // tryParse return a parsed message or undefined @@ -444,6 +466,9 @@ module.exports.create = function (cfg) { * streams through the rest of the messages, safely parsing them and returning the parsed content to the handler * calls back when it has reached the end of the log + Used by: + * GET_HISTORY + */ const getHistoryAsync = (ctx, channelName, lastKnownHash, beforeHash, handler, cb) => { let offset = -1; @@ -472,6 +497,9 @@ module.exports.create = function (cfg) { * stores all messages in history as they are read * can therefore be very expensive for memory * should probably be converted to a streaming interface + + Used by: + * GET_HISTORY_RANGE */ const getOlderHistory = function (channelName, oldestKnownHash, cb) { var messageBuffer = []; @@ -482,10 +510,11 @@ module.exports.create = function (cfg) { let parsed = tryParse(msgStr); if (typeof parsed === "undefined") { return; } - if (parsed.validateKey) { - metadata_cache[channelName] = parsed; - return; - } + // identify classic metadata messages by their inclusion of a channel. + // and don't send metadata, since: + // 1. the user won't be interested in it + // 2. 
this metadata is potentially incomplete/incorrect + if (parsed.channel) { return; } var content = parsed[4]; if (typeof(content) !== 'string') { return; } @@ -547,6 +576,10 @@ module.exports.create = function (cfg) { // Check if the selected channel is expired // If it is, remove it from memory and broadcast a message to its members + const onChannelMetadataChanged = function (ctx, channel) { + // XXX + }; + /* checkExpired * synchronously returns true or undefined to indicate whether the channel is expired * according to its metadata @@ -616,7 +649,9 @@ module.exports.create = function (cfg) { var lastKnownHash = parsed[3]; var owners; var expire; - if (parsed[2] && typeof parsed[2] === "object") { // FIXME METADATA RECEIVE + // XXX we can be a bit more strict in our validation here + // maybe we should check that it's an object and not an array? + if (parsed[2] && typeof parsed[2] === "object") { validateKey = parsed[2].validateKey; lastKnownHash = parsed[2].lastKnownHash; owners = parsed[2].owners; @@ -663,39 +698,24 @@ module.exports.create = function (cfg) { // And then check if the channel is expired. 
If it is, send the error and abort // FIXME this is hard to read because 'checkExpired' has side effects if (checkExpired(ctx, channelName)) { return void waitFor.abort(); } - // Send the metadata to the user - if (!lastKnownHash && index.cpIndex.length > 1) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); - return; - } - w(); + // always send metadata with GET_HISTORY requests + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); })); }).nThen(() => { let msgCount = 0; - let expired = false; // XXX a lot of this logic is currently wrong - // we should have already sent the most up to data metadata (if it exists) - // we should have also already checked if the channel has expired - // we just need to avoid sending metadata a second time + // we should have already sent the most up-to-date metadata (if it exists) // TODO compute lastKnownHash in a manner such that it will always skip past the metadata line? getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => { if (!msg) { return; } - if (msg.validateKey) { - // If it is a young channel, this is the part where we get the metadata - // Check if the channel is expired and abort if it is. 
- if (!metadata_cache[channelName]) { metadata_cache[channelName] = msg; } - expired = checkExpired(ctx, channelName); - } - - if (expired) { return void readMore(); } msgCount++; + // check for the channel because it's the one thing that should + // always exist in "classic metadata" + if (msg.channel && metadata_cache[channelName]) { return readMore(); } sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore); }, (err) => { - // If the pad is expired, stop here, we've already sent the error message - if (expired) { return; } - if (err && err.code !== 'ENOENT') { if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } const parsedMsg = {error:err.message, channel: channelName}; @@ -726,10 +746,10 @@ module.exports.create = function (cfg) { store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { if (err) { // XXX handle errors + return void console.error(err); } }); - //storeMessage(ctx, chan, JSON.stringify(metadata), false, undefined); // FIXME METADATA WRITE - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); // FIXME METADATA SEND + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); } // End of history message: @@ -820,15 +840,13 @@ module.exports.create = function (cfg) { } // FIXME METADATA CHANGE - /* if (msg[3] === 'SET_METADATA') { // or whatever we call the RPC???? 
// make sure we update our cache of metadata // or at least invalidate it and force other mechanisms to recompute its state // 'output' could be the new state as computed by rpc + onChannelMetadataChanged(ctx, msg[4]); } - */ - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]); }); } catch (e) { diff --git a/storage/file.js b/storage/file.js index df4219d7d..a4e52825a 100644 --- a/storage/file.js +++ b/storage/file.js @@ -221,8 +221,10 @@ How to proceed }; var writeMetadata = function (env, channelId, data, cb) { - // XXX - cb("NOT_IMPLEMENTED"); + var path = mkMetadataPath(env, channelId); + // XXX appendFile isn't great + // but this is a simple way to get things working + Fs.appendFile(path, data + '\n', cb); }; const NEWLINE_CHR = ('\n').charCodeAt(0); @@ -268,6 +270,7 @@ const mkOffsetCounter = () => { }); }; +// XXX write some docs for this magic const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); let keepReading = true;