fix a WRITE_PRIVATE_MESSAGE rpc regression

pull/1/head
ansuz 5 years ago
parent 4e71b63331
commit 6523974ca2

@ -180,9 +180,9 @@ Channel.writePrivateMessage = function (Env, args, Server, cb) {
msg // the actual message content. Generally a string msg // the actual message content. Generally a string
]; ];
// XXX this API doesn't exist anymore... // historyKeeper already knows how to handle metadata and message validation, so we just pass it off here
// store the message and do everything else that is typically done when going through historyKeeper // if the message isn't valid it won't be stored.
Env.historyKeeper.onChannelMessage(Server, channelStruct, fullMessage); Env.historyKeeper.channelMessage(Server, channelStruct, fullMessage);
// call back with the message and the target channel. // call back with the message and the target channel.
// historyKeeper will take care of broadcasting it if anyone is in the channel // historyKeeper will take care of broadcasting it if anyone is in the channel

@ -685,12 +685,12 @@ module.exports.create = function (cfg, cb) {
Server.channelBroadcast(channel, metadata, HISTORY_KEEPER_ID); Server.channelBroadcast(channel, metadata, HISTORY_KEEPER_ID);
}; };
const handleGetHistory = function (Server, seq, user, parsed) { const handleGetHistory = function (Server, seq, userId, parsed) {
// parsed[1] is the channel id // parsed[1] is the channel id
// parsed[2] is a validation key or an object containing metadata (optionnal) // parsed[2] is a validation key or an object containing metadata (optionnal)
// parsed[3] is the last known hash (optionnal) // parsed[3] is the last known hash (optionnal)
Server.send(user.id, [seq, 'ACK']); Server.send(userId, [seq, 'ACK']);
var channelName = parsed[1]; var channelName = parsed[1];
var config = parsed[2]; var config = parsed[2];
var metadata = {}; var metadata = {};
@ -741,7 +741,7 @@ module.exports.create = function (cfg, cb) {
// FIXME this is hard to read because 'checkExpired' has side effects // FIXME this is hard to read because 'checkExpired' has side effects
if (checkExpired(Server, channelName)) { return void waitFor.abort(); } if (checkExpired(Server, channelName)) { return void waitFor.abort(); }
// always send metadata with GET_HISTORY requests // always send metadata with GET_HISTORY requests
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(index.metadata)], w);
})); }));
}).nThen(() => { }).nThen(() => {
let msgCount = 0; let msgCount = 0;
@ -752,18 +752,18 @@ module.exports.create = function (cfg, cb) {
msgCount++; msgCount++;
// avoid sending the metadata message a second time // avoid sending the metadata message a second time
if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); } if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); }
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(msg)], readMore);
}, (err) => { }, (err) => {
if (err && err.code !== 'ENOENT') { if (err && err.code !== 'ENOENT') {
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
const parsedMsg = {error:err.message, channel: channelName}; const parsedMsg = {error:err.message, channel: channelName};
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
return; return;
} }
const chan = channel_cache[channelName]; const chan = channel_cache[channelName];
if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) { if (msgCount === 0 && !metadata_cache[channelName] && Server.channelContainsUser(channelName, userId)) {
metadata_cache[channelName] = metadata; metadata_cache[channelName] = metadata;
// the index will have already been constructed and cached at this point // the index will have already been constructed and cached at this point
@ -800,22 +800,23 @@ module.exports.create = function (cfg, cb) {
} }
}); });
} }
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(metadata)]);
} }
// End of history message: // End of history message:
let parsedMsg = {state: 1, channel: channelName}; let parsedMsg = {state: 1, channel: channelName};
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
}); });
}); });
}; };
const handleGetHistoryRange = function (Server, seq, user, parsed) { const handleGetHistoryRange = function (Server, seq, userId, parsed) {
var channelName = parsed[1]; var channelName = parsed[1];
var map = parsed[2]; var map = parsed[2];
if (!(map && typeof(map) === 'object')) { if (!(map && typeof(map) === 'object')) {
return void Server.send(user.id, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]); return void Server.send(userId, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]);
} }
var oldestKnownHash = map.from; var oldestKnownHash = map.from;
@ -823,14 +824,14 @@ module.exports.create = function (cfg, cb) {
var desiredCheckpoint = map.cpCount; var desiredCheckpoint = map.cpCount;
var txid = map.txid; var txid = map.txid;
if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') { if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
return void Server.send(user.id, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]); return void Server.send(userId, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]);
} }
if (!txid) { if (!txid) {
return void Server.send(user.id, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]); return void Server.send(userId, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]);
} }
Server.send(user.id, [seq, 'ACK']); Server.send(userId, [seq, 'ACK']);
return void getOlderHistory(channelName, oldestKnownHash, function (messages) { return void getOlderHistory(channelName, oldestKnownHash, function (messages) {
var toSend = []; var toSend = [];
if (typeof (desiredMessages) === "number") { if (typeof (desiredMessages) === "number") {
@ -846,51 +847,50 @@ module.exports.create = function (cfg, cb) {
} }
} }
toSend.forEach(function (msg) { toSend.forEach(function (msg) {
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
JSON.stringify(['HISTORY_RANGE', txid, msg])]); JSON.stringify(['HISTORY_RANGE', txid, msg])]);
}); });
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
JSON.stringify(['HISTORY_RANGE_END', txid, channelName]) JSON.stringify(['HISTORY_RANGE_END', txid, channelName])
]); ]);
}); });
}; };
const handleGetFullHistory = function (Server, seq, user, parsed) { const handleGetFullHistory = function (Server, seq, userId, parsed) {
// parsed[1] is the channel id // parsed[1] is the channel id
// parsed[2] is a validation key (optionnal) // parsed[2] is a validation key (optionnal)
// parsed[3] is the last known hash (optionnal) // parsed[3] is the last known hash (optionnal)
Server.send(user.id, [seq, 'ACK']); Server.send(userId, [seq, 'ACK']);
// FIXME should we send metadata here too? // FIXME should we send metadata here too?
// none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22) // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
return void getHistoryAsync(parsed[1], -1, false, (msg, readMore) => { return void getHistoryAsync(parsed[1], -1, false, (msg, readMore) => {
if (!msg) { return; } if (!msg) { return; }
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(['FULL_HISTORY', msg])], readMore);
}, (err) => { }, (err) => {
let parsedMsg = ['FULL_HISTORY_END', parsed[1]]; let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
if (err) { if (err) {
Log.error('HK_GET_FULL_HISTORY', err.stack); Log.error('HK_GET_FULL_HISTORY', err.stack);
parsedMsg = ['ERROR', parsed[1], err.message]; parsedMsg = ['ERROR', parsed[1], err.message];
} }
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
}); });
}; };
const handleRPC = function (Server, seq, user, parsed) { const handleRPC = function (Server, seq, userId, parsed) {
if (typeof(rpc) !== 'function') { return; } if (typeof(rpc) !== 'function') { return; }
/* RPC Calls... */ /* RPC Calls... */
var rpc_call = parsed.slice(1); var rpc_call = parsed.slice(1);
// XXX ensure user is guaranteed to have 'id' Server.send(userId, [seq, 'ACK']);
Server.send(user.id, [seq, 'ACK']);
try { try {
// slice off the sequence number and pass in the rest of the message // slice off the sequence number and pass in the rest of the message
rpc(Server, rpc_call, function (err, output) { rpc(Server, rpc_call, function (err, output) {
if (err) { if (err) {
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', err])]); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]);
return; return;
} }
var msg = rpc_call[0].slice(); var msg = rpc_call[0].slice();
@ -910,26 +910,25 @@ module.exports.create = function (cfg, cb) {
// unauthenticated RPC calls have a different message format // unauthenticated RPC calls have a different message format
if (msg[0] === "WRITE_PRIVATE_MESSAGE" && output && output.channel) { if (msg[0] === "WRITE_PRIVATE_MESSAGE" && output && output.channel) {
// this is an inline reimplementation of historyKeeperBroadcast // clients don't validate messages sent by the historyKeeper
// because if we use that directly it will bypass signature validation // so this broadcast needs to come from a different id
// which opens up the user to malicious behaviour // we pass 'null' to indicate that it's not coming from a real user
let chan = channel_cache[output.channel]; // to ensure that they know not to trust this message
if (chan && chan.length) { Server.getChannelUserList(output.channel).forEach(function (userId) {
chan.forEach(function (user) { Server.send(userId, output.message);
Server.send(user.id, output.message); });
//[0, null, 'MSG', user.id, JSON.stringify(output.message)]);
});
}
// rpc and anonRpc expect their responses to be of a certain length // rpc and anonRpc expect their responses to be of a certain length
// and we've already used the output of the rpc call, so overwrite it // and we've already used the output of the rpc call, so overwrite it
output = [null, null, null]; output = [null, null, null];
} }
// finally, send a response to the client that sent the RPC // finally, send a response to the client that sent the RPC
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0]].concat(output))]);
}); });
} catch (e) { } catch (e) {
Server.send(user.id, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]); // if anything throws in the middle, send an error
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]);
} }
}; };
@ -945,7 +944,7 @@ module.exports.create = function (cfg, cb) {
* check if it's expired and execute all the associated side-effects * check if it's expired and execute all the associated side-effects
* routes queries to the appropriate handlers * routes queries to the appropriate handlers
*/ */
const onDirectMessage = function (Server, seq, user, json) { const onDirectMessage = function (Server, seq, userId, json) {
Log.silly('HK_MESSAGE', json); Log.silly('HK_MESSAGE', json);
let parsed; let parsed;
@ -965,7 +964,7 @@ module.exports.create = function (cfg, cb) {
var command = directMessageCommands[parsed[0]] || handleRPC; var command = directMessageCommands[parsed[0]] || handleRPC;
// run the command with the standard function signature // run the command with the standard function signature
command(Server, seq, user, parsed); command(Server, seq, userId, parsed);
}; };
cfg.historyKeeper = { cfg.historyKeeper = {
@ -991,10 +990,10 @@ module.exports.create = function (cfg, cb) {
channelName channelName
]); ]);
}, },
directMessage: function (Server, seq, user, json) { directMessage: function (Server, seq, userId, json) {
// netflux-server allows you to register an id with a handler // netflux-server allows you to register an id with a handler
// this handler is invoked every time someone sends a message to that id // this handler is invoked every time someone sends a message to that id
onDirectMessage(Server, seq, user, json); onDirectMessage(Server, seq, userId, json);
}, },
}; };

@ -287,7 +287,7 @@ var rpc = function (Env, Server, data, respond) {
} }
if (isUnauthenticateMessage(msg)) { if (isUnauthenticateMessage(msg)) {
return handleUnauthenticatedMessage(Env, msg, respond); return handleUnauthenticatedMessage(Env, msg, respond, Server);
} }
var signature = msg.shift(); var signature = msg.shift();

Loading…
Cancel
Save