From 9d8bb43d039b288f5d3a5006d310f0ceeac4241e Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 4 Mar 2020 12:56:33 -0500 Subject: [PATCH] finally untangle metadata and index caches --- lib/commands/metadata.js | 6 -- lib/hk-util.js | 180 +++++++++++++++++---------------------- 2 files changed, 80 insertions(+), 106 deletions(-) diff --git a/lib/commands/metadata.js b/lib/commands/metadata.js index a5bca0dca..b45bddf52 100644 --- a/lib/commands/metadata.js +++ b/lib/commands/metadata.js @@ -139,16 +139,10 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) { next(); const metadata_cache = Env.metadata_cache; - const channel_cache = Env.channel_cache; // update the cached metadata metadata_cache[channel] = metadata; - // as well as the metadata that's attached to the index... - // XXX determine if we actually need this... - var index = Util.find(channel_cache, [channel, 'index']); - if (index && typeof(index) === 'object') { index.metadata = metadata; } - // it's easy to check if the channel is restricted const isRestricted = metadata.restricted; // and these values will be used in any case diff --git a/lib/hk-util.js b/lib/hk-util.js index 810077c1c..7545c2f56 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -227,7 +227,6 @@ const computeIndex = function (Env, channelName, cb) { const cpIndex = []; let messageBuf = []; - let metadata; let i = 0; const CB = Util.once(cb); @@ -235,14 +234,9 @@ const computeIndex = function (Env, channelName, cb) { const offsetByHash = {}; let size = 0; nThen(function (w) { - getMetadata(Env, channelName, w(function (err, _metadata) { - //if (err) { console.log(err); } - metadata = _metadata; - })); - }).nThen(function (w) { // iterate over all messages in the channel log // old channels can contain metadata as the first message of the log - // remember metadata the first time you encounter it + // skip over metadata as that is handled elsewhere // otherwise index important messages in the log store.readMessagesBin(channelName, 0, (msgObj, readMore) => { let msg; @@ -303,7 +297,7 @@ const computeIndex = function (Env, channelName, cb) { cpIndex: sliceCpIndex(cpIndex, i), offsetByHash: offsetByHash, size: size, - metadata: metadata, + //metadata: metadata, line: i }); }); @@ -613,11 +607,40 @@ const handleRPC = function (Env, Server, seq, userId, parsed) { } }; +/* + This is called when a user tries to connect to a channel that doesn't exist. + we initialize that channel by writing the metadata supplied by the user to its log. + if the provided metadata has an expire time then we also create a task to expire it. +*/ +const handleFirstMessage = function (Env, channelName, metadata) { + Env.store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { + if (err) { + // FIXME tell the user that there was a channel error? + return void Env.Log.error('HK_WRITE_METADATA', { + channel: channelName, + error: err, + }); + } + }); + + // write tasks + if(metadata.expire && typeof(metadata.expire) === 'number') { + // the fun part... + // the user has said they want this pad to expire at some point + Env.tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) { + if (err) { + // if there is an error, we don't want to crash the whole server... + // just log it, and if there's a problem you'll be able to fix it + // at a later date with the provided information + Env.Log.error('HK_CREATE_EXPIRE_TASK', err); + Env.Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName])); + } + }); + } +}; + const handleGetHistory = function (Env, Server, seq, userId, parsed) { - const store = Env.store; - const tasks = Env.tasks; const metadata_cache = Env.metadata_cache; - const channel_cache = Env.channel_cache; const HISTORY_KEEPER_ID = Env.id; const Log = Env.Log; @@ -656,30 +679,33 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) { nThen(function (waitFor) { var w = waitFor(); - - /* unless this is a young channel, we will serve all messages from an offset - this will not include the channel metadata, so we need to explicitly fetch that. - unfortunately, we can't just serve it blindly, since then young channels will - send the metadata twice, so let's do a quick check of what we're going to serve... + /* fetch the channel's metadata. + use it to check if the channel has expired. + send it to the client if it exists. */ - getIndex(Env, channelName, waitFor((err, index) => { - /* if there's an error here, it should be encountered - and handled by the next nThen block. - so, let's just fall through... - */ - if (err) { return w(); } - + getMetadata(Env, channelName, waitFor(function (err, metadata) { + if (err) { + Env.Log.error('HK_GET_HISTORY_METADATA', { + channel: channelName, + error: err, + }); + return void w(); + } + if (!metadata || !metadata.channel) { return w(); } + // if there is already a metadata log then use it instead + // of whatever the user supplied // it's possible that the channel doesn't have metadata // but in that case there's no point in checking if the channel expired // or in trying to send metadata, so just skip this block - if (!index || !index.metadata) { return void w(); } + if (!metadata) { return void w(); } + // And then check if the channel is expired. If it is, send the error and abort // FIXME this is hard to read because 'checkExpired' has side effects if (checkExpired(Env, Server, channelName)) { return void waitFor.abort(); } // always send metadata with GET_HISTORY requests - Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(index.metadata)], w); + Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(metadata)], w); })); }).nThen(() => { let msgCount = 0; @@ -699,45 +725,8 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) { return; } - const chan = channel_cache[channelName]; - if (msgCount === 0 && !metadata_cache[channelName] && Server.channelContainsUser(channelName, userId)) { - metadata_cache[channelName] = metadata; - - // the index will have already been constructed and cached at this point - // but it will not have detected any metadata because it hasn't been written yet - // this means that the cache starts off as invalid, so we have to correct it - if (chan && chan.index) { chan.index.metadata = metadata; } - - // new channels will always have their metadata written to a dedicated metadata log - // but any lines after the first which are not amendments in a particular format will be ignored. - // Thus we should be safe from race conditions here if just write metadata to the log as below... - // TODO validate this logic - // otherwise maybe we need to check that the metadata log is empty as well - store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { - if (err) { - // FIXME tell the user that there was a channel error? - return void Log.error('HK_WRITE_METADATA', { - channel: channelName, - error: err, - }); - } - }); - - // write tasks - if(metadata.expire && typeof(metadata.expire) === 'number') { - // the fun part... - // the user has said they want this pad to expire at some point - tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) { - if (err) { - // if there is an error, we don't want to crash the whole server... - // just log it, and if there's a problem you'll be able to fix it - // at a later date with the provided information - Log.error('HK_CREATE_EXPIRE_TASK', err); - Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName])); - } - }); - } + handleFirstMessage(Env, channelName, metadata); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(metadata)]); } @@ -859,7 +848,7 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { // to stop people from loading history they shouldn't see. var channelName = parsed[1]; nThen(function (w) { - HK.getMetadata(Env, channelName, w(function (err, metadata) { + getMetadata(Env, channelName, w(function (err, metadata) { if (err) { // stream errors? // we should log these, but if we can't load metadata @@ -955,46 +944,37 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { let metadata; nThen(function (w) { - // getIndex (and therefore the latest metadata) - getIndex(Env, channel.id, w(function (err, index) { - if (err) { - w.abort(); - return void Log.error('CHANNEL_MESSAGE_ERROR', err); - } - - if (!index.metadata) { - // if there's no channel metadata then it can't be an expiring channel - // nor can we possibly validate it - return; - } - - metadata = index.metadata; + getMetadata(Env, channel.id, w(function (err, _metadata) { + // if there's no channel metadata then it can't be an expiring channel + // nor can we possibly validate it + if (!_metadata) { return; } + metadata = _metadata; // don't write messages to expired channels if (checkExpired(Env, Server, channel)) { return void w.abort(); } - - // if there's no validateKey present skip to the next block - if (!metadata.validateKey) { return; } - - // trim the checkpoint indicator off the message if it's present - let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; - // convert the message from a base64 string into a Uint8Array - - // FIXME this can fail and the client won't notice - signedMsg = Nacl.util.decodeBase64(signedMsg); - - // FIXME this can blow up - // TODO check that that won't cause any problems other than not being able to append... - const validateKey = Nacl.util.decodeBase64(metadata.validateKey); - // validate the message - const validated = Nacl.sign.open(signedMsg, validateKey); - if (!validated) { - // don't go any further if the message fails validation - w.abort(); - Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); - return; - } })); + }).nThen(function (w) { + // if there's no validateKey present skip to the next block + if (!metadata.validateKey) { return; } + + // trim the checkpoint indicator off the message if it's present + let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4]; + // convert the message from a base64 string into a Uint8Array + + // FIXME this can fail and the client won't notice + signedMsg = Nacl.util.decodeBase64(signedMsg); + + // FIXME this can blow up + // TODO check that that won't cause any problems other than not being able to append... + const validateKey = Nacl.util.decodeBase64(metadata.validateKey); + // validate the message + const validated = Nacl.sign.open(signedMsg, validateKey); + if (!validated) { + // don't go any further if the message fails validation + w.abort(); + Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); + return; + } }).nThen(function () { // do checkpoint stuff...