|
|
@ -58,7 +58,7 @@ module.exports.create = function (cfg) {
|
|
|
|
|
|
|
|
|
|
|
|
Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
|
|
|
|
Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
|
|
|
|
|
|
|
|
|
|
|
|
const historyKeeperKeys = {};
|
|
|
|
const metadata_cache = {};
|
|
|
|
const HISTORY_KEEPER_ID = Crypto.randomBytes(8).toString('hex');
|
|
|
|
const HISTORY_KEEPER_ID = Crypto.randomBytes(8).toString('hex');
|
|
|
|
|
|
|
|
|
|
|
|
Log.verbose('HK_ID', 'History keeper ID: ' + HISTORY_KEEPER_ID);
|
|
|
|
Log.verbose('HK_ID', 'History keeper ID: ' + HISTORY_KEEPER_ID);
|
|
|
@ -93,7 +93,6 @@ module.exports.create = function (cfg) {
|
|
|
|
const computeIndex = function (channelName, cb) {
|
|
|
|
const computeIndex = function (channelName, cb) {
|
|
|
|
const cpIndex = [];
|
|
|
|
const cpIndex = [];
|
|
|
|
let messageBuf = [];
|
|
|
|
let messageBuf = [];
|
|
|
|
let validateKey;
|
|
|
|
|
|
|
|
let metadata; // FIXME METADATA READ
|
|
|
|
let metadata; // FIXME METADATA READ
|
|
|
|
let i = 0;
|
|
|
|
let i = 0;
|
|
|
|
|
|
|
|
|
|
|
@ -107,12 +106,11 @@ module.exports.create = function (cfg) {
|
|
|
|
store.readMessagesBin(channelName, 0, (msgObj, rmcb) => {
|
|
|
|
store.readMessagesBin(channelName, 0, (msgObj, rmcb) => {
|
|
|
|
let msg;
|
|
|
|
let msg;
|
|
|
|
i++;
|
|
|
|
i++;
|
|
|
|
if (!validateKey && msgObj.buff.indexOf('validateKey') > -1) {
|
|
|
|
if (!metadata && msgObj.buff.indexOf('validateKey') > -1) {
|
|
|
|
metadata = msg = tryParse(msgObj.buff.toString('utf8')); // FIXME METADATA READ
|
|
|
|
metadata = msg = tryParse(msgObj.buff.toString('utf8')); // FIXME METADATA READ
|
|
|
|
if (typeof msg === "undefined") { return rmcb(); }
|
|
|
|
if (typeof msg === "undefined") { return rmcb(); }
|
|
|
|
if (msg.validateKey) {
|
|
|
|
if (msg.validateKey) {
|
|
|
|
// XXX this variable name is very misleading as it's actually the metadata, not the validateKey
|
|
|
|
metadata_cache[channelName] = msg;
|
|
|
|
validateKey = historyKeeperKeys[channelName] = msg;
|
|
|
|
|
|
|
|
return rmcb();
|
|
|
|
return rmcb();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -278,8 +276,8 @@ module.exports.create = function (cfg) {
|
|
|
|
|
|
|
|
|
|
|
|
const isCp = /^cp\|/.test(msgStruct[4]);
|
|
|
|
const isCp = /^cp\|/.test(msgStruct[4]);
|
|
|
|
// FIXME METADATA
|
|
|
|
// FIXME METADATA
|
|
|
|
if (historyKeeperKeys[channel.id] && historyKeeperKeys[channel.id].expire &&
|
|
|
|
if (metadata_cache[channel.id] && metadata_cache[channel.id].expire &&
|
|
|
|
historyKeeperKeys[channel.id].expire < +new Date()) {
|
|
|
|
metadata_cache[channel.id].expire < +new Date()) {
|
|
|
|
return; // Don't store messages on expired channel
|
|
|
|
return; // Don't store messages on expired channel
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let id;
|
|
|
|
let id;
|
|
|
@ -291,12 +289,12 @@ module.exports.create = function (cfg) {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (historyKeeperKeys[channel.id] && historyKeeperKeys[channel.id].validateKey) {
|
|
|
|
if (metadata_cache[channel.id] && metadata_cache[channel.id].validateKey) {
|
|
|
|
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
|
|
|
|
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
|
|
|
|
let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
|
|
|
|
let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
|
|
|
|
signedMsg = Nacl.util.decodeBase64(signedMsg);
|
|
|
|
signedMsg = Nacl.util.decodeBase64(signedMsg);
|
|
|
|
// FIXME performance: cache the decoded key instead of decoding it every time
|
|
|
|
// FIXME performance: cache the decoded key instead of decoding it every time
|
|
|
|
const validateKey = Nacl.util.decodeBase64(historyKeeperKeys[channel.id].validateKey);
|
|
|
|
const validateKey = Nacl.util.decodeBase64(metadata_cache[channel.id].validateKey);
|
|
|
|
const validated = Nacl.sign.open(signedMsg, validateKey);
|
|
|
|
const validated = Nacl.sign.open(signedMsg, validateKey);
|
|
|
|
if (!validated) {
|
|
|
|
if (!validated) {
|
|
|
|
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
|
|
|
|
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
|
|
|
@ -324,7 +322,7 @@ module.exports.create = function (cfg) {
|
|
|
|
* the netflux server manages other memory in ctx.channels
|
|
|
|
* the netflux server manages other memory in ctx.channels
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
const dropChannel = function (chanName) {
|
|
|
|
const dropChannel = function (chanName) {
|
|
|
|
delete historyKeeperKeys[chanName];
|
|
|
|
delete metadata_cache[chanName];
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
/* getHistoryOffset
|
|
|
|
/* getHistoryOffset
|
|
|
@ -458,7 +456,7 @@ module.exports.create = function (cfg) {
|
|
|
|
if (typeof parsed === "undefined") { return; }
|
|
|
|
if (typeof parsed === "undefined") { return; }
|
|
|
|
|
|
|
|
|
|
|
|
if (parsed.validateKey) {
|
|
|
|
if (parsed.validateKey) {
|
|
|
|
historyKeeperKeys[channelName] = parsed;
|
|
|
|
metadata_cache[channelName] = parsed;
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -517,7 +515,7 @@ module.exports.create = function (cfg) {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
delete ctx.channels[channel];
|
|
|
|
delete ctx.channels[channel];
|
|
|
|
delete historyKeeperKeys[channel];
|
|
|
|
delete metadata_cache[channel];
|
|
|
|
};
|
|
|
|
};
|
|
|
|
// Check if the selected channel is expired
|
|
|
|
// Check if the selected channel is expired
|
|
|
|
// If it is, remove it from memory and broadcast a message to its members
|
|
|
|
// If it is, remove it from memory and broadcast a message to its members
|
|
|
@ -534,8 +532,8 @@ module.exports.create = function (cfg) {
|
|
|
|
FIXME the boolean nature of this API should be separated from its side effects
|
|
|
|
FIXME the boolean nature of this API should be separated from its side effects
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
const checkExpired = function (ctx, channel) {
|
|
|
|
const checkExpired = function (ctx, channel) {
|
|
|
|
if (channel && channel.length === STANDARD_CHANNEL_LENGTH && historyKeeperKeys[channel] &&
|
|
|
|
if (channel && channel.length === STANDARD_CHANNEL_LENGTH && metadata_cache[channel] &&
|
|
|
|
historyKeeperKeys[channel].expire && historyKeeperKeys[channel].expire < +new Date()) {
|
|
|
|
metadata_cache[channel].expire && metadata_cache[channel].expire < +new Date()) {
|
|
|
|
store.closeChannel(channel, function () {
|
|
|
|
store.closeChannel(channel, function () {
|
|
|
|
historyKeeperBroadcast(ctx, channel, {
|
|
|
|
historyKeeperBroadcast(ctx, channel, {
|
|
|
|
error: 'EEXPIRED',
|
|
|
|
error: 'EEXPIRED',
|
|
|
@ -543,7 +541,7 @@ module.exports.create = function (cfg) {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
delete ctx.channels[channel];
|
|
|
|
delete ctx.channels[channel];
|
|
|
|
delete historyKeeperKeys[channel];
|
|
|
|
delete metadata_cache[channel];
|
|
|
|
return true;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
return;
|
|
|
@ -577,7 +575,7 @@ module.exports.create = function (cfg) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// If the requested history is for an expired channel, abort
|
|
|
|
// If the requested history is for an expired channel, abort
|
|
|
|
// Note the if we don't have the keys for that channel in historyKeeperKeys, we'll
|
|
|
|
// Note the if we don't have the keys for that channel in metadata_cache, we'll
|
|
|
|
// have to abort later (once we know the expiration time)
|
|
|
|
// have to abort later (once we know the expiration time)
|
|
|
|
if (checkExpired(ctx, parsed[1])) { return; }
|
|
|
|
if (checkExpired(ctx, parsed[1])) { return; }
|
|
|
|
|
|
|
|
|
|
|
@ -631,8 +629,8 @@ module.exports.create = function (cfg) {
|
|
|
|
if (err) { return w(); }
|
|
|
|
if (err) { return w(); }
|
|
|
|
if (!index || !index.metadata) { return void w(); } // FIXME METADATA READ
|
|
|
|
if (!index || !index.metadata) { return void w(); } // FIXME METADATA READ
|
|
|
|
// Store the metadata if we don't have it in memory
|
|
|
|
// Store the metadata if we don't have it in memory
|
|
|
|
if (!historyKeeperKeys[channelName]) {
|
|
|
|
if (!metadata_cache[channelName]) {
|
|
|
|
historyKeeperKeys[channelName] = index.metadata; // FIXME METADATA STORE
|
|
|
|
metadata_cache[channelName] = index.metadata; // FIXME METADATA STORE
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// And then check if the channel is expired. If it is, send the error and abort
|
|
|
|
// And then check if the channel is expired. If it is, send the error and abort
|
|
|
|
if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
|
|
|
|
if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
|
|
|
@ -651,7 +649,7 @@ module.exports.create = function (cfg) {
|
|
|
|
if (msg.validateKey) {
|
|
|
|
if (msg.validateKey) {
|
|
|
|
// If it is a young channel, this is the part where we get the metadata
|
|
|
|
// If it is a young channel, this is the part where we get the metadata
|
|
|
|
// Check if the channel is expired and abort if it is.
|
|
|
|
// Check if the channel is expired and abort if it is.
|
|
|
|
if (!historyKeeperKeys[channelName]) { historyKeeperKeys[channelName] = msg; }
|
|
|
|
if (!metadata_cache[channelName]) { metadata_cache[channelName] = msg; }
|
|
|
|
expired = checkExpired(ctx, channelName);
|
|
|
|
expired = checkExpired(ctx, channelName);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (expired) { return void cb(); }
|
|
|
|
if (expired) { return void cb(); }
|
|
|
@ -675,7 +673,7 @@ module.exports.create = function (cfg) {
|
|
|
|
const chan = ctx.channels[channelName];
|
|
|
|
const chan = ctx.channels[channelName];
|
|
|
|
|
|
|
|
|
|
|
|
// XXX check metadata message count too...
|
|
|
|
// XXX check metadata message count too...
|
|
|
|
if (msgCount === 0 && !historyKeeperKeys[channelName] && chan && chan.indexOf(user) > -1) {
|
|
|
|
if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) {
|
|
|
|
var metadata = {};
|
|
|
|
var metadata = {};
|
|
|
|
metadata.channel = channelName;
|
|
|
|
metadata.channel = channelName;
|
|
|
|
if (validateKey) {
|
|
|
|
if (validateKey) {
|
|
|
@ -687,7 +685,7 @@ module.exports.create = function (cfg) {
|
|
|
|
if (expire) {
|
|
|
|
if (expire) {
|
|
|
|
metadata.expire = expire;
|
|
|
|
metadata.expire = expire;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
historyKeeperKeys[channelName] = metadata;
|
|
|
|
metadata_cache[channelName] = metadata;
|
|
|
|
storeMessage(ctx, chan, JSON.stringify(metadata), false, undefined); // FIXME METADATA WRITE
|
|
|
|
storeMessage(ctx, chan, JSON.stringify(metadata), false, undefined); // FIXME METADATA WRITE
|
|
|
|
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); // FIXME METADATA SEND
|
|
|
|
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); // FIXME METADATA SEND
|
|
|
|
}
|
|
|
|
}
|
|
|
|