|
|
|
@ -4,7 +4,7 @@ var HK = module.exports;
|
|
|
|
|
|
|
|
|
|
const nThen = require('nthen');
|
|
|
|
|
const Util = require("./common-util");
|
|
|
|
|
const Meta = require("./metadata");
|
|
|
|
|
const MetaRPC = require("./commands/metadata");
|
|
|
|
|
const Nacl = require('tweetnacl/nacl-fast');
|
|
|
|
|
|
|
|
|
|
const now = function () { return (new Date()).getTime(); };
|
|
|
|
@ -71,10 +71,37 @@ const sliceCpIndex = function (cpIndex, line) {
|
|
|
|
|
return start.concat(end);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const isMetadataMessage = function (parsed) {
|
|
|
|
|
const isMetadataMessage = HK.isMetadataMessage = function (parsed) {
|
|
|
|
|
return Boolean(parsed && parsed.channel);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
HK.listAllowedUsers = function (metadata) {
|
|
|
|
|
return (metadata.owners || []).concat((metadata.allowed || []));
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
HK.getNetfluxSession = function (Env, netfluxId) {
|
|
|
|
|
return Env.netfluxUsers[netfluxId];
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
HK.isUserSessionAllowed = function (allowed, session) {
|
|
|
|
|
if (!session) { return false; }
|
|
|
|
|
for (var unsafeKey in session) {
|
|
|
|
|
if (allowed.indexOf(unsafeKey) !== -1) {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
HK.authenticateNetfluxSession = function (Env, netfluxId, unsafeKey) {
|
|
|
|
|
var user = Env.netfluxUsers[netfluxId] = Env.netfluxUsers[netfluxId] || {};
|
|
|
|
|
user[unsafeKey] = +new Date();
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
HK.closeNetfluxSession = function (Env, netfluxId) {
|
|
|
|
|
delete Env.netfluxUsers[netfluxId];
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// validateKeyStrings supplied by clients must decode to 32-byte Uint8Arrays
|
|
|
|
|
const isValidValidateKeyString = function (key) {
|
|
|
|
|
try {
|
|
|
|
@ -151,6 +178,29 @@ const checkExpired = function (Env, Server, channel) {
|
|
|
|
|
return true;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const getMetadata = HK.getMetadata = function (Env, channelName, _cb) {
|
|
|
|
|
var cb = Util.once(Util.mkAsync(_cb));
|
|
|
|
|
|
|
|
|
|
var metadata = Env.metadata_cache[channelName];
|
|
|
|
|
if (metadata && typeof(metadata) === 'object') {
|
|
|
|
|
return void cb(undefined, metadata);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MetaRPC.getMetadataRaw(Env, channelName, function (err, metadata) {
|
|
|
|
|
if (err) {
|
|
|
|
|
console.error(err);
|
|
|
|
|
return void cb(err);
|
|
|
|
|
}
|
|
|
|
|
if (!(metadata && typeof(metadata.channel) === 'string' && metadata.channel.length === STANDARD_CHANNEL_LENGTH)) {
|
|
|
|
|
return cb();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cache it
|
|
|
|
|
Env.metadata_cache[channelName] = metadata;
|
|
|
|
|
cb(undefined, metadata);
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* computeIndex
|
|
|
|
|
can call back with an error or a computed index which includes:
|
|
|
|
|
* cpIndex:
|
|
|
|
@ -180,13 +230,19 @@ const computeIndex = function (Env, channelName, cb) {
|
|
|
|
|
let metadata;
|
|
|
|
|
let i = 0;
|
|
|
|
|
|
|
|
|
|
const ref = {};
|
|
|
|
|
|
|
|
|
|
const CB = Util.once(cb);
|
|
|
|
|
|
|
|
|
|
const offsetByHash = {};
|
|
|
|
|
let size = 0;
|
|
|
|
|
nThen(function (w) {
|
|
|
|
|
getMetadata(Env, channelName, w(function (err, _metadata) {
|
|
|
|
|
if (err) {
|
|
|
|
|
console.log(err);
|
|
|
|
|
throw new Error(err); // XXX
|
|
|
|
|
}
|
|
|
|
|
metadata = _metadata;
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function (w) {
|
|
|
|
|
// iterate over all messages in the channel log
|
|
|
|
|
// old channels can contain metadata as the first message of the log
|
|
|
|
|
// remember metadata the first time you encounter it
|
|
|
|
@ -195,14 +251,15 @@ const computeIndex = function (Env, channelName, cb) {
|
|
|
|
|
let msg;
|
|
|
|
|
// keep an eye out for the metadata line if you haven't already seen it
|
|
|
|
|
// but only check for metadata on the first line
|
|
|
|
|
if (!i && !metadata && msgObj.buff.indexOf('{') === 0) {
|
|
|
|
|
if (!i && msgObj.buff.indexOf('{') === 0) { // XXX RESTRICT metadata...
|
|
|
|
|
i++; // always increment the message counter
|
|
|
|
|
msg = tryParse(Env, msgObj.buff.toString('utf8'));
|
|
|
|
|
if (typeof msg === "undefined") { return readMore(); }
|
|
|
|
|
|
|
|
|
|
// validate that the current line really is metadata before storing it as such
|
|
|
|
|
if (isMetadataMessage(msg)) {
|
|
|
|
|
metadata = msg;
|
|
|
|
|
if (isMetadataMessage(msg)) { // XXX RESTRICT
|
|
|
|
|
//metadata = msg; // XXX RESTRICT
|
|
|
|
|
// skip this, as you already have metadata...
|
|
|
|
|
return readMore();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -245,26 +302,8 @@ const computeIndex = function (Env, channelName, cb) {
|
|
|
|
|
size = msgObj.offset + msgObj.buff.length + 1;
|
|
|
|
|
});
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function (w) {
|
|
|
|
|
// create a function which will iterate over amendments to the metadata
|
|
|
|
|
const handler = Meta.createLineHandler(ref, Log.error);
|
|
|
|
|
|
|
|
|
|
// initialize the accumulator in case there was a foundational metadata line in the log content
|
|
|
|
|
if (metadata) { handler(void 0, metadata); }
|
|
|
|
|
|
|
|
|
|
// iterate over the dedicated metadata log (if it exists)
|
|
|
|
|
// proceed even in the event of a stream error on the metadata log
|
|
|
|
|
store.readDedicatedMetadata(channelName, handler, w(function (err) {
|
|
|
|
|
if (err) {
|
|
|
|
|
return void Log.error("DEDICATED_METADATA_ERROR", err);
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
// when all is done, cache the metadata in memory
|
|
|
|
|
if (ref.index) { // but don't bother if no metadata was found...
|
|
|
|
|
metadata = Env.metadata_cache[channelName] = ref.meta;
|
|
|
|
|
}
|
|
|
|
|
// and return the computed index
|
|
|
|
|
// return the computed index
|
|
|
|
|
CB(null, {
|
|
|
|
|
// Only keep the checkpoints included in the last 100 messages
|
|
|
|
|
cpIndex: sliceCpIndex(cpIndex, i),
|
|
|
|
@ -293,9 +332,7 @@ const getIndex = (Env, channelName, cb) => {
|
|
|
|
|
// if there is a channel in memory and it has an index cached, return it
|
|
|
|
|
if (chan && chan.index) {
|
|
|
|
|
// enforce async behaviour
|
|
|
|
|
return void setTimeout(function () {
|
|
|
|
|
cb(undefined, chan.index);
|
|
|
|
|
});
|
|
|
|
|
return void Util.mkAsync(cb)(undefined, chan.index);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Env.batchIndexReads(channelName, cb, function (done) {
|
|
|
|
@ -569,7 +606,7 @@ const handleRPC = function (Env, Server, seq, userId, parsed) {
|
|
|
|
|
Server.send(userId, [seq, 'ACK']);
|
|
|
|
|
try {
|
|
|
|
|
// slice off the sequence number and pass in the rest of the message
|
|
|
|
|
Env.rpc(Server, rpc_call, function (err, output) {
|
|
|
|
|
Env.rpc(Server, userId, rpc_call, function (err, output) {
|
|
|
|
|
if (err) {
|
|
|
|
|
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]);
|
|
|
|
|
return;
|
|
|
|
@ -646,6 +683,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
|
|
|
|
|
// And then check if the channel is expired. If it is, send the error and abort
|
|
|
|
|
// FIXME this is hard to read because 'checkExpired' has side effects
|
|
|
|
|
if (checkExpired(Env, Server, channelName)) { return void waitFor.abort(); }
|
|
|
|
|
|
|
|
|
|
// always send metadata with GET_HISTORY requests
|
|
|
|
|
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(index.metadata)], w);
|
|
|
|
|
}));
|
|
|
|
@ -662,7 +700,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
|
|
|
|
|
}, (err) => {
|
|
|
|
|
if (err && err.code !== 'ENOENT') {
|
|
|
|
|
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
|
|
|
|
|
const parsedMsg = {error:err.message, channel: channelName, txid: txid};
|
|
|
|
|
const parsedMsg = {error:err.message, channel: channelName, txid: txid}; // XXX history retrieval error format
|
|
|
|
|
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -789,9 +827,9 @@ const handleGetFullHistory = function (Env, Server, seq, userId, parsed) {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const directMessageCommands = {
|
|
|
|
|
GET_HISTORY: handleGetHistory,
|
|
|
|
|
GET_HISTORY_RANGE: handleGetHistoryRange,
|
|
|
|
|
GET_FULL_HISTORY: handleGetFullHistory,
|
|
|
|
|
GET_HISTORY: handleGetHistory, // XXX RESTRICT
|
|
|
|
|
GET_HISTORY_RANGE: handleGetHistoryRange, // XXX RESTRICT
|
|
|
|
|
GET_FULL_HISTORY: handleGetFullHistory, // XXX RESTRICT
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* onDirectMessage
|
|
|
|
@ -812,16 +850,63 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If the requested history is for an expired channel, abort
|
|
|
|
|
// Note the if we don't have the keys for that channel in metadata_cache, we'll
|
|
|
|
|
// have to abort later (once we know the expiration time)
|
|
|
|
|
if (checkExpired(Env, Server, parsed[1])) { return; }
|
|
|
|
|
var first = parsed[0];
|
|
|
|
|
|
|
|
|
|
// look up the appropriate command in the map of commands or fall back to RPC
|
|
|
|
|
var command = directMessageCommands[parsed[0]] || handleRPC;
|
|
|
|
|
if (typeof(directMessageCommands[first]) !== 'function') {
|
|
|
|
|
// it's either an unsupported command or an RPC call
|
|
|
|
|
// either way, RPC has it covered
|
|
|
|
|
return void handleRPC(Env, Server, seq, userId, parsed);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// otherwise it's some kind of history retrieval command...
|
|
|
|
|
// go grab its metadata, because unfortunately people can ask for history
|
|
|
|
|
// whether or not they have joined the channel, so we can't rely on JOIN restriction
|
|
|
|
|
// to stop people from loading history they shouldn't see.
|
|
|
|
|
var channelName = parsed[1];
|
|
|
|
|
nThen(function (w) {
|
|
|
|
|
HK.getMetadata(Env, channelName, w(function (err, metadata) {
|
|
|
|
|
if (err) {
|
|
|
|
|
// stream errors?
|
|
|
|
|
// we should log these, but if we can't load metadata
|
|
|
|
|
// then it's probably not restricted or expired
|
|
|
|
|
// it's not like anything else will recover from this anyway
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// run the command with the standard function signature
|
|
|
|
|
command(Env, Server, seq, userId, parsed);
|
|
|
|
|
|
|
|
|
|
// likewise, we can't do anything more here if there's no metadata
|
|
|
|
|
// jump to the next block
|
|
|
|
|
if (!metadata) { return; }
|
|
|
|
|
|
|
|
|
|
// If the requested history is for an expired channel, abort
|
|
|
|
|
// checkExpired has side effects and will disconnect users for you...
|
|
|
|
|
if (checkExpired(Env, Server, parsed[1])) {
|
|
|
|
|
// if the channel is expired just abort.
|
|
|
|
|
w.abort();
|
|
|
|
|
// XXX what do we tell the person who asked?
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// jump to handling the command if there's no restriction...
|
|
|
|
|
if (!metadata.restricted) { return; }
|
|
|
|
|
|
|
|
|
|
// check if the user is in the allow list...
|
|
|
|
|
const allowed = HK.listAllowedUsers(metadata);
|
|
|
|
|
const session = HK.getNetfluxSession(Env, userId);
|
|
|
|
|
|
|
|
|
|
if (HK.isUserSessionAllowed(allowed, session)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// XXX NOT ALLOWED
|
|
|
|
|
// respond to txid with error as in handleGetHistory
|
|
|
|
|
// send the allow list anyway, it might not get used currently
|
|
|
|
|
// but will in the future
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
// run the appropriate command from the map
|
|
|
|
|
directMessageCommands[first](Env, Server, seq, userId, parsed);
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/* onChannelMessage
|
|
|
|
|