archive inactive data in one of the server's workers

pull/1/head
ansuz 4 years ago
parent 6ada5fd751
commit 04d1bc9cd7

@ -15,14 +15,11 @@ Env = {
limits: { limits: {
<unsafeKey>: <limit>, <unsafeKey>: <limit>,
}, },
config: { archiveRetentionTime: <number of days>,
inactiveTime: <number of days>, accountRetentionTime: <number of days>,
archiveRetentionTime: <number of days>, inactiveTime: <number of days>,
accountRetentionTime: <number of days>, paths: {
pin: <path to pin storage>
pinPath: <filesystem path>,
customLimits: <custom limits map>,
}, },
store, store,
pinStore, pinStore,
@ -35,11 +32,9 @@ Env = {
module.exports = function (Env, cb) { module.exports = function (Env, cb) {
var complete = Util.once(Util.mkAsync(cb)); var complete = Util.once(Util.mkAsync(cb));
var config = Env.config;
// the administrator should have set an 'inactiveTime' in their config // the administrator should have set an 'inactiveTime' in their config
// if they didn't, just exit. // if they didn't, just exit.
if (!config.inactiveTime || typeof(config.inactiveTime) !== "number") { if (!Env.inactiveTime || typeof(Env.inactiveTime) !== "number") {
return void complete("NO_INACTIVE_TIME"); return void complete("NO_INACTIVE_TIME");
} }
@ -55,10 +50,10 @@ module.exports = function (Env, cb) {
}); });
// files which have not been changed since before this date can be considered inactive // files which have not been changed since before this date can be considered inactive
var inactiveTime = +new Date() - (config.inactiveTime * 24 * 3600 * 1000); var inactiveTime = +new Date() - (Env.inactiveTime * 24 * 3600 * 1000);
// files which were archived before this date can be considered safe to remove // files which were archived before this date can be considered safe to remove
var retentionTime = +new Date() - (config.archiveRetentionTime * 24 * 3600 * 1000); var retentionTime = +new Date() - (Env.archiveRetentionTime * 24 * 3600 * 1000);
var store; var store;
var pinStore; var pinStore;
@ -111,7 +106,7 @@ module.exports = function (Env, cb) {
// if the admin has not set an 'archiveRetentionTime', this block makes no sense // if the admin has not set an 'archiveRetentionTime', this block makes no sense
// so just skip it // so just skip it
if (typeof(config.archiveRetentionTime) !== "number") { return; } if (typeof(Env.archiveRetentionTime) !== "number") { return; }
// count the number of files which have been removed in this run // count the number of files which have been removed in this run
var removed = 0; var removed = 0;
@ -165,7 +160,7 @@ module.exports = function (Env, cb) {
}; };
var removeArchivedBlobProofs = function (w) { var removeArchivedBlobProofs = function (w) {
if (typeof(config.archiveRetentionTime) !== "number") { return; } if (typeof(Env.archiveRetentionTime) !== "number") { return; }
// Iterate over archive blob ownership proofs and remove them // Iterate over archive blob ownership proofs and remove them
// if they are older than the specified retention time // if they are older than the specified retention time
var removed = 0; var removed = 0;
@ -190,7 +185,7 @@ module.exports = function (Env, cb) {
}; };
var removeArchivedBlobs = function (w) { var removeArchivedBlobs = function (w) {
if (typeof(config.archiveRetentionTime) !== "number") { return; } if (typeof(Env.archiveRetentionTime) !== "number") { return; }
// Iterate over archived blobs and remove them // Iterate over archived blobs and remove them
// if they are older than the specified retention time // if they are older than the specified retention time
var removed = 0; var removed = 0;
@ -280,8 +275,8 @@ module.exports = function (Env, cb) {
var inactive = 0; var inactive = 0;
var accountRetentionTime; var accountRetentionTime;
if (typeof(config.accountRetentionTime) === 'number' && config.accountRetentionTime > 0) { if (typeof(Env.accountRetentionTime) === 'number' && Env.accountRetentionTime > 0) {
accountRetentionTime = +new Date() - (24 * 3600 * 1000 * config.accountRetentionTime); accountRetentionTime = +new Date() - (24 * 3600 * 1000 * Env.accountRetentionTime);
} else { } else {
accountRetentionTime = -1; accountRetentionTime = -1;
} }
@ -297,7 +292,7 @@ module.exports = function (Env, cb) {
}; };
var accountIsActive = function (mtime, pinList, id) { var accountIsActive = function (mtime, pinList, id) {
// console.log("id [%s] in premiumSafeKeys", id, premiumSafeKeys.indexOf(id) !== -1); // XXX // console.log("id [%s] in premiumSafeKeys", id, premiumSafeKeys.indexOf(id) !== -1);
// if their pin log has changed recently then consider them active // if their pin log has changed recently then consider them active
if (mtime && mtime > accountRetentionTime) { if (mtime && mtime > accountRetentionTime) {
return true; return true;
@ -360,7 +355,7 @@ module.exports = function (Env, cb) {
}; };
Pins.load(w(done), { Pins.load(w(done), {
pinPath: config.pinPath, pinPath: Env.paths.pin,
handler: handler, handler: handler,
}); });
}; };
@ -515,9 +510,6 @@ module.exports = function (Env, cb) {
}; };
nThen(loadStorage) nThen(loadStorage)
.nThen(function () {
Log.info("EVICT_TIME_TO_LOAD_PINS", msSinceStart());
})
.nThen(removeArchivedChannels) .nThen(removeArchivedChannels)
.nThen(removeArchivedBlobProofs) .nThen(removeArchivedBlobProofs)
.nThen(removeArchivedBlobs) .nThen(removeArchivedBlobs)

@ -11,6 +11,7 @@ const Core = require("./commands/core");
const Store = require("./storage/file"); const Store = require("./storage/file");
const BlobStore = require("./storage/blob"); const BlobStore = require("./storage/blob");
const Workers = require("./workers/index"); const Workers = require("./workers/index");
//const Eviction = require("./eviction");
module.exports.create = function (config, cb) { module.exports.create = function (config, cb) {
const Log = config.log; const Log = config.log;
@ -26,14 +27,15 @@ module.exports.create = function (config, cb) {
Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE'); Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
// TODO populate Env with everything that you use from config
// so that you can stop passing around your raw config
// and more easily share state between historyKeeper and rpc
const Env = { const Env = {
Log: Log, Log: Log,
// store // store
id: Crypto.randomBytes(8).toString('hex'), id: Crypto.randomBytes(8).toString('hex'),
inactiveTime: config.inactiveTime,
archiveRetentionTime: config.archiveRetentionTime,
accountRetentionTime: config.accountRetentionTime,
metadata_cache: {}, metadata_cache: {},
channel_cache: {}, channel_cache: {},
queueStorage: WriteQueue(), queueStorage: WriteQueue(),
@ -254,6 +256,10 @@ module.exports.create = function (config, cb) {
verbose: config.verbose, verbose: config.verbose,
openFileLimit: config.openFileLimit, openFileLimit: config.openFileLimit,
inactiveTime: config.inactiveTime,
archiveRetentionTime: config.archiveRetentionTime,
accountRetentionTime: config.accountRetentionTime,
maxWorkers: config.maxWorkers, maxWorkers: config.maxWorkers,
}, w(function (err) { }, w(function (err) {
if (err) { if (err) {
@ -261,8 +267,8 @@ module.exports.create = function (config, cb) {
} }
})); }));
}).nThen(function () { }).nThen(function () {
if (config.disableIntegratedTasks) { return; }
config.intervals = config.intervals || {}; config.intervals = config.intervals || {};
if (config.disableIntegratedTasks) { return; }
var tasks_running; var tasks_running;
config.intervals.taskExpiration = setInterval(function () { config.intervals.taskExpiration = setInterval(function () {
@ -275,6 +281,30 @@ module.exports.create = function (config, cb) {
tasks_running = false; tasks_running = false;
}); });
}, 1000 * 60 * 5); // run every five minutes }, 1000 * 60 * 5); // run every five minutes
}).nThen(function () {
if (config.disableIntegratedEviction) { return; }
const ONE_DAY = 24 * 1000 * 60 * 60;
// setting the time of the last eviction to "now"
// effectively makes it so that we'll start evicting after the server
// has been up for at least one day
var last_eviction = +new Date();
var active = false;
config.intervals.eviction = setInterval(function () {
if (active) { return; }
var now = +new Date();
// evict inactive data once per day
if (last_eviction && (now - ONE_DAY) < last_eviction) { return; }
active = true;
Env.evictInactive(function (err) {
if (err) {
// NO_INACTIVE_TIME
Log.error('EVICT_INACTIVE_MAIN_ERROR', err);
}
active = false;
last_eviction = now;
});
}, 60 * 1000);
}).nThen(function () { }).nThen(function () {
RPC.create(Env, function (err, _rpc) { RPC.create(Env, function (err, _rpc) {
if (err) { throw err; } if (err) { throw err; }

@ -13,6 +13,7 @@ const Saferphore = require("saferphore");
const Logger = require("../log"); const Logger = require("../log");
const Tasks = require("../storage/tasks"); const Tasks = require("../storage/tasks");
const Nacl = require('tweetnacl/nacl-fast'); const Nacl = require('tweetnacl/nacl-fast');
const Eviction = require("../eviction");
const Env = { const Env = {
Log: {}, Log: {},
@ -39,13 +40,21 @@ const init = function (config, _cb) {
return void cb('E_INVALID_CONFIG'); return void cb('E_INVALID_CONFIG');
} }
Env.paths = {
pin: config.pinPath,
};
Env.inactiveTime = config.inactiveTime;
Env.archiveRetentionTime = config.archiveRetentionTime;
Env.accountRetentionTime = config.accountRetentionTime;
nThen(function (w) { nThen(function (w) {
Store.create(config, w(function (err, _store) { Store.create(config, w(function (err, _store) {
if (err) { if (err) {
w.abort(); w.abort();
return void cb(err); return void cb(err);
} }
store = _store; Env.store = store = _store;
})); }));
Store.create({ Store.create({
filePath: config.pinPath, filePath: config.pinPath,
@ -54,7 +63,7 @@ const init = function (config, _cb) {
w.abort(); w.abort();
return void cb(err); return void cb(err);
} }
pinStore = _pinStore; Env.pinStore = pinStore = _pinStore;
})); }));
BlobStore.create({ BlobStore.create({
blobPath: config.blobPath, blobPath: config.blobPath,
@ -66,7 +75,7 @@ const init = function (config, _cb) {
w.abort(); w.abort();
return void cb(err); return void cb(err);
} }
blobStore = blob; Env.blobStore = blobStore = blob;
})); }));
}).nThen(function (w) { }).nThen(function (w) {
Tasks.create({ Tasks.create({
@ -436,6 +445,10 @@ const writeTask = function (data, cb) {
Env.tasks.write(data.time, data.task_command, data.args, cb); Env.tasks.write(data.time, data.task_command, data.args, cb);
}; };
const evictInactive = function (data, cb) {
Eviction(Env, cb);
};
const COMMANDS = { const COMMANDS = {
COMPUTE_INDEX: computeIndex, COMPUTE_INDEX: computeIndex,
COMPUTE_METADATA: computeMetadata, COMPUTE_METADATA: computeMetadata,
@ -449,6 +462,7 @@ const COMMANDS = {
REMOVE_OWNED_BLOB: removeOwnedBlob, REMOVE_OWNED_BLOB: removeOwnedBlob,
RUN_TASKS: runTasks, RUN_TASKS: runTasks,
WRITE_TASK: writeTask, WRITE_TASK: writeTask,
EVICT_INACTIVE: evictInactive,
}; };
COMMANDS.INLINE = function (data, cb) { COMMANDS.INLINE = function (data, cb) {

@ -350,6 +350,12 @@ Workers.initialize = function (Env, config, _cb) {
}, cb); }, cb);
}; };
Env.evictInactive = function (cb) {
sendCommand({
command: 'EVICT_INACTIVE',
}, cb);
};
Env.runTasks = function (cb) { Env.runTasks = function (cb) {
sendCommand({ sendCommand({
command: 'RUN_TASKS', command: 'RUN_TASKS',

@ -5,13 +5,17 @@ var BlobStore = require("../lib/storage/blob");
var Quota = require("../lib/commands/quota"); var Quota = require("../lib/commands/quota");
var config = require("../lib/load-config");
var Env = { var Env = {
config: require("../lib/load-config"), inactiveTime: config.inactiveTime,
archiveRetentionTime: config.archiveRetentionTime,
accountRetentionTime: config.accountRetentionTime,
paths: {
pin: config.pinPath,
},
}; };
var prepareEnv = function (Env, cb) { var prepareEnv = function (Env, cb) {
var config = Env.config;
Env.customLimits = config.customLimits; Env.customLimits = config.customLimits;
Quota.applyCustomLimits(Env); Quota.applyCustomLimits(Env);

Loading…
Cancel
Save