From 9de073c269ac8d7ac6f2f45641422448023b5a0f Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 16 Jan 2020 14:25:00 -0500 Subject: [PATCH] finally get around to reorganizing the messiest part of history keeper --- historyKeeper.js | 416 ++++++++++++++++++++++++----------------------- 1 file changed, 213 insertions(+), 203 deletions(-) diff --git a/historyKeeper.js b/historyKeeper.js index 59a694e8c..1035681d8 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -730,227 +730,204 @@ module.exports.create = function (cfg) { } }; - /* onDirectMessage - * exported for use by the netflux-server - * parses and handles all direct messages directed to the history keeper - * check if it's expired and execute all the associated side-effects - * routes queries to the appropriate handlers - * GET_HISTORY - * GET_HISTORY_RANGE - * GET_FULL_HISTORY - * RPC - * if the rpc has special hooks that the history keeper needs to be aware of... - * execute them here... - - */ - const onDirectMessage = function (ctx, seq, user, json) { - let parsed; - let channelName; - - Log.silly('HK_MESSAGE', json); - - try { - parsed = JSON.parse(json[2]); - } catch (err) { - Log.error("HK_PARSE_CLIENT_MESSAGE", json); - return; - } - - // If the requested history is for an expired channel, abort - // Note the if we don't have the keys for that channel in metadata_cache, we'll - // have to abort later (once we know the expiration time) - if (checkExpired(ctx, parsed[1])) { return; } - - if (parsed[0] === 'GET_HISTORY') { - // parsed[1] is the channel id - // parsed[2] is a validation key or an object containing metadata (optionnal) - // parsed[3] is the last known hash (optionnal) - sendMsg(ctx, user, [seq, 'ACK']); - channelName = parsed[1]; - var config = parsed[2]; - var metadata = {}; - var lastKnownHash; - - // clients can optionally pass a map of attributes - // if the channel already exists this map will be ignored - // otherwise it will be stored as the initial metadata state for the channel - if (config && typeof config === "object" && !Array.isArray(parsed[2])) { - lastKnownHash = config.lastKnownHash; - metadata = config.metadata || {}; - if (metadata.expire) { - metadata.expire = +metadata.expire * 1000 + (+new Date()); - } - } - metadata.channel = channelName; - metadata.created = +new Date(); - - // if the user sends us an invalid key, we won't be able to validate their messages - // so they'll never get written to the log anyway. Let's just drop their message - // on the floor instead of doing a bunch of extra work - // TODO send them an error message so they know something is wrong - if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) { - return void Log.error('HK_INVALID_KEY', metadata.validateKey); + const handleGetHistory = function (ctx, seq, user, parsed) { + // parsed[1] is the channel id + // parsed[2] is a validation key or an object containing metadata (optionnal) + // parsed[3] is the last known hash (optionnal) + sendMsg(ctx, user, [seq, 'ACK']); + var channelName = parsed[1]; + var config = parsed[2]; + var metadata = {}; + var lastKnownHash; + + // clients can optionally pass a map of attributes + // if the channel already exists this map will be ignored + // otherwise it will be stored as the initial metadata state for the channel + if (config && typeof config === "object" && !Array.isArray(parsed[2])) { + lastKnownHash = config.lastKnownHash; + metadata = config.metadata || {}; + if (metadata.expire) { + metadata.expire = +metadata.expire * 1000 + (+new Date()); } + } + metadata.channel = channelName; + metadata.created = +new Date(); + + // if the user sends us an invalid key, we won't be able to validate their messages + // so they'll never get written to the log anyway. Let's just drop their message + // on the floor instead of doing a bunch of extra work + // TODO send them an error message so they know something is wrong + if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) { + return void Log.error('HK_INVALID_KEY', metadata.validateKey); + } - nThen(function (waitFor) { - var w = waitFor(); + nThen(function (waitFor) { + var w = waitFor(); - /* unless this is a young channel, we will serve all messages from an offset - this will not include the channel metadata, so we need to explicitly fetch that. - unfortunately, we can't just serve it blindly, since then young channels will - send the metadata twice, so let's do a quick check of what we're going to serve... + /* unless this is a young channel, we will serve all messages from an offset + this will not include the channel metadata, so we need to explicitly fetch that. + unfortunately, we can't just serve it blindly, since then young channels will + send the metadata twice, so let's do a quick check of what we're going to serve... + */ + getIndex(ctx, channelName, waitFor((err, index) => { + /* if there's an error here, it should be encountered + and handled by the next nThen block. + so, let's just fall through... */ - getIndex(ctx, channelName, waitFor((err, index) => { - /* if there's an error here, it should be encountered - and handled by the next nThen block. - so, let's just fall through... - */ - if (err) { return w(); } - - - // it's possible that the channel doesn't have metadata - // but in that case there's no point in checking if the channel expired - // or in trying to send metadata, so just skip this block - if (!index || !index.metadata) { return void w(); } - // And then check if the channel is expired. If it is, send the error and abort - // FIXME this is hard to read because 'checkExpired' has side effects - if (checkExpired(ctx, channelName)) { return void waitFor.abort(); } - // always send metadata with GET_HISTORY requests - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); - })); - }).nThen(() => { - let msgCount = 0; - - // TODO compute lastKnownHash in a manner such that it will always skip past the metadata line? - getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => { - if (!msg) { return; } - msgCount++; - // avoid sending the metadata message a second time - if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); } - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore); - }, (err) => { - if (err && err.code !== 'ENOENT') { - if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } - const parsedMsg = {error:err.message, channel: channelName}; - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); - return; - } - - const chan = ctx.channels[channelName]; + if (err) { return w(); } + + + // it's possible that the channel doesn't have metadata + // but in that case there's no point in checking if the channel expired + // or in trying to send metadata, so just skip this block + if (!index || !index.metadata) { return void w(); } + // And then check if the channel is expired. If it is, send the error and abort + // FIXME this is hard to read because 'checkExpired' has side effects + if (checkExpired(ctx, channelName)) { return void waitFor.abort(); } + // always send metadata with GET_HISTORY requests + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); + })); + }).nThen(() => { + let msgCount = 0; - if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) { - metadata_cache[channelName] = metadata; + // TODO compute lastKnownHash in a manner such that it will always skip past the metadata line? + getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => { + if (!msg) { return; } + msgCount++; + // avoid sending the metadata message a second time + if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); } + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore); + }, (err) => { + if (err && err.code !== 'ENOENT') { + if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } + const parsedMsg = {error:err.message, channel: channelName}; + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); + return; + } - // the index will have already been constructed and cached at this point - // but it will not have detected any metadata because it hasn't been written yet - // this means that the cache starts off as invalid, so we have to correct it - if (chan && chan.index) { chan.index.metadata = metadata; } + const chan = ctx.channels[channelName]; + + if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) { + metadata_cache[channelName] = metadata; + + // the index will have already been constructed and cached at this point + // but it will not have detected any metadata because it hasn't been written yet + // this means that the cache starts off as invalid, so we have to correct it + if (chan && chan.index) { chan.index.metadata = metadata; } + + // new channels will always have their metadata written to a dedicated metadata log + // but any lines after the first which are not amendments in a particular format will be ignored. + // Thus we should be safe from race conditions here if just write metadata to the log as below... + // TODO validate this logic + // otherwise maybe we need to check that the metadata log is empty as well + store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { + if (err) { + // FIXME tell the user that there was a channel error? + return void Log.error('HK_WRITE_METADATA', { + channel: channelName, + error: err, + }); + } + }); - // new channels will always have their metadata written to a dedicated metadata log - // but any lines after the first which are not amendments in a particular format will be ignored. - // Thus we should be safe from race conditions here if just write metadata to the log as below... - // TODO validate this logic - // otherwise maybe we need to check that the metadata log is empty as well - store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { + // write tasks + if(tasks && metadata.expire && typeof(metadata.expire) === 'number') { + // the fun part... + // the user has said they want this pad to expire at some point + tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) { if (err) { - // FIXME tell the user that there was a channel error? - return void Log.error('HK_WRITE_METADATA', { - channel: channelName, - error: err, - }); + // if there is an error, we don't want to crash the whole server... + // just log it, and if there's a problem you'll be able to fix it + // at a later date with the provided information + Log.error('HK_CREATE_EXPIRE_TASK', err); + Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName])); } }); - - // write tasks - if(tasks && metadata.expire && typeof(metadata.expire) === 'number') { - // the fun part... - // the user has said they want this pad to expire at some point - tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) { - if (err) { - // if there is an error, we don't want to crash the whole server... - // just log it, and if there's a problem you'll be able to fix it - // at a later date with the provided information - Log.error('HK_CREATE_EXPIRE_TASK', err); - Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName])); - } - }); - } - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); } + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); + } - // End of history message: - let parsedMsg = {state: 1, channel: channelName}; - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); - }); + // End of history message: + let parsedMsg = {state: 1, channel: channelName}; + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); }); - } else if (parsed[0] === 'GET_HISTORY_RANGE') { - channelName = parsed[1]; - var map = parsed[2]; - if (!(map && typeof(map) === 'object')) { - return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]); - } + }); + }; - var oldestKnownHash = map.from; - var desiredMessages = map.count; - var desiredCheckpoint = map.cpCount; - var txid = map.txid; - if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') { - return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]); - } + const handleGetHistoryRange = function (ctx, seq, user, parsed) { + var channelName = parsed[1]; + var map = parsed[2]; + if (!(map && typeof(map) === 'object')) { + return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]); + } - if (!txid) { - return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]); - } + var oldestKnownHash = map.from; + var desiredMessages = map.count; + var desiredCheckpoint = map.cpCount; + var txid = map.txid; + if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') { + return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]); + } - sendMsg(ctx, user, [seq, 'ACK']); - return void getOlderHistory(channelName, oldestKnownHash, function (messages) { - var toSend = []; - if (typeof (desiredMessages) === "number") { - toSend = messages.slice(-desiredMessages); - } else { - let cpCount = 0; - for (var i = messages.length - 1; i >= 0; i--) { - if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) { - cpCount++; - } - toSend.unshift(messages[i]); - if (cpCount >= desiredCheckpoint) { break; } + if (!txid) { + return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]); + } + + sendMsg(ctx, user, [seq, 'ACK']); + return void getOlderHistory(channelName, oldestKnownHash, function (messages) { + var toSend = []; + if (typeof (desiredMessages) === "number") { + toSend = messages.slice(-desiredMessages); + } else { + let cpCount = 0; + for (var i = messages.length - 1; i >= 0; i--) { + if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) { + cpCount++; } + toSend.unshift(messages[i]); + if (cpCount >= desiredCheckpoint) { break; } } - toSend.forEach(function (msg) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, - JSON.stringify(['HISTORY_RANGE', txid, msg])]); - }); - + } + toSend.forEach(function (msg) { sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, - JSON.stringify(['HISTORY_RANGE_END', txid, channelName]) - ]); - }); - } else if (parsed[0] === 'GET_FULL_HISTORY') { - // parsed[1] is the channel id - // parsed[2] is a validation key (optionnal) - // parsed[3] is the last known hash (optionnal) - sendMsg(ctx, user, [seq, 'ACK']); - - // FIXME should we send metadata here too? - // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22) - getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => { - if (!msg) { return; } - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); - }, (err) => { - let parsedMsg = ['FULL_HISTORY_END', parsed[1]]; - if (err) { - Log.error('HK_GET_FULL_HISTORY', err.stack); - parsedMsg = ['ERROR', parsed[1], err.message]; - } - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); + JSON.stringify(['HISTORY_RANGE', txid, msg])]); }); - } else if (rpc) { - /* RPC Calls... */ - var rpc_call = parsed.slice(1); - sendMsg(ctx, user, [seq, 'ACK']); - try { + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, + JSON.stringify(['HISTORY_RANGE_END', txid, channelName]) + ]); + }); + }; + + const handleGetFullHistory = function (ctx, seq, user, parsed) { + // parsed[1] is the channel id + // parsed[2] is a validation key (optionnal) + // parsed[3] is the last known hash (optionnal) + sendMsg(ctx, user, [seq, 'ACK']); + + // FIXME should we send metadata here too? + // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22) + return void getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => { + if (!msg) { return; } + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); + }, (err) => { + let parsedMsg = ['FULL_HISTORY_END', parsed[1]]; + if (err) { + Log.error('HK_GET_FULL_HISTORY', err.stack); + parsedMsg = ['ERROR', parsed[1], err.message]; + } + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); + }); + }; + + const handleRPC = function (ctx, seq, user, parsed) { + if (typeof(rpc) !== 'function') { return; } + + /* RPC Calls... */ + var rpc_call = parsed.slice(1); + + sendMsg(ctx, user, [seq, 'ACK']); + try { // slice off the sequence number and pass in the rest of the message rpc(ctx, rpc_call, function (err, output) { if (err) { @@ -992,10 +969,43 @@ module.exports.create = function (cfg) { // finally, send a response to the client that sent the RPC sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]); }); - } catch (e) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]); - } + } catch (e) { + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]); + } + }; + + /* onDirectMessage + * exported for use by the netflux-server + * parses and handles all direct messages directed to the history keeper + * check if it's expired and execute all the associated side-effects + * routes queries to the appropriate handlers + */ + const onDirectMessage = function (ctx, seq, user, json) { + Log.silly('HK_MESSAGE', json); + + let parsed; + try { + parsed = JSON.parse(json[2]); + } catch (err) { + Log.error("HK_PARSE_CLIENT_MESSAGE", json); + return; + } + + // If the requested history is for an expired channel, abort + // Note the if we don't have the keys for that channel in metadata_cache, we'll + // have to abort later (once we know the expiration time) + if (checkExpired(ctx, parsed[1])) { return; } + + if (parsed[0] === 'GET_HISTORY') { + return void handleGetHistory(ctx, seq, user, parsed); + } + if (parsed[0] === 'GET_HISTORY_RANGE') { + return void handleGetHistoryRange(ctx, seq, user, parsed); + } + if (parsed[0] === 'GET_FULL_HISTORY') { + return void handleGetFullHistory(ctx, seq, user, parsed); } + return void handleRPC(ctx, seq, user, parsed); }; return {