finally untangle metadata and index caches

pull/1/head
ansuz 5 years ago
parent 32d769447a
commit 9d8bb43d03

@ -139,16 +139,10 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
next();
const metadata_cache = Env.metadata_cache;
const channel_cache = Env.channel_cache;
// update the cached metadata
metadata_cache[channel] = metadata;
// as well as the metadata that's attached to the index...
// XXX determine if we actually need this...
var index = Util.find(channel_cache, [channel, 'index']);
if (index && typeof(index) === 'object') { index.metadata = metadata; }
// it's easy to check if the channel is restricted
const isRestricted = metadata.restricted;
// and these values will be used in any case

@ -227,7 +227,6 @@ const computeIndex = function (Env, channelName, cb) {
const cpIndex = [];
let messageBuf = [];
let metadata;
let i = 0;
const CB = Util.once(cb);
@ -235,14 +234,9 @@ const computeIndex = function (Env, channelName, cb) {
const offsetByHash = {};
let size = 0;
nThen(function (w) {
getMetadata(Env, channelName, w(function (err, _metadata) {
//if (err) { console.log(err); }
metadata = _metadata;
}));
}).nThen(function (w) {
// iterate over all messages in the channel log
// old channels can contain metadata as the first message of the log
// remember metadata the first time you encounter it
// skip over metadata as that is handled elsewhere
// otherwise index important messages in the log
store.readMessagesBin(channelName, 0, (msgObj, readMore) => {
let msg;
@ -303,7 +297,7 @@ const computeIndex = function (Env, channelName, cb) {
cpIndex: sliceCpIndex(cpIndex, i),
offsetByHash: offsetByHash,
size: size,
metadata: metadata,
//metadata: metadata,
line: i
});
});
@ -613,11 +607,40 @@ const handleRPC = function (Env, Server, seq, userId, parsed) {
}
};
/*
This is called when a user tries to connect to a channel that doesn't exist.
we initialize that channel by writing the metadata supplied by the user to its log.
if the provided metadata has an expire time then we also create a task to expire it.
*/
const handleFirstMessage = function (Env, channelName, metadata) {
Env.store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
if (err) {
// FIXME tell the user that there was a channel error?
return void Env.Log.error('HK_WRITE_METADATA', {
channel: channelName,
error: err,
});
}
});
// write tasks
if(metadata.expire && typeof(metadata.expire) === 'number') {
// the fun part...
// the user has said they want this pad to expire at some point
Env.tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) {
if (err) {
// if there is an error, we don't want to crash the whole server...
// just log it, and if there's a problem you'll be able to fix it
// at a later date with the provided information
Env.Log.error('HK_CREATE_EXPIRE_TASK', err);
Env.Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName]));
}
});
}
};
const handleGetHistory = function (Env, Server, seq, userId, parsed) {
const store = Env.store;
const tasks = Env.tasks;
const metadata_cache = Env.metadata_cache;
const channel_cache = Env.channel_cache;
const HISTORY_KEEPER_ID = Env.id;
const Log = Env.Log;
@ -656,30 +679,33 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
nThen(function (waitFor) {
var w = waitFor();
/* unless this is a young channel, we will serve all messages from an offset
this will not include the channel metadata, so we need to explicitly fetch that.
unfortunately, we can't just serve it blindly, since then young channels will
send the metadata twice, so let's do a quick check of what we're going to serve...
/* fetch the channel's metadata.
use it to check if the channel has expired.
send it to the client if it exists.
*/
getIndex(Env, channelName, waitFor((err, index) => {
/* if there's an error here, it should be encountered
and handled by the next nThen block.
so, let's just fall through...
*/
if (err) { return w(); }
getMetadata(Env, channelName, waitFor(function (err, metadata) {
if (err) {
Env.Log.error('HK_GET_HISTORY_METADATA', {
channel: channelName,
error: err,
});
return void w();
}
if (!metadata || !metadata.channel) { return w(); }
// if there is already a metadata log then use it instead
// of whatever the user supplied
// it's possible that the channel doesn't have metadata
// but in that case there's no point in checking if the channel expired
// or in trying to send metadata, so just skip this block
if (!index || !index.metadata) { return void w(); }
if (!metadata) { return void w(); }
// And then check if the channel is expired. If it is, send the error and abort
// FIXME this is hard to read because 'checkExpired' has side effects
if (checkExpired(Env, Server, channelName)) { return void waitFor.abort(); }
// always send metadata with GET_HISTORY requests
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(index.metadata)], w);
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(metadata)], w);
}));
}).nThen(() => {
let msgCount = 0;
@ -699,45 +725,8 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
return;
}
const chan = channel_cache[channelName];
if (msgCount === 0 && !metadata_cache[channelName] && Server.channelContainsUser(channelName, userId)) {
metadata_cache[channelName] = metadata;
// the index will have already been constructed and cached at this point
// but it will not have detected any metadata because it hasn't been written yet
// this means that the cache starts off as invalid, so we have to correct it
if (chan && chan.index) { chan.index.metadata = metadata; }
// new channels will always have their metadata written to a dedicated metadata log
// but any lines after the first which are not amendments in a particular format will be ignored.
// Thus we should be safe from race conditions here if just write metadata to the log as below...
// TODO validate this logic
// otherwise maybe we need to check that the metadata log is empty as well
store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
if (err) {
// FIXME tell the user that there was a channel error?
return void Log.error('HK_WRITE_METADATA', {
channel: channelName,
error: err,
});
}
});
// write tasks
if(metadata.expire && typeof(metadata.expire) === 'number') {
// the fun part...
// the user has said they want this pad to expire at some point
tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) {
if (err) {
// if there is an error, we don't want to crash the whole server...
// just log it, and if there's a problem you'll be able to fix it
// at a later date with the provided information
Log.error('HK_CREATE_EXPIRE_TASK', err);
Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName]));
}
});
}
handleFirstMessage(Env, channelName, metadata);
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(metadata)]);
}
@ -859,7 +848,7 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) {
// to stop people from loading history they shouldn't see.
var channelName = parsed[1];
nThen(function (w) {
HK.getMetadata(Env, channelName, w(function (err, metadata) {
getMetadata(Env, channelName, w(function (err, metadata) {
if (err) {
// stream errors?
// we should log these, but if we can't load metadata
@ -955,24 +944,16 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
let metadata;
nThen(function (w) {
// getIndex (and therefore the latest metadata)
getIndex(Env, channel.id, w(function (err, index) {
if (err) {
w.abort();
return void Log.error('CHANNEL_MESSAGE_ERROR', err);
}
if (!index.metadata) {
getMetadata(Env, channel.id, w(function (err, _metadata) {
// if there's no channel metadata then it can't be an expiring channel
// nor can we possibly validate it
return;
}
metadata = index.metadata;
if (!_metadata) { return; }
metadata = _metadata;
// don't write messages to expired channels
if (checkExpired(Env, Server, channel)) { return void w.abort(); }
}));
}).nThen(function (w) {
// if there's no validateKey present skip to the next block
if (!metadata.validateKey) { return; }
@ -994,7 +975,6 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
return;
}
}));
}).nThen(function () {
// do checkpoint stuff...

Loading…
Cancel
Save