|
|
|
@ -229,9 +229,10 @@ module.exports.create = function (cfg) {
|
|
|
|
|
as an added bonus:
|
|
|
|
|
if the channel exists but its index does not then it caches the index
|
|
|
|
|
*/
|
|
|
|
|
const indexQueue = {};
|
|
|
|
|
const indexQueues = {};
|
|
|
|
|
const getIndex = (ctx, channelName, cb) => {
|
|
|
|
|
const chan = ctx.channels[channelName];
|
|
|
|
|
// if there is a channel in memory and it has an index cached, return it
|
|
|
|
|
if (chan && chan.index) {
|
|
|
|
|
// enforce async behaviour
|
|
|
|
|
return void setTimeout(function () {
|
|
|
|
@ -239,38 +240,35 @@ module.exports.create = function (cfg) {
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// make a queue of callbacks in case getIndex is called before you
|
|
|
|
|
// compute and cache the index
|
|
|
|
|
|
|
|
|
|
// if a call to computeIndex is already in progress for this channel
|
|
|
|
|
// then add the callback for the latest invocation to the queue
|
|
|
|
|
// and wait for it to complete
|
|
|
|
|
if (indexQueue[channelName]) {
|
|
|
|
|
indexQueue.channelName.push(cb);
|
|
|
|
|
if (Array.isArray(indexQueues[channelName])) {
|
|
|
|
|
indexQueues[channelName].push(cb);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if computeIndex is not already in progress, make an array
|
|
|
|
|
// with the current call first in line, so additional calls
|
|
|
|
|
// can add their callbacks to the queue
|
|
|
|
|
indexQueue[channelName] = (indexQueue[channelName] || []).push(cb);
|
|
|
|
|
// otherwise, make a queue for any 'getIndex' calls made before the following 'computeIndex' call completes
|
|
|
|
|
var queue = indexQueues[channelName] = (indexQueues[channelName] || [cb]);
|
|
|
|
|
|
|
|
|
|
computeIndex(channelName, (err, ret) => {
|
|
|
|
|
// keep a local reference to this channel's queue
|
|
|
|
|
let queue = indexQueue[channelName];
|
|
|
|
|
if (!Array.isArray(queue)) {
|
|
|
|
|
// something is very wrong if there's no callback array
|
|
|
|
|
return void Log.error("E_INDEX_NO_CALLBACK", channelName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// but remove it from the map of queues and not worry about cleaning up
|
|
|
|
|
// as long as computeIndex always calls back this won't be a memory leak
|
|
|
|
|
delete indexQueue[channelName];
|
|
|
|
|
// clean up the queue that you're about to handle, but keep a local copy
|
|
|
|
|
delete indexQueues[channelName];
|
|
|
|
|
|
|
|
|
|
// this is most likely an unrecoverable filesystem error
|
|
|
|
|
if (err) {
|
|
|
|
|
// call back every pending function with the error
|
|
|
|
|
return void queue.forEach(function (_cb) {
|
|
|
|
|
_cb(err);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
// if there's a channel to use as a cache, store the result
|
|
|
|
|
// for future use
|
|
|
|
|
// cache the computed result if possible
|
|
|
|
|
if (chan) { chan.index = ret; }
|
|
|
|
|
|
|
|
|
|
// call back every pending function with the result
|
|
|
|
|