abstract historyKeeper's batched reads implementation into a library

pull/1/head
ansuz 6 years ago
parent ec2b5dd01f
commit 9ce7cea9cc

@ -8,6 +8,7 @@ const Crypto = require('crypto');
const Once = require("./lib/once"); const Once = require("./lib/once");
const Meta = require("./lib/metadata"); const Meta = require("./lib/metadata");
const WriteQueue = require("./lib/write-queue"); const WriteQueue = require("./lib/write-queue");
const BatchRead = require("./lib/batch-read");
let Log; let Log;
const now = function () { return (new Date()).getTime(); }; const now = function () { return (new Date()).getTime(); };
@ -231,7 +232,7 @@ module.exports.create = function (cfg) {
as an added bonus: as an added bonus:
if the channel exists but its index does not then it caches the index if the channel exists but its index does not then it caches the index
*/ */
const indexQueues = {}; const batchIndexReads = BatchRead();
const getIndex = (ctx, channelName, cb) => { const getIndex = (ctx, channelName, cb) => {
const chan = ctx.channels[channelName]; const chan = ctx.channels[channelName];
// if there is a channel in memory and it has an index cached, return it // if there is a channel in memory and it has an index cached, return it
@ -242,40 +243,14 @@ module.exports.create = function (cfg) {
}); });
} }
// if a call to computeIndex is already in progress for this channel batchIndexReads(channelName, cb, function (done) {
// then add the callback for the latest invocation to the queue computeIndex(channelName, (err, ret) => {
// and wait for it to complete // this is most likely an unrecoverable filesystem error
if (Array.isArray(indexQueues[channelName])) { if (err) { return void done(err); }
indexQueues[channelName].push(cb); // cache the computed result if possible
return; if (chan) { chan.index = ret; }
} // return
done(void 0, ret);
// otherwise, make a queue for any 'getIndex' calls made before the following 'computeIndex' call completes
var queue = indexQueues[channelName] = (indexQueues[channelName] || [cb]);
computeIndex(channelName, (err, ret) => {
if (!Array.isArray(queue)) {
// something is very wrong if there's no callback array
return void Log.error("E_INDEX_NO_CALLBACK", channelName);
}
// clean up the queue that you're about to handle, but keep a local copy
delete indexQueues[channelName];
// this is most likely an unrecoverable filesystem error
if (err) {
// call back every pending function with the error
return void queue.forEach(function (_cb) {
_cb(err);
});
}
// cache the computed result if possible
if (chan) { chan.index = ret; }
// call back every pending function with the result
queue.forEach(function (_cb) {
_cb(void 0, ret);
}); });
}); });
}; };

@ -0,0 +1,58 @@
/*
## Purpose
To avoid running expensive IO or computation concurrently.
If the result of IO or computation is requested while an identical request
is already in progress, wait until the first one completes and provide its
result to every routine that requested it.
## Usage
Provide:
1. a named key for the computation or resource,
2. a callback to handle the result
3. an implementation which calls back with the result
```
var batch = Batch();
var read = function (path, cb) {
batch(path, cb, function (done) {
console.log("reading %s", path);
fs.readFile(path, 'utf8', done);
});
};
read('./pewpew.txt', function (err, data) {
if (err) { return void console.error(err); }
console.log(data);
});
read('./pewpew.txt', function (err, data) {
if (err) { return void console.error(err); }
console.log(data);
});
```
*/
module.exports = function () {
var map = {};
return function (id, cb, impl) {
if (typeof(cb) !== 'function' || typeof(impl) !== 'function') {
throw new Error("expected callback and implementation");
}
if (map[id]) { return void map[id].push(cb); }
map[id] = [cb];
impl(function () {
var args = Array.prototype.slice.call(arguments);
map[id].forEach(function (h) {
h.apply(null, args);
});
delete map[id];
});
};
};
Loading…
Cancel
Save