include inactive accounts in the archival script

pull/1/head
ansuz 4 years ago
parent 1bd6c134ae
commit d794e0b48f

@ -192,6 +192,16 @@ module.exports = {
*/ */
//archiveRetentionTime: 15, //archiveRetentionTime: 15,
/* XXX
*
*
*
*
*
*/
//accountRetentionTime: 365,
/* Max Upload Size (bytes) /* Max Upload Size (bytes)
* this sets the maximum size of any one file uploaded to the server. * this sets the maximum size of any one file uploaded to the server.
* anything larger than this size will be rejected * anything larger than this size will be rejected

@ -74,6 +74,14 @@ var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) {
}; };
}; };
var processPinFile = function (pinFile, fileName) {
var ref = {};
var handler = createLineHandler(ref, fileName);
pinFile.split('\n').forEach(handler);
return ref;
};
/* /*
takes contents of a pinFile (UTF8 string) takes contents of a pinFile (UTF8 string)
and the pin file's name and the pin file's name
@ -82,10 +90,7 @@ var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) {
throw errors on pin logs with invalid pin data throw errors on pin logs with invalid pin data
*/ */
Pins.calculateFromLog = function (pinFile, fileName) { Pins.calculateFromLog = function (pinFile, fileName) {
var ref = {}; var ref = processPinFile(pinFile, fileName);
var handler = createLineHandler(ref, fileName);
pinFile.split('\n').forEach(handler);
return Object.keys(ref.pins); return Object.keys(ref.pins);
}; };
@ -207,6 +212,7 @@ Pins.load = function (cb, config) {
var pinPath = config.pinPath || './pins'; var pinPath = config.pinPath || './pins';
var done = Util.once(cb); var done = Util.once(cb);
var handler = config.handler;
nThen((waitFor) => { nThen((waitFor) => {
// recurse over the configured pinPath, or the default // recurse over the configured pinPath, or the default
@ -240,16 +246,23 @@ Pins.load = function (cb, config) {
}).nThen((waitFor) => { }).nThen((waitFor) => {
fileList.forEach((f) => { fileList.forEach((f) => {
sema.take((returnAfter) => { sema.take((returnAfter) => {
Fs.readFile(f, waitFor(returnAfter((err, content) => { var next = waitFor(returnAfter());
Fs.readFile(f, (err, content) => {
if (err) { if (err) {
waitFor.abort(); waitFor.abort();
return void done(err); return void done(err);
} }
const hashes = Pins.calculateFromLog(content.toString('utf8'), f); var id = f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y);
var contentString = content.toString('utf8');
if (handler) {
return void handler(processPinFile(contentString, f), id, next);
}
const hashes = Pins.calculateFromLog(contentString, f);
hashes.forEach((x) => { hashes.forEach((x) => {
(pinned[x] = pinned[x] || {})[f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y)] = 1; (pinned[x] = pinned[x] || {})[id] = 1;
}); });
}))); next();
});
}); });
}); });
}).nThen(() => { }).nThen(() => {

@ -422,7 +422,7 @@ var removeArchivedChannel = function (env, channelName, cb) {
}; };
// TODO use ../plan.js for a smaller memory footprint // TODO use ../plan.js for a smaller memory footprint
var listChannels = function (root, handler, cb) { var listChannels = function (root, handler, cb, fast) {
// do twenty things at a time // do twenty things at a time
var sema = Semaphore.create(20); var sema = Semaphore.create(20);
@ -478,15 +478,20 @@ var listChannels = function (root, handler, cb) {
metadataName = channelName.replace(/\.ndjson$/, '.metadata.ndjson'); metadataName = channelName.replace(/\.ndjson$/, '.metadata.ndjson');
} }
var filePath = Path.join(nestedDirPath, channelName);
var metadataPath = Path.join(nestedDirPath, metadataName);
var channel = metadataName.replace(/\.metadata.ndjson$/, ''); var channel = metadataName.replace(/\.metadata.ndjson$/, '');
if ([32, 34].indexOf(channel.length) === -1) { return; } if ([32, 34, 44].indexOf(channel.length) === -1) { return; }
// otherwise throw it on the pile // otherwise throw it on the pile
sema.take(function (give) { sema.take(function (give) {
var next = w(give()); var next = w(give());
if (fast) {
return void handler(void 0, { channel: channel, }, next);
}
var filePath = Path.join(nestedDirPath, channelName);
var metadataPath = Path.join(nestedDirPath, metadataName);
var metaStat, channelStat; var metaStat, channelStat;
var metaErr, channelErr; var metaErr, channelErr;
nThen(function (ww) { nThen(function (ww) {
@ -1186,11 +1191,11 @@ module.exports.create = function (conf, _cb) {
}, },
// CHANNEL ITERATION // CHANNEL ITERATION
listChannels: function (handler, cb) { listChannels: function (handler, cb, fastMode) {
listChannels(env.root, handler, cb); listChannels(env.root, handler, cb, fastMode);
}, },
listArchivedChannels: function (handler, cb) { listArchivedChannels: function (handler, cb, fastMode) {
listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb); listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb, fastMode);
}, },
getChannelSize: function (channelName, cb) { getChannelSize: function (channelName, cb) {

5
package-lock.json generated

@ -4,6 +4,11 @@
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {
"@mcrowe/minibloom": {
"version": "0.2.0",
"resolved": "https://registry.npmjs.org/@mcrowe/minibloom/-/minibloom-0.2.0.tgz",
"integrity": "sha1-G+2WrsGDiBmNo3RDiZssP/WUgFM="
},
"accepts": { "accepts": {
"version": "1.3.7", "version": "1.3.7",
"resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.7.tgz", "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.7.tgz",

@ -12,6 +12,7 @@
"url": "https://opencollective.com/cryptpad" "url": "https://opencollective.com/cryptpad"
}, },
"dependencies": { "dependencies": {
"@mcrowe/minibloom": "^0.2.0",
"chainpad-crypto": "^0.2.2", "chainpad-crypto": "^0.2.2",
"chainpad-server": "^4.0.9", "chainpad-server": "^4.0.9",
"express": "~4.16.0", "express": "~4.16.0",

@ -1,8 +1,11 @@
/* global process */
var nThen = require("nthen"); var nThen = require("nthen");
var Store = require("../lib/storage/file"); var Store = require("../lib/storage/file");
var BlobStore = require("../lib/storage/blob"); var BlobStore = require("../lib/storage/blob");
var Pins = require("../lib/pins"); var Pins = require("../lib/pins");
var Bloom = require("@mcrowe/minibloom");
var config = require("../lib/load-config"); var config = require("../lib/load-config");
// the administrator should have set an 'inactiveTime' in their config // the administrator should have set an 'inactiveTime' in their config
@ -22,16 +25,42 @@ var getNewestTime = function (stats) {
}; };
var store; var store;
var pins; var pinStore;
var Log; var Log;
var blobs; var blobs;
/* It's fairly easy to know if a channel or blob is active
but knowing whether it is pinned requires that we
keep the set of pinned documents in memory.
Some users will share the same set of documents in their pin lists,
so the representation of pinned documents should scale sub-linearly
with the number of users and pinned documents.
That said, sub-linear isn't great...
A Bloom filter is "a space-efficient probabilistic data structure"
which lets us check whether an item is _probably_ or _definitely not_
in a set. This is good enough for our purposes since we just want to
know whether something can safely be removed and false negatives
(not safe to remove when it actually is) are acceptable.
We set our capacity to some large number, and the error rate to whatever
we think is acceptable.
*/
var BLOOM_CAPACITY = (1 << 20) - 1; // over a million items
var BLOOM_ERROR = 1 / 1000; // an error rate of one in a thousand
// we'll use one filter for the set of active documents
var activeDocs = Bloom.optimalFilter(BLOOM_CAPACITY, BLOOM_ERROR);
// and another one for the set of pinned documents
var pinnedDocs = Bloom. optimalFilter(BLOOM_CAPACITY, BLOOM_ERROR);
var startTime = +new Date(); var startTime = +new Date();
var msSinceStart = function () { var msSinceStart = function () {
return (+new Date()) - startTime; return (+new Date()) - startTime;
}; };
nThen(function (w) { var loadStorage = function (w) {
// load the store which will be used for iterating over channels // load the store which will be used for iterating over channels
// and performing operations like archival and deletion // and performing operations like archival and deletion
Store.create(config, w(function (err, _) { Store.create(config, w(function (err, _) {
@ -40,17 +69,17 @@ nThen(function (w) {
throw err; throw err;
} }
store = _; store = _;
})); // load the list of pinned files so you know which files }));
// should not be archived or deleted
Pins.load(w(function (err, _) { Store.create({
filePath: config.pinPath,
}, w(function (err, _) {
if (err) { if (err) {
w.abort(); w.abort();
return void console.error(err); throw err;
} }
pins = _; pinStore = _;
}), { }));
pinPath: config.pinPath,
});
// load the logging module so that you have a record of which // load the logging module so that you have a record of which
// files were archived or deleted at what time // files were archived or deleted at what time
@ -67,9 +96,9 @@ nThen(function (w) {
} }
blobs = _; blobs = _;
})); }));
}).nThen(function () { };
Log.info("EVICT_TIME_TO_LOAD_PINS", msSinceStart());
}).nThen(function (w) { var removeArchivedChannels = function (w) {
// this block will iterate over archived channels and removes them // this block will iterate over archived channels and removes them
// if they've been in cold storage for longer than your configured archive time // if they've been in cold storage for longer than your configured archive time
@ -79,6 +108,7 @@ nThen(function (w) {
// count the number of files which have been removed in this run // count the number of files which have been removed in this run
var removed = 0; var removed = 0;
var accounts = 0;
var handler = function (err, item, cb) { var handler = function (err, item, cb) {
if (err) { if (err) {
@ -102,7 +132,13 @@ nThen(function (w) {
return void cb(); return void cb();
} }
Log.info('EVICT_ARCHIVED_CHANNEL_REMOVAL', item.channel); Log.info('EVICT_ARCHIVED_CHANNEL_REMOVAL', item.channel);
removed++;
if (item.channel.length === 32) {
removed++;
} else if (item.channel.length === 44) {
accounts++;
}
cb(); cb();
})); }));
}; };
@ -115,10 +151,13 @@ nThen(function (w) {
return Log.error('EVICT_ARCHIVED_FINAL_ERROR', err); return Log.error('EVICT_ARCHIVED_FINAL_ERROR', err);
} }
Log.info('EVICT_ARCHIVED_CHANNELS_REMOVED', removed); Log.info('EVICT_ARCHIVED_CHANNELS_REMOVED', removed);
Log.info('EVICT_ARCHIVED_ACCOUNTS_REMOVED', accounts);
}; };
store.listArchivedChannels(handler, w(done)); store.listArchivedChannels(handler, w(done));
}).nThen(function (w) { };
var removeArchivedBlobProofs = function (w) {
if (typeof(config.archiveRetentionTime) !== "number") { return; } if (typeof(config.archiveRetentionTime) !== "number") { return; }
// Iterate over archive blob ownership proofs and remove them // Iterate over archive blob ownership proofs and remove them
// if they are older than the specified retention time // if they are older than the specified retention time
@ -128,7 +167,6 @@ nThen(function (w) {
Log.error("EVICT_BLOB_LIST_ARCHIVED_PROOF_ERROR", err); Log.error("EVICT_BLOB_LIST_ARCHIVED_PROOF_ERROR", err);
return void next(); return void next();
} }
if (pins[item.blobId]) { return void next(); }
if (item && getNewestTime(item) > retentionTime) { return void next(); } if (item && getNewestTime(item) > retentionTime) { return void next(); }
blobs.remove.archived.proof(item.safeKey, item.blobId, (function (err) { blobs.remove.archived.proof(item.safeKey, item.blobId, (function (err) {
if (err) { if (err) {
@ -142,7 +180,9 @@ nThen(function (w) {
}, w(function () { }, w(function () {
Log.info('EVICT_ARCHIVED_BLOB_PROOFS_REMOVED', removed); Log.info('EVICT_ARCHIVED_BLOB_PROOFS_REMOVED', removed);
})); }));
}).nThen(function (w) { };
var removeArchivedBlobs = function (w) {
if (typeof(config.archiveRetentionTime) !== "number") { return; } if (typeof(config.archiveRetentionTime) !== "number") { return; }
// Iterate over archived blobs and remove them // Iterate over archived blobs and remove them
// if they are older than the specified retention time // if they are older than the specified retention time
@ -152,7 +192,6 @@ nThen(function (w) {
Log.error("EVICT_BLOB_LIST_ARCHIVED_BLOBS_ERROR", err); Log.error("EVICT_BLOB_LIST_ARCHIVED_BLOBS_ERROR", err);
return void next(); return void next();
} }
if (pins[item.blobId]) { return void next(); }
if (item && getNewestTime(item) > retentionTime) { return void next(); } if (item && getNewestTime(item) > retentionTime) { return void next(); }
blobs.remove.archived.blob(item.blobId, function (err) { blobs.remove.archived.blob(item.blobId, function (err) {
if (err) { if (err) {
@ -166,7 +205,149 @@ nThen(function (w) {
}, w(function () { }, w(function () {
Log.info('EVICT_ARCHIVED_BLOBS_REMOVED', removed); Log.info('EVICT_ARCHIVED_BLOBS_REMOVED', removed);
})); }));
}).nThen(function (w) { };
var categorizeChannelsByActivity = function (w) {
var channels = 0;
var active = 0;
var handler = function (err, item, cb) {
channels++;
if (err) {
Log.error('EVICT_CHANNEL_CATEGORIZATION', err);
return void cb();
}
// if the channel has been modified recently
// we don't use mtime because we don't want to count access to the file, just modifications
if (+new Date(item.mtime) > inactiveTime) {
// add it to the set of activeDocs
activeDocs.add(item.channel);
active++;
return void cb();
}
return void cb();
};
var done = function () {
Log.info('EVICT_CHANNELS_CATEGORIZED', {
active: active,
channels: channels,
});
};
store.listChannels(handler, w(done));
};
var categorizeBlobsByActivity = function (w) {
var n_blobs = 0;
var active = 0;
blobs.list.blobs(function (err, item, next) {
n_blobs++;
if (err) {
Log.error("EVICT_BLOB_CATEGORIZATION", err);
return void next();
}
if (!item) {
next();
return void Log.error("EVICT_BLOB_CATEGORIZATION_INVALID", item);
}
if (getNewestTime(item) > inactiveTime) {
activeDocs.add(item.blobId);
active++;
return void next();
}
next();
}, w(function () {
Log.info('EVICT_BLOBS_CATEGORIZED', {
active: active,
blobs: n_blobs,
});
}));
};
var categorizeAccountsByActivity = function (w) {
// iterate over all accounts
var accounts = 0;
var inactive = 0;
var accountRetentionTime;
if (typeof(config.accountRetentionTime) === 'number' && config.accountRetentionTime > 0) {
accountRetentionTime = +new Date() - (24 * 3600 * 1000 * config.accountRetentionTime);
} else {
accountRetentionTime = -1;
}
var pinAll = function (pinList) {
pinList.forEach(function (docId) {
pinnedDocs.add(docId);
});
};
var handler;
if (accountRetentionTime < 0) {
// this means we'll retain all accounts
// so the pin log handler can be very simple
handler = function (content, id, next) {
pinAll(Object.keys(content.pins));
next();
};
} else {
// otherwise, we'll only retain data from active accounts
// so we need more heuristics
handler = function (content, id, next) {
accounts++;
//console.log(content, id);
var mtime = content.latest;
var pinList = Object.keys(content.pins);
// if their pin log has changed recently then consider them active
if (mtime && mtime > accountRetentionTime) {
// the account is active
pinAll(pinList);
return void next();
}
// otherwise iterate over their pinned documents until you find one that has been active
if (Object.keys(content.pins).some(function (docId) {
return !activeDocs.test(docId);
})) {
// add active accounts' pinned documents to a second bloom filter
pinAll(pinList);
return void next();
}
// if none are active then archive the pin log
pinStore.archiveChannel(id, function (err) {
console.log(inactive++);
if (err) {
Log.error('EVICT_INACTIVE_ACCOUNT_PIN_LOG', err);
return void next();
}
Log.info('EVICT_INACTIVE_ACCOUNT_LOG', id);
next();
});
};
}
var done = function () {
Log.info('EVICT_INACTIVE_ACCOUNTS', {
accounts: accounts,
inactive: inactive,
});
};
Pins.load(w(done), {
pinPath: config.pinPath,
handler: handler,
});
};
var archiveInactiveBlobs = function (w) {
// iterate over blobs and remove them // iterate over blobs and remove them
// if they have not been accessed within the specified retention time // if they have not been accessed within the specified retention time
var removed = 0; var removed = 0;
@ -179,8 +360,8 @@ nThen(function (w) {
next(); next();
return void Log.error('EVICT_BLOB_LIST_BLOBS_NO_ITEM', item); return void Log.error('EVICT_BLOB_LIST_BLOBS_NO_ITEM', item);
} }
if (pins[item.blobId]) { return void next(); } if (pinnedDocs.test(item.blobId)) { return void next(); }
if (getNewestTime(item) > inactiveTime) { return void next(); } if (activeDocs.test(item.blobId)) { return void next(); }
blobs.archive.blob(item.blobId, function (err) { blobs.archive.blob(item.blobId, function (err) {
if (err) { if (err) {
@ -199,7 +380,9 @@ nThen(function (w) {
}, w(function () { }, w(function () {
Log.info('EVICT_BLOBS_REMOVED', removed); Log.info('EVICT_BLOBS_REMOVED', removed);
})); }));
}).nThen(function (w) { };
var archiveInactiveBlobProofs = function (w) {
// iterate over blob proofs and remove them // iterate over blob proofs and remove them
// if they don't correspond to a pinned or active file // if they don't correspond to a pinned or active file
var removed = 0; var removed = 0;
@ -212,7 +395,7 @@ nThen(function (w) {
next(); next();
return void Log.error('EVICT_BLOB_LIST_PROOFS_NO_ITEM', item); return void Log.error('EVICT_BLOB_LIST_PROOFS_NO_ITEM', item);
} }
if (pins[item.blobId]) { return void next(); } if (pinnedDocs.test(item.blobId)) { return void next(); }
if (getNewestTime(item) > inactiveTime) { return void next(); } if (getNewestTime(item) > inactiveTime) { return void next(); }
nThen(function (w) { nThen(function (w) {
blobs.size(item.blobId, w(function (err, size) { blobs.size(item.blobId, w(function (err, size) {
@ -239,7 +422,9 @@ nThen(function (w) {
}, w(function () { }, w(function () {
Log.info("EVICT_BLOB_PROOFS_REMOVED", removed); Log.info("EVICT_BLOB_PROOFS_REMOVED", removed);
})); }));
}).nThen(function (w) { };
var archiveInactiveChannels = function (w) {
var channels = 0; var channels = 0;
var archived = 0; var archived = 0;
@ -265,11 +450,11 @@ nThen(function (w) {
})); }));
} }
// bail out if the channel was modified recently // bail out if the channel is in the set of activeDocs
if (+new Date(item.mtime) > inactiveTime) { return void cb(); } if (activeDocs.test(item.channel)) { return void cb(); }
// ignore the channel if it's pinned // ignore the channel if it's pinned
if (pins[item.channel]) { return void cb(); } if (pinnedDocs.test(item.channel)) { return void cb(); }
return void store.archiveChannel(item.channel, w(function (err) { return void store.archiveChannel(item.channel, w(function (err) {
if (err) { if (err) {
@ -289,12 +474,38 @@ nThen(function (w) {
return void Log.info('EVICT_CHANNELS_ARCHIVED', archived); return void Log.info('EVICT_CHANNELS_ARCHIVED', archived);
}; };
store.listChannels(handler, w(done)); store.listChannels(handler, w(done), true); // using a hacky "fast mode" since we only need the channel id
}).nThen(function () { };
nThen(loadStorage)
.nThen(function () {
Log.info("EVICT_TIME_TO_LOAD_PINS", msSinceStart());
})
.nThen(removeArchivedChannels)
.nThen(removeArchivedBlobProofs)
.nThen(removeArchivedBlobs)
// iterate over all documents and add them to a bloom filter if they have been active
.nThen(categorizeChannelsByActivity)
.nThen(categorizeBlobsByActivity)
// iterate over all accounts and add them to a bloom filter if they are active
.nThen(categorizeAccountsByActivity)
// iterate again and archive inactive unpinned documents
// (documents which are not in either bloom filter)
.nThen(archiveInactiveBlobs)
.nThen(archiveInactiveBlobProofs)
.nThen(archiveInactiveChannels)
.nThen(function () {
Log.info("EVICT_TIME_TO_RUN_SCRIPT", msSinceStart()); Log.info("EVICT_TIME_TO_RUN_SCRIPT", msSinceStart());
}).nThen(function () { }).nThen(function () {
// the store will keep this script running if you don't shut it down // the store will keep this script running if you don't shut it down
store.shutdown(); store.shutdown();
Log.shutdown(); Log.shutdown();
pinStore.shutdown();
process.exit();
}); });

Loading…
Cancel
Save