diff --git a/lib/commands/metadata.js b/lib/commands/metadata.js index 78584d3ae..5b6d17493 100644 --- a/lib/commands/metadata.js +++ b/lib/commands/metadata.js @@ -6,17 +6,15 @@ const BatchRead = require("../batch-read"); const WriteQueue = require("../write-queue"); const Core = require("./core"); const Util = require("../common-util"); +const HK = require("../hk-util"); -const batchMetadata = BatchRead("GET_METADATA"); Data.getMetadata = function (Env, channel, cb/* , Server */) { if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); } - if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); } + if (channel.length !== HK.STANDARD_CHANNEL_LENGTH) { return cb("INVALID_CHAN_LENGTH"); } - // XXX get metadata from the server cache if it is available - batchMetadata(channel, cb, function (done) { + Env.batchMetadata(channel, cb, function (done) { var ref = {}; var lineHandler = Meta.createLineHandler(ref, Env.Log.error); - return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) { if (err) { // stream errors? @@ -118,6 +116,9 @@ Data.setMetadata = function (Env, safeKey, data, cb, Server) { // kick any current users from the channel // if they aren't on it. + // review Server.channelBroadcast as used for EEXPIRED + // send them to the user in question, from historyKeeper + cb(void 0, metadata); next(); diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index 93644f571..12c9762e5 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -38,6 +38,7 @@ module.exports.create = function (config, cb) { channel_cache: {}, queueStorage: WriteQueue(), batchIndexReads: BatchRead("HK_GET_INDEX"), + batchMetadata: BatchRead('GET_METADATA'), //historyKeeper: config.historyKeeper, intervals: config.intervals || {}, @@ -115,22 +116,23 @@ module.exports.create = function (config, cb) { channelOpen: function (Server, channelName, userId, wait) { Env.channel_cache[channelName] = Env.channel_cache[channelName] || {}; - var proceed = function () { - Server.send(userId, [ - 0, - Env.id, - 'JOIN', - channelName - ]); + var next = wait(); + var cb = function (err, info) { + next(err, info, function () { + Server.send(userId, [ + 0, + Env.id, + 'JOIN', + channelName + ]); + }); }; // only conventional channels can be restricted - if ((channelName || "").length !== 32) { // XXX use contants - return proceed(); + if ((channelName || "").length !== HK.STANDARD_CHANNEL_LENGTH) { + return void cb(); } - var next = wait(); - // gets and caches the metadata... // XXX make sure it doesn't get stuck in cache... HK.getMetadata(Env, channelName, function (err, metadata) { @@ -142,8 +144,7 @@ module.exports.create = function (config, cb) { if (!metadata || (metadata && !metadata.restricted)) { // the channel doesn't have metadata, or it does and it's not restricted // either way, let them join. - proceed(); - return void next(); + return void cb(); } // this channel is restricted. verify that the user in question is in the allow list @@ -154,15 +155,14 @@ module.exports.create = function (config, cb) { var session = HK.getNetfluxSession(Env, userId); if (HK.isUserSessionAllowed(allowed, session)) { - proceed(); - return void next(); + return void cb(); } // otherwise they're not allowed. // respond with a special error that includes the list of keys // which would be allowed... // XXX bonus points if you hash the keys to limit data exposure - next(["ERESTRICTED"].concat(allowed)); + cb("ERESTRICTED", allowed); }); }, sessionClose: function (userId, reason) { diff --git a/lib/hk-util.js b/lib/hk-util.js index 049529a74..e14975a1a 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -171,17 +171,19 @@ const checkExpired = function (Env, Server, channel) { error: 'EEXPIRED', channel: channel }, Env.id); - dropChannel(channel); + dropChannel(Env, channel); }); // return true to indicate that it has expired return true; }; -const getMetadata = HK.getMetadata = function (Env, channelName, cb) { +const getMetadata = HK.getMetadata = function (Env, channelName, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + var metadata = Env.metadata_cache[channelName]; if (metadata && typeof(metadata) === 'object') { - return void Util.mkAsync(cb)(undefined, metadata); + return void cb(undefined, metadata); } MetaRPC.getMetadata(Env, channelName, function (err, metadata) { @@ -189,6 +191,10 @@ const getMetadata = HK.getMetadata = function (Env, channelName, cb) { console.error(err); return void cb(err); } + if (!(metadata && typeof(metadata.channel) === 'string' && metadata.channel.length === STANDARD_CHANNEL_LENGTH)) { + return cb(); + } + // cache it Env.metadata_cache[channelName] = metadata; cb(undefined, metadata); @@ -231,7 +237,8 @@ const computeIndex = function (Env, channelName, cb) { nThen(function (w) { getMetadata(Env, channelName, w(function (err, _metadata) { if (err) { - throw new Error(err); + console.log(err); + throw new Error(err); // XXX } metadata = _metadata; })); @@ -693,7 +700,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) { }, (err) => { if (err && err.code !== 'ENOENT') { if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } - const parsedMsg = {error:err.message, channel: channelName, txid: txid}; + const parsedMsg = {error:err.message, channel: channelName, txid: txid}; // XXX history retrieval error format Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]); return; } @@ -876,6 +883,7 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { if (checkExpired(Env, Server, parsed[1])) { // if the channel is expired just abort. w.abort(); + // XXX what do we tell the person who asked? return; } @@ -891,6 +899,9 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { } // XXX NOT ALLOWED + // respond to txid with error as in handleGetHistory + // send the allow list anyway, it might not get used currently + // but will in the future })); }).nThen(function () { // run the appropriate command from the map