diff --git a/historyKeeper.js b/historyKeeper.js index 0b4ce7ba1..89fed7e85 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -11,6 +11,7 @@ const WriteQueue = require("./lib/write-queue"); let Log; const now = function () { return (new Date()).getTime(); }; +const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds /* getHash * this function slices off the leading portion of a message which is @@ -81,6 +82,7 @@ module.exports.create = function (cfg) { const rpc = cfg.rpc; const tasks = cfg.tasks; const store = cfg.store; + const retainData = cfg.retainData; Log = cfg.log; Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE'); @@ -358,6 +360,82 @@ module.exports.create = function (cfg) { }); }; + /* historyKeeperBroadcast + * uses API from the netflux server to send messages to every member of a channel + * sendMsg runs in a try-catch and drops users if sending a message fails + */ + const historyKeeperBroadcast = function (ctx, channel, msg) { + let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/); + chan.forEach(function (user) { + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]); + }); + }; + + /* expireChannel is here to clean up channels that should have been removed + but for some reason are still present + */ + const expireChannel = function (ctx, channel) { + if (retainData) { + return void store.archiveChannel(channel, function (err) { + Log.info("ARCHIVAL_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", { + channelId: channel, + status: err? String(err): "SUCCESS", + }); + }); + } + + store.removeChannel(channel, function (err) { + Log.info("DELETION_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", { + channelid: channel, + status: err? String(err): "SUCCESS", + }); + }); + }; + + /* checkExpired + * synchronously returns true or undefined to indicate whether the channel is expired + * according to its metadata + * has some side effects: + * closes the channel via the store.closeChannel API + * and then broadcasts to all channel members that the channel has expired + * removes the channel from the netflux-server's in-memory cache + * removes the channel metadata from history keeper's in-memory cache + + FIXME the boolean nature of this API should be separated from its side effects + */ + const checkExpired = function (ctx, channel) { + if (!(channel && channel.length === STANDARD_CHANNEL_LENGTH)) { return false; } + let metadata = metadata_cache[channel]; + if (!(metadata && typeof(metadata.expire) === 'number')) { return false; } + + // the number of milliseconds ago the channel should have expired + let pastDue = (+new Date()) - metadata.expire; + + // less than zero means that it hasn't expired yet + if (pastDue < 0) { return false; } + + // if it should have expired more than a day ago... + // there may have been a problem with scheduling tasks + // or the scheduled tasks may not be running + // so trigger a removal from here + if (pastDue >= ONE_DAY) { expireChannel(ctx, channel); } + + // close the channel + store.closeChannel(channel, function () { + historyKeeperBroadcast(ctx, channel, { + error: 'EEXPIRED', + channel: channel + }); + // remove it from any caches after you've told anyone in the channel + // that it has expired + delete ctx.channels[channel]; + delete metadata_cache[channel]; + }); + + // return true to indicate that it has expired + return true; + }; + var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; /* onChannelMessage @@ -408,12 +486,8 @@ module.exports.create = function (cfg) { metadata = index.metadata; - if (metadata.expire && metadata.expire < +new Date()) { - // don't store message sent to expired channels - w.abort(); - return; - // TODO if a channel expired a long time ago but it's still here, remove it - } + // don't write messages to expired channels + if (checkExpired(ctx, channel)) { return void w.abort(); } // if there's no validateKey present skip to the next block if (!metadata.validateKey) { return; } @@ -646,26 +720,6 @@ module.exports.create = function (cfg) { }); }; - /*:: - type Chan_t = { - indexOf: (any)=>number, - id: string, - lastSavedCp: string, - forEach: ((any)=>void)=>void, - push: (any)=>void, - }; - */ - - /* historyKeeperBroadcast - * uses API from the netflux server to send messages to every member of a channel - * sendMsg runs in a try-catch and drops users if sending a message fails - */ - const historyKeeperBroadcast = function (ctx, channel, msg) { - let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/); - chan.forEach(function (user) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]); - }); - }; /* onChannelCleared * broadcasts to all clients in a channel if that channel is deleted @@ -701,33 +755,6 @@ module.exports.create = function (cfg) { } }; - /* checkExpired - * synchronously returns true or undefined to indicate whether the channel is expired - * according to its metadata - * has some side effects: - * closes the channel via the store.closeChannel API - * and then broadcasts to all channel members that the channel has expired - * removes the channel from the netflux-server's in-memory cache - * removes the channel metadata from history keeper's in-memory cache - - FIXME the boolean nature of this API should be separated from its side effects - */ - const checkExpired = function (ctx, channel) { - if (channel && channel.length === STANDARD_CHANNEL_LENGTH && metadata_cache[channel] && - metadata_cache[channel].expire && metadata_cache[channel].expire < +new Date()) { - store.closeChannel(channel, function () { - historyKeeperBroadcast(ctx, channel, { - error: 'EEXPIRED', - channel: channel - }); - }); - delete ctx.channels[channel]; - delete metadata_cache[channel]; - return true; - } - return; - }; - /* onDirectMessage * exported for use by the netflux-server * parses and handles all direct messages directed to the history keeper diff --git a/server.js b/server.js index c0fd4c194..93d4f7af8 100644 --- a/server.js +++ b/server.js @@ -321,7 +321,8 @@ var nt = nThen(function (w) { tasks: config.tasks, rpc: rpc, store: config.store, - log: log + log: log, + retainData: Boolean(config.retainData), }; historyKeeper = HK.create(hkConfig); }).nThen(function () {