centralize historykeeper-rpc interaction in rpc methods
parent
7d0dbe5d09
commit
802034616c
|
@ -6,7 +6,7 @@ const nThen = require("nthen");
|
|||
const Core = require("./core");
|
||||
const Metadata = require("./metadata");
|
||||
|
||||
Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb) {
|
||||
Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
|
||||
if (typeof(channelId) !== 'string' || channelId.length !== 32) {
|
||||
return cb('INVALID_ARGUMENTS');
|
||||
}
|
||||
|
@ -20,19 +20,46 @@ Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb) {
|
|||
return void cb('INSUFFICIENT_PERMISSIONS');
|
||||
}
|
||||
return void Env.msgStore.clearChannel(channelId, function (e) {
|
||||
cb(e);
|
||||
if (e) { return void cb(e); }
|
||||
cb();
|
||||
|
||||
const channel_cache = Env.historyKeeper.channel_cache;
|
||||
|
||||
const clear = function () {
|
||||
// delete the channel cache because it will have been invalidated
|
||||
delete channel_cache[channelId];
|
||||
};
|
||||
|
||||
nThen(function (w) {
|
||||
Server.getChannelUserList(channelId).forEach(function (userId) {
|
||||
Server.send(userId, [
|
||||
0,
|
||||
Env.historyKeeper.id,
|
||||
'MSG',
|
||||
userId,
|
||||
JSON.stringify({
|
||||
error: 'ECLEARED',
|
||||
channel: channelId
|
||||
})
|
||||
], w());
|
||||
});
|
||||
}).nThen(function () {
|
||||
clear();
|
||||
}).orTimeout(function () {
|
||||
Env.Log.warn("ON_CHANNEL_CLEARED_TIMEOUT", channelId);
|
||||
clear();
|
||||
}, 30000);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb) {
|
||||
Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
|
||||
if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) {
|
||||
return cb('INVALID_ARGUMENTS');
|
||||
}
|
||||
var unsafeKey = Util.unescapeKeyCharacters(safeKey);
|
||||
|
||||
if (Env.blobStore.isFileId(channelId)) {
|
||||
//var safeKey = Util.escapeKeyCharacters(unsafeKey);
|
||||
var blobId = channelId;
|
||||
|
||||
return void nThen(function (w) {
|
||||
|
@ -89,6 +116,45 @@ Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb) {
|
|||
return void cb(e);
|
||||
}
|
||||
cb(void 0, 'OK');
|
||||
|
||||
const channel_cache = Env.historyKeeper.channel_cache;
|
||||
const metadata_cache = Env.historyKeeper.metadata_cache;
|
||||
|
||||
const clear = function () {
|
||||
delete channel_cache[channelId];
|
||||
Server.clearChannel(channelId);
|
||||
delete metadata_cache[channelId];
|
||||
};
|
||||
|
||||
// an owner of a channel deleted it
|
||||
nThen(function (w) {
|
||||
// close the channel in the store
|
||||
Env.msgStore.closeChannel(channelId, w());
|
||||
}).nThen(function (w) {
|
||||
// Server.channelBroadcast would be better
|
||||
// but we can't trust it to track even one callback,
|
||||
// let alone many in parallel.
|
||||
// so we simulate it on this side to avoid race conditions
|
||||
Server.getChannelUserList(channelId).forEach(function (userId) {
|
||||
Server.send(userId, [
|
||||
0,
|
||||
Env.historyKeeper.id,
|
||||
"MSG",
|
||||
userId,
|
||||
JSON.stringify({
|
||||
error: 'EDELETED',
|
||||
channel: channelId,
|
||||
})
|
||||
], w());
|
||||
});
|
||||
}).nThen(function () {
|
||||
// clear the channel's data from memory
|
||||
// once you've sent everyone a notice that the channel has been deleted
|
||||
clear();
|
||||
}).orTimeout(function () {
|
||||
Env.Log.warn('ON_CHANNEL_DELETED_TIMEOUT', channelId);
|
||||
clear();
|
||||
}, 30000);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
@ -121,6 +187,8 @@ Channel.trimHistory = function (Env, safeKey, data, cb) {
|
|||
// clear historyKeeper's cache for this channel
|
||||
Env.historyKeeper.channelClose(channelId);
|
||||
cb(void 0, 'OK');
|
||||
delete Env.historyKeeper.channel_cache[channelId];
|
||||
delete Env.historyKeeper.metadata_cache[channelId];
|
||||
});
|
||||
});
|
||||
};
|
||||
|
@ -160,7 +228,7 @@ Channel.isNewChannel = function (Env, channel, cb) {
|
|||
|
||||
Otherwise behaves the same as sending to a channel
|
||||
*/
|
||||
Channel.writePrivateMessage = function (Env, args, cb, Server) { // XXX odd signature
|
||||
Channel.writePrivateMessage = function (Env, args, cb, Server) {
|
||||
var channelId = args[0];
|
||||
var msg = args[1];
|
||||
|
||||
|
@ -197,11 +265,10 @@ Channel.writePrivateMessage = function (Env, args, cb, Server) { // XXX odd sign
|
|||
// if the message isn't valid it won't be stored.
|
||||
Env.historyKeeper.channelMessage(Server, channelStruct, fullMessage);
|
||||
|
||||
// call back with the message and the target channel.
|
||||
// historyKeeper will take care of broadcasting it if anyone is in the channel
|
||||
cb(void 0, {
|
||||
channel: channelId,
|
||||
message: fullMessage
|
||||
Server.getChannelUserList(channelId).forEach(function (userId) {
|
||||
Server.send(userId, fullMessage);
|
||||
});
|
||||
|
||||
cb();
|
||||
};
|
||||
|
||||
|
|
|
@ -12,8 +12,7 @@ Data.getMetadata = function (Env, channel, cb/* , Server */) {
|
|||
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
|
||||
if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); }
|
||||
|
||||
// XXX get metadata from the server cache if it is available
|
||||
// Server isn't always passed, though...
|
||||
// FIXME get metadata from the server cache if it is available
|
||||
batchMetadata(channel, cb, function (done) {
|
||||
var ref = {};
|
||||
var lineHandler = Meta.createLineHandler(ref, Env.Log.error);
|
||||
|
@ -37,7 +36,7 @@ Data.getMetadata = function (Env, channel, cb/* , Server */) {
|
|||
}
|
||||
*/
|
||||
var queueMetadata = WriteQueue();
|
||||
Data.setMetadata = function (Env, safeKey, data, cb) {
|
||||
Data.setMetadata = function (Env, safeKey, data, cb, Server) {
|
||||
var unsafeKey = Util.unescapeKeyCharacters(safeKey);
|
||||
|
||||
var channel = data.channel;
|
||||
|
@ -108,8 +107,19 @@ Data.setMetadata = function (Env, safeKey, data, cb) {
|
|||
cb(e);
|
||||
return void next();
|
||||
}
|
||||
|
||||
cb(void 0, metadata);
|
||||
next();
|
||||
|
||||
const metadata_cache = Env.historyKeeper.metadata_cache;
|
||||
const channel_cache = Env.historyKeeper.channel_cache;
|
||||
|
||||
metadata_cache[channel] = metadata;
|
||||
|
||||
var index = Util.find(channel_cache, [channel, 'index']);
|
||||
if (index && typeof(index) === 'object') { index.metadata = metadata; }
|
||||
|
||||
Server.channelBroadcast(channel, JSON.stringify(metadata), Env.historyKeeper.id);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -205,7 +205,6 @@ Pinning.removePins = function (Env, safeKey, cb) {
|
|||
};
|
||||
|
||||
Pinning.trimPins = function (Env, safeKey, cb) {
|
||||
// XXX trim to latest pin checkpoint
|
||||
cb("NOT_IMPLEMENTED");
|
||||
};
|
||||
|
||||
|
@ -453,10 +452,10 @@ Pinning.loadChannelPins = function (Env) {
|
|||
|
||||
Pinning.isChannelPinned = function (Env, channel, cb) {
|
||||
Env.evPinnedPadsReady.reg(() => {
|
||||
if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) {
|
||||
if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) { // FIXME 'Object.keys' here is overkill. We only need to know that it isn't empty
|
||||
cb(void 0, true);
|
||||
} else {
|
||||
delete Env.pinnedPads[channel]; // XXX WAT
|
||||
delete Env.pinnedPads[channel];
|
||||
cb(void 0, false);
|
||||
}
|
||||
});
|
||||
|
|
|
@ -644,48 +644,6 @@ module.exports.create = function (cfg, cb) {
|
|||
});
|
||||
};
|
||||
|
||||
|
||||
/* onChannelCleared
|
||||
* broadcasts to all clients in a channel if that channel is deleted
|
||||
*/
|
||||
const onChannelCleared = function (Server, channel) {
|
||||
Server.channelBroadcast(channel, {
|
||||
error: 'ECLEARED',
|
||||
channel: channel
|
||||
}, HISTORY_KEEPER_ID);
|
||||
};
|
||||
|
||||
// When a channel is removed from datastore, broadcast a message to all its connected users
|
||||
const onChannelDeleted = function (Server, channel) {
|
||||
store.closeChannel(channel, function () {
|
||||
Server.channelBroadcast(channel, {
|
||||
error: 'EDELETED',
|
||||
channel: channel
|
||||
}, HISTORY_KEEPER_ID);
|
||||
});
|
||||
|
||||
delete channel_cache[channel];
|
||||
Server.clearChannel(channel);
|
||||
delete metadata_cache[channel];
|
||||
};
|
||||
// Check if the selected channel is expired
|
||||
// If it is, remove it from memory and broadcast a message to its members
|
||||
|
||||
const onChannelMetadataChanged = function (Server, channel, metadata) {
|
||||
if (!(channel && metadata_cache[channel] && typeof (metadata) === "object")) { return; }
|
||||
Log.silly('SET_METADATA_CACHE', {
|
||||
channel: channel,
|
||||
metadata: JSON.stringify(metadata),
|
||||
});
|
||||
|
||||
metadata_cache[channel] = metadata;
|
||||
|
||||
if (channel_cache[channel] && channel_cache[channel].index) {
|
||||
channel_cache[channel].index.metadata = metadata;
|
||||
}
|
||||
Server.channelBroadcast(channel, metadata, HISTORY_KEEPER_ID);
|
||||
};
|
||||
|
||||
const handleGetHistory = function (Server, seq, userId, parsed) {
|
||||
// parsed[1] is the channel id
|
||||
// parsed[2] is a validation key or an object containing metadata (optionnal)
|
||||
|
@ -892,37 +850,6 @@ module.exports.create = function (cfg, cb) {
|
|||
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]);
|
||||
return;
|
||||
}
|
||||
var msg = rpc_call[0].slice();
|
||||
if (msg[3] === 'REMOVE_OWNED_CHANNEL') {
|
||||
onChannelDeleted(Server, msg[4]);
|
||||
}
|
||||
if (msg[3] === 'CLEAR_OWNED_CHANNEL') {
|
||||
onChannelCleared(Server, msg[4]);
|
||||
}
|
||||
|
||||
if (msg[3] === 'SET_METADATA') { // or whatever we call the RPC????
|
||||
// make sure we update our cache of metadata
|
||||
// or at least invalidate it and force other mechanisms to recompute its state
|
||||
// 'output' could be the new state as computed by rpc
|
||||
onChannelMetadataChanged(Server, msg[4].channel, output[1]);
|
||||
}
|
||||
|
||||
// unauthenticated RPC calls have a different message format
|
||||
if (msg[0] === "WRITE_PRIVATE_MESSAGE" && output && output.channel) {
|
||||
// clients don't validate messages sent by the historyKeeper
|
||||
// so this broadcast needs to come from a different id
|
||||
// we pass 'null' to indicate that it's not coming from a real user
|
||||
// to ensure that they know not to trust this message
|
||||
Server.getChannelUserList(output.channel).forEach(function (userId) {
|
||||
Server.send(userId, output.message);
|
||||
});
|
||||
|
||||
// rpc and anonRpc expect their responses to be of a certain length
|
||||
// and we've already used the output of the rpc call, so overwrite it
|
||||
output = [null, null, null];
|
||||
}
|
||||
|
||||
// finally, send a response to the client that sent the RPC
|
||||
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0]].concat(output))]);
|
||||
});
|
||||
} catch (e) {
|
||||
|
@ -967,6 +894,9 @@ module.exports.create = function (cfg, cb) {
|
|||
};
|
||||
|
||||
cfg.historyKeeper = {
|
||||
metadata_cache: metadata_cache,
|
||||
channel_cache: channel_cache,
|
||||
|
||||
id: HISTORY_KEEPER_ID,
|
||||
|
||||
channelMessage: function (Server, channel, msgStruct) {
|
||||
|
|
|
@ -60,6 +60,7 @@ const AUTHENTICATED_USER_TARGETED = {
|
|||
WRITE_LOGIN_BLOCK: Block.writeLoginBlock,
|
||||
REMOVE_LOGIN_BLOCK: Block.removeLoginBlock,
|
||||
ADMIN: Admin.command,
|
||||
SET_METADATA: Metadata.setMetadata,
|
||||
};
|
||||
|
||||
const AUTHENTICATED_USER_SCOPED = {
|
||||
|
@ -70,7 +71,6 @@ const AUTHENTICATED_USER_SCOPED = {
|
|||
EXPIRE_SESSION: Core.expireSessionAsync,
|
||||
REMOVE_PINS: Pinning.removePins,
|
||||
TRIM_PINS: Pinning.trimPins,
|
||||
SET_METADATA: Metadata.setMetadata,
|
||||
COOKIE: Core.haveACookie,
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue