WIP allow list changes

pull/1/head
ansuz 5 years ago
parent 597f417ad6
commit 791aad53f2

@ -6,17 +6,15 @@ const BatchRead = require("../batch-read");
const WriteQueue = require("../write-queue"); const WriteQueue = require("../write-queue");
const Core = require("./core"); const Core = require("./core");
const Util = require("../common-util"); const Util = require("../common-util");
const HK = require("../hk-util");
const batchMetadata = BatchRead("GET_METADATA");
Data.getMetadata = function (Env, channel, cb/* , Server */) { Data.getMetadata = function (Env, channel, cb/* , Server */) {
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); } if (channel.length !== HK.STANDARD_CHANNEL_LENGTH) { return cb("INVALID_CHAN_LENGTH"); }
// XXX get metadata from the server cache if it is available Env.batchMetadata(channel, cb, function (done) {
batchMetadata(channel, cb, function (done) {
var ref = {}; var ref = {};
var lineHandler = Meta.createLineHandler(ref, Env.Log.error); var lineHandler = Meta.createLineHandler(ref, Env.Log.error);
return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) { return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) {
if (err) { if (err) {
// stream errors? // stream errors?
@ -118,6 +116,9 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) {
// kick any current users from the channel // kick any current users from the channel
// if they aren't on it. // if they aren't on it.
// review Server.channelBroadcast as used for EEXPIRED
// send them to the user in question, from historyKeeper
cb(void 0, metadata); cb(void 0, metadata);
next(); next();

@ -38,6 +38,7 @@ module.exports.create = function (config, cb) {
channel_cache: {}, channel_cache: {},
queueStorage: WriteQueue(), queueStorage: WriteQueue(),
batchIndexReads: BatchRead("HK_GET_INDEX"), batchIndexReads: BatchRead("HK_GET_INDEX"),
batchMetadata: BatchRead('GET_METADATA'),
//historyKeeper: config.historyKeeper, //historyKeeper: config.historyKeeper,
intervals: config.intervals || {}, intervals: config.intervals || {},
@ -115,22 +116,23 @@ module.exports.create = function (config, cb) {
channelOpen: function (Server, channelName, userId, wait) { channelOpen: function (Server, channelName, userId, wait) {
Env.channel_cache[channelName] = Env.channel_cache[channelName] || {}; Env.channel_cache[channelName] = Env.channel_cache[channelName] || {};
var proceed = function () { var next = wait();
Server.send(userId, [ var cb = function (err, info) {
0, next(err, info, function () {
Env.id, Server.send(userId, [
'JOIN', 0,
channelName Env.id,
]); 'JOIN',
channelName
]);
});
}; };
// only conventional channels can be restricted // only conventional channels can be restricted
if ((channelName || "").length !== 32) { // XXX use contants if ((channelName || "").length !== HK.STANDARD_CHANNEL_LENGTH) {
return proceed(); return void cb();
} }
var next = wait();
// gets and caches the metadata... // gets and caches the metadata...
// XXX make sure it doesn't get stuck in cache... // XXX make sure it doesn't get stuck in cache...
HK.getMetadata(Env, channelName, function (err, metadata) { HK.getMetadata(Env, channelName, function (err, metadata) {
@ -142,8 +144,7 @@ module.exports.create = function (config, cb) {
if (!metadata || (metadata && !metadata.restricted)) { if (!metadata || (metadata && !metadata.restricted)) {
// the channel doesn't have metadata, or it does and it's not restricted // the channel doesn't have metadata, or it does and it's not restricted
// either way, let them join. // either way, let them join.
proceed(); return void cb();
return void next();
} }
// this channel is restricted. verify that the user in question is in the allow list // this channel is restricted. verify that the user in question is in the allow list
@ -154,15 +155,14 @@ module.exports.create = function (config, cb) {
var session = HK.getNetfluxSession(Env, userId); var session = HK.getNetfluxSession(Env, userId);
if (HK.isUserSessionAllowed(allowed, session)) { if (HK.isUserSessionAllowed(allowed, session)) {
proceed(); return void cb();
return void next();
} }
// otherwise they're not allowed. // otherwise they're not allowed.
// respond with a special error that includes the list of keys // respond with a special error that includes the list of keys
// which would be allowed... // which would be allowed...
// XXX bonus points if you hash the keys to limit data exposure // XXX bonus points if you hash the keys to limit data exposure
next(["ERESTRICTED"].concat(allowed)); cb("ERESTRICTED", allowed);
}); });
}, },
sessionClose: function (userId, reason) { sessionClose: function (userId, reason) {

@ -171,17 +171,19 @@ const checkExpired = function (Env, Server, channel) {
error: 'EEXPIRED', error: 'EEXPIRED',
channel: channel channel: channel
}, Env.id); }, Env.id);
dropChannel(channel); dropChannel(Env, channel);
}); });
// return true to indicate that it has expired // return true to indicate that it has expired
return true; return true;
}; };
const getMetadata = HK.getMetadata = function (Env, channelName, cb) { const getMetadata = HK.getMetadata = function (Env, channelName, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
var metadata = Env.metadata_cache[channelName]; var metadata = Env.metadata_cache[channelName];
if (metadata && typeof(metadata) === 'object') { if (metadata && typeof(metadata) === 'object') {
return void Util.mkAsync(cb)(undefined, metadata); return void cb(undefined, metadata);
} }
MetaRPC.getMetadata(Env, channelName, function (err, metadata) { MetaRPC.getMetadata(Env, channelName, function (err, metadata) {
@ -189,6 +191,10 @@ const getMetadata = HK.getMetadata = function (Env, channelName, cb) {
console.error(err); console.error(err);
return void cb(err); return void cb(err);
} }
if (!(metadata && typeof(metadata.channel) === 'string' && metadata.channel.length === STANDARD_CHANNEL_LENGTH)) {
return cb();
}
// cache it // cache it
Env.metadata_cache[channelName] = metadata; Env.metadata_cache[channelName] = metadata;
cb(undefined, metadata); cb(undefined, metadata);
@ -231,7 +237,8 @@ const computeIndex = function (Env, channelName, cb) {
nThen(function (w) { nThen(function (w) {
getMetadata(Env, channelName, w(function (err, _metadata) { getMetadata(Env, channelName, w(function (err, _metadata) {
if (err) { if (err) {
throw new Error(err); console.log(err);
throw new Error(err); // XXX
} }
metadata = _metadata; metadata = _metadata;
})); }));
@ -693,7 +700,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
}, (err) => { }, (err) => {
if (err && err.code !== 'ENOENT') { if (err && err.code !== 'ENOENT') {
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
const parsedMsg = {error:err.message, channel: channelName, txid: txid}; const parsedMsg = {error:err.message, channel: channelName, txid: txid}; // XXX history retrieval error format
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
return; return;
} }
@ -876,6 +883,7 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) {
if (checkExpired(Env, Server, parsed[1])) { if (checkExpired(Env, Server, parsed[1])) {
// if the channel is expired just abort. // if the channel is expired just abort.
w.abort(); w.abort();
// XXX what do we tell the person who asked?
return; return;
} }
@ -891,6 +899,9 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) {
} }
// XXX NOT ALLOWED // XXX NOT ALLOWED
// respond to txid with error as in handleGetHistory
// send the allow list anyway, it might not get used currently
// but will in the future
})); }));
}).nThen(function () { }).nThen(function () {
// run the appropriate command from the map // run the appropriate command from the map

Loading…
Cancel
Save