don't continue iterating in listChannels until a the current task has called back

pull/1/head
ansuz 6 years ago
parent 2605d27e3f
commit cb0c4ee944

@ -51,97 +51,115 @@ nThen(function (w) {
// count the number of files which have been removed in this run // count the number of files which have been removed in this run
var removed = 0; var removed = 0;
var handler = function (err, item) { var handler = function (err, item, cb) {
if (err) { return void Log.error('EVICT_ARCHIVED_CHANNEL_ITERATION', err); } if (err) {
Log.error('EVICT_ARCHIVED_CHANNEL_ITERATION', err);
return void cb();
}
// don't mess with files that are freshly stored in cold storage // don't mess with files that are freshly stored in cold storage
// based on ctime because that's changed when the file is moved... // based on ctime because that's changed when the file is moved...
if (+new Date(item.ctime) > retentionTime) { return; } if (+new Date(item.ctime) > retentionTime) {
return void cb();
}
// but if it's been stored for the configured time... // but if it's been stored for the configured time...
// expire it // expire it
store.removeArchivedChannel(item.channel, w(function (err) { store.removeArchivedChannel(item.channel, w(function (err) {
if (err) { if (err) {
return void Log.error('EVICT_ARCHIVED_CHANNEL_REMOVAL_ERROR', { Log.error('EVICT_ARCHIVED_CHANNEL_REMOVAL_ERROR', {
error: err, error: err,
channel: item.channel, channel: item.channel,
}); });
return void cb();
} }
Log.info('EVICT_ARCHIVED_CHANNEL_REMOVAL', item.channel); Log.info('EVICT_ARCHIVED_CHANNEL_REMOVAL', item.channel);
removed++; removed++;
cb();
})); }));
}; };
// if you hit an error, log it // if you hit an error, log it
// otherwise, when there are no more channels to process // otherwise, when there are no more channels to process
// log some stats about how many were removed // log some stats about how many were removed
var cb = function (err) { var done = function (err) {
if (err) { if (err) {
return Log.error('EVICT_ARCHIVED_FINAL_ERROR', err); return Log.error('EVICT_ARCHIVED_FINAL_ERROR', err);
} }
Log.info('EVICT_ARCHIVED_CHANNELS_REMOVED', removed); Log.info('EVICT_ARCHIVED_CHANNELS_REMOVED', removed);
}; };
store.listArchivedChannels(handler, w(cb)); store.listArchivedChannels(handler, w(done));
}).nThen(function (w) { }).nThen(function (w) {
var removed = 0; var removed = 0;
var channels = 0; var channels = 0;
var archived = 0; var archived = 0;
var handler = function (err, item) { var handler = function (err, item, cb) {
channels++; channels++;
if (err) { return void Log.error('EVICT_CHANNEL_ITERATION', err); } if (err) {
Log.error('EVICT_CHANNEL_ITERATION', err);
return void cb();
}
// check if the database has any ephemeral channels // check if the database has any ephemeral channels
// if it does it's because of a bug, and they should be removed // if it does it's because of a bug, and they should be removed
if (item.channel.length === 34) { if (item.channel.length === 34) {
return void store.removeChannel(item.channel, w(function (err) { return void store.removeChannel(item.channel, w(function (err) {
if (err) { if (err) {
return void Log.error('EVICT_EPHEMERAL_CHANNEL_REMOVAL_ERROR', { Log.error('EVICT_EPHEMERAL_CHANNEL_REMOVAL_ERROR', {
error: err, error: err,
channel: item.channel, channel: item.channel,
}); });
return void cb();
} }
Log.info('EVICT_EPHEMERAL_CHANNEL_REMOVAL', item.channel); Log.info('EVICT_EPHEMERAL_CHANNEL_REMOVAL', item.channel);
})); }));
} }
// bail out if the channel was modified recently // bail out if the channel was modified recently
if (+new Date(item.mtime) > inactiveTime) { return; } if (+new Date(item.mtime) > inactiveTime) { return void cb(); }
// ignore the channel if it's pinned // ignore the channel if it's pinned
if (pins[item.channel]) { return; } if (pins[item.channel]) { return void cb(); }
// if the server is configured to retain data, archive the channel // if the server is configured to retain data, archive the channel
if (config.retainData) { if (config.retainData) {
store.archiveChannel(item.channel, w(function (err) { return void store.archiveChannel(item.channel, w(function (err) {
if (err) { return void Log.error('EVICT_CHANNEL_ARCHIVAL_ERROR', { if (err) {
error: err, Log.error('EVICT_CHANNEL_ARCHIVAL_ERROR', {
channel: item.channel, error: err,
}); } channel: item.channel,
});
return void cb();
}
Log.info('EVICT_CHANNEL_ARCHIVAL', item.channel); Log.info('EVICT_CHANNEL_ARCHIVAL', item.channel);
archived++; archived++;
cb();
})); }));
return;
} }
// otherwise remove it // otherwise remove it
store.removeChannel(item.channel, w(function (err) { store.removeChannel(item.channel, w(function (err) {
if (err) { return void Log.error('EVICT_CHANNEL_REMOVAL_ERROR', { if (err) {
error: err, Log.error('EVICT_CHANNEL_REMOVAL_ERROR', {
channel: item.channel, error: err,
}); } channel: item.channel,
});
return void cb();
}
Log.info('EVICT_CHANNEL_REMOVAL', item.channel); Log.info('EVICT_CHANNEL_REMOVAL', item.channel);
removed++; removed++;
cb();
})); }));
}; };
var cb = function () { var done = function () {
if (config.retainData) { if (config.retainData) {
return void Log.info('EVICT_CHANNELS_ARCHIVED', archived); return void Log.info('EVICT_CHANNELS_ARCHIVED', archived);
} }
return void Log.info('EVICT_CHANNELS_REMOVED', removed); return void Log.info('EVICT_CHANNELS_REMOVED', removed);
}; };
store.listChannels(handler, w(cb)); store.listChannels(handler, w(done));
}).nThen(function () { }).nThen(function () {
// the store will keep this script running if you don't shut it down // the store will keep this script running if you don't shut it down
store.shutdown(); store.shutdown();

@ -0,0 +1,75 @@
var nThen = require("nthen");
var Store = require("../storage/file");
var Pinned = require("./pinned");
var config = require("../lib/load-config");
var store;
var pins;
var Log;
nThen(function (w) {
// load the store which will be used for iterating over channels
// and performing operations like archival and deletion
Store.create(config, w(function (_) {
store = _;
})); // load the list of pinned files so you know which files
// should not be archived or deleted
Pinned.load(w(function (err, _) {
if (err) {
w.abort();
return void console.error(err);
}
pins = _;
}), {
pinPath: config.pinPath,
});
// load the logging module so that you have a record of which
// files were archived or deleted at what time
var Logger = require("../lib/log");
Logger.create(config, w(function (_) {
Log = _;
}));
}).nThen(function (w) {
// count the number of files which have been restored in this run
var restored = 0;
var handler = function (err, item, cb) {
if (err) {
Log.error('RESTORE_ARCHIVED_CHANNEL_ITERATION', err);
return void cb();
}
// but if it's been stored for the configured time...
// expire it
store.removeArchivedChannel(item.channel, w(function (err) {
if (err) {
Log.error('RESTORE_ARCHIVED_CHANNEL_RESTORATION_ERROR', {
error: err,
channel: item.channel,
});
return void cb();
}
Log.info('RESTORE_ARCHIVED_CHANNEL_RESTORATION', item.channel);
restored++;
cb();
}));
};
// if you hit an error, log it
// otherwise, when there are no more channels to process
// log some stats about how many were removed
var done = function (err) {
if (err) {
return Log.error('RESTORE_ARCHIVED_FINAL_ERROR', err);
}
Log.info('RESTORE_ARCHIVED_CHANNELS_RESTORED', restored);
};
store.listArchivedChannels(handler, w(done));
}).nThen(function () {
// the store will keep this script running if you don't shut it down
store.shutdown();
Log.shutdown();
});

@ -253,7 +253,8 @@ var listChannels = function (root, handler, cb) {
// otherwise throw it on the pile // otherwise throw it on the pile
sema.take(function (give) { sema.take(function (give) {
Fs.stat(filepath, w(give(function (err, stats) { var next = give();
Fs.stat(filepath, w(function (err, stats) {
if (err) { if (err) {
return void handler(err); return void handler(err);
} }
@ -264,8 +265,8 @@ var listChannels = function (root, handler, cb) {
mtime: stats.mtime, mtime: stats.mtime,
ctime: stats.ctime, ctime: stats.ctime,
size: stats.size, size: stats.size,
}); }, next);
}))); }));
}); });
}); });
}))); })));

Loading…
Cancel
Save