have historyKeeper remove files that are a day or more past their expiration time

pull/1/head
ansuz 5 years ago
parent 4902554a61
commit f8cd1dc266

@ -11,6 +11,7 @@ const WriteQueue = require("./lib/write-queue");
let Log;
const now = function () { return (new Date()).getTime(); };
const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds
/* getHash
* this function slices off the leading portion of a message which is
@ -81,6 +82,7 @@ module.exports.create = function (cfg) {
const rpc = cfg.rpc;
const tasks = cfg.tasks;
const store = cfg.store;
const retainData = cfg.retainData;
Log = cfg.log;
Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
@ -358,6 +360,82 @@ module.exports.create = function (cfg) {
});
};
/* historyKeeperBroadcast
* uses API from the netflux server to send messages to every member of a channel
* sendMsg runs in a try-catch and drops users if sending a message fails
*/
const historyKeeperBroadcast = function (ctx, channel, msg) {
let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
chan.forEach(function (user) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
});
};
/* expireChannel is here to clean up channels that should have been removed
but for some reason are still present
*/
const expireChannel = function (ctx, channel) {
if (retainData) {
return void store.archiveChannel(channel, function (err) {
Log.info("ARCHIVAL_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", {
channelId: channel,
status: err? String(err): "SUCCESS",
});
});
}
store.removeChannel(channel, function (err) {
Log.info("DELETION_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", {
channelid: channel,
status: err? String(err): "SUCCESS",
});
});
};
/* checkExpired
* synchronously returns true or undefined to indicate whether the channel is expired
* according to its metadata
* has some side effects:
* closes the channel via the store.closeChannel API
* and then broadcasts to all channel members that the channel has expired
* removes the channel from the netflux-server's in-memory cache
* removes the channel metadata from history keeper's in-memory cache
FIXME the boolean nature of this API should be separated from its side effects
*/
const checkExpired = function (ctx, channel) {
if (!(channel && channel.length === STANDARD_CHANNEL_LENGTH)) { return false; }
let metadata = metadata_cache[channel];
if (!(metadata && typeof(metadata.expire) === 'number')) { return false; }
// the number of milliseconds ago the channel should have expired
let pastDue = (+new Date()) - metadata.expire;
// less than zero means that it hasn't expired yet
if (pastDue < 0) { return false; }
// if it should have expired more than a day ago...
// there may have been a problem with scheduling tasks
// or the scheduled tasks may not be running
// so trigger a removal from here
if (pastDue >= ONE_DAY) { expireChannel(ctx, channel); }
// close the channel
store.closeChannel(channel, function () {
historyKeeperBroadcast(ctx, channel, {
error: 'EEXPIRED',
channel: channel
});
// remove it from any caches after you've told anyone in the channel
// that it has expired
delete ctx.channels[channel];
delete metadata_cache[channel];
});
// return true to indicate that it has expired
return true;
};
var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
/* onChannelMessage
@ -408,12 +486,8 @@ module.exports.create = function (cfg) {
metadata = index.metadata;
if (metadata.expire && metadata.expire < +new Date()) {
// don't store message sent to expired channels
w.abort();
return;
// TODO if a channel expired a long time ago but it's still here, remove it
}
// don't write messages to expired channels
if (checkExpired(ctx, channel)) { return void w.abort(); }
// if there's no validateKey present skip to the next block
if (!metadata.validateKey) { return; }
@ -646,26 +720,6 @@ module.exports.create = function (cfg) {
});
};
/*::
type Chan_t = {
indexOf: (any)=>number,
id: string,
lastSavedCp: string,
forEach: ((any)=>void)=>void,
push: (any)=>void,
};
*/
/* historyKeeperBroadcast
* uses API from the netflux server to send messages to every member of a channel
* sendMsg runs in a try-catch and drops users if sending a message fails
*/
const historyKeeperBroadcast = function (ctx, channel, msg) {
let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
chan.forEach(function (user) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
});
};
/* onChannelCleared
* broadcasts to all clients in a channel if that channel is deleted
@ -701,33 +755,6 @@ module.exports.create = function (cfg) {
}
};
/* checkExpired
* synchronously returns true or undefined to indicate whether the channel is expired
* according to its metadata
* has some side effects:
* closes the channel via the store.closeChannel API
* and then broadcasts to all channel members that the channel has expired
* removes the channel from the netflux-server's in-memory cache
* removes the channel metadata from history keeper's in-memory cache
FIXME the boolean nature of this API should be separated from its side effects
*/
const checkExpired = function (ctx, channel) {
if (channel && channel.length === STANDARD_CHANNEL_LENGTH && metadata_cache[channel] &&
metadata_cache[channel].expire && metadata_cache[channel].expire < +new Date()) {
store.closeChannel(channel, function () {
historyKeeperBroadcast(ctx, channel, {
error: 'EEXPIRED',
channel: channel
});
});
delete ctx.channels[channel];
delete metadata_cache[channel];
return true;
}
return;
};
/* onDirectMessage
* exported for use by the netflux-server
* parses and handles all direct messages directed to the history keeper

@ -321,7 +321,8 @@ var nt = nThen(function (w) {
tasks: config.tasks,
rpc: rpc,
store: config.store,
log: log
log: log,
retainData: Boolean(config.retainData),
};
historyKeeper = HK.create(hkConfig);
}).nThen(function () {

Loading…
Cancel
Save