diff --git a/historyKeeper.js b/historyKeeper.js index 5ceeb174c..0b4ce7ba1 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -7,6 +7,7 @@ const Nacl = require('tweetnacl'); const Crypto = require('crypto'); const Once = require("./lib/once"); const Meta = require("./lib/metadata"); +const WriteQueue = require("./lib/write-queue"); let Log; const now = function () { return (new Date()).getTime(); }; @@ -302,88 +303,59 @@ module.exports.create = function (cfg) { * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes */ - const storageQueues = {}; + const queueStorage = WriteQueue(); - const storeQueuedMessage = function (ctx, queue, id) { - if (queue.length === 0) { - delete storageQueues[id]; - return; - } - - const first = queue.shift(); - - const msgBin = first.msg; - const optionalMessageHash = first.hash; - const isCp = first.isCp; - - // Store the message first, and update the index only once it's stored. - // store.messageBin can be async so updating the index first may - // result in a wrong cpIndex - nThen((waitFor) => { - store.messageBin(id, msgBin, waitFor(function (err) { - if (err) { - waitFor.abort(); - Log.error("HK_STORE_MESSAGE_ERROR", err.message); - - // this error is critical, but there's not much we can do at the moment - // proceed with more messages, but they'll probably fail too - // at least you won't have a memory leak - - // TODO make it possible to respond to clients with errors so they know - // their message wasn't stored - storeQueuedMessage(ctx, queue, id); - return; - } - })); - }).nThen((waitFor) => { - getIndex(ctx, id, waitFor((err, index) => { - if (err) { - Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); - // non-critical, we'll be able to get the channel index later + const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { + const id = channel.id; + const msgBin = new Buffer(msg + '\n', 'utf8'); - // proceed to the next message in the queue - storeQueuedMessage(ctx, queue, id); - return; - } - if (typeof (index.line) === "number") { index.line++; } - if (isCp) { - index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); - for (let k in index.offsetByHash) { - if (index.offsetByHash[k] < index.cpIndex[0]) { - delete index.offsetByHash[k]; + queueStorage(id, function (next) { + // Store the message first, and update the index only once it's stored. + // store.messageBin can be async so updating the index first may + // result in a wrong cpIndex + nThen((waitFor) => { + store.messageBin(id, msgBin, waitFor(function (err) { + if (err) { + waitFor.abort(); + Log.error("HK_STORE_MESSAGE_ERROR", err.message); + + // this error is critical, but there's not much we can do at the moment + // proceed with more messages, but they'll probably fail too + // at least you won't have a memory leak + + // TODO make it possible to respond to clients with errors so they know + // their message wasn't stored + return void next(); + } + })); + }).nThen((waitFor) => { + getIndex(ctx, id, waitFor((err, index) => { + if (err) { + Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); + // non-critical, we'll be able to get the channel index later + return void next(); + } + if (typeof (index.line) === "number") { index.line++; } + if (isCp) { + index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); + for (let k in index.offsetByHash) { + if (index.offsetByHash[k] < index.cpIndex[0]) { + delete index.offsetByHash[k]; + } } + index.cpIndex.push(({ + offset: index.size, + line: ((index.line || 0) + 1) + } /*:cp_index_item*/)); } - index.cpIndex.push(({ - offset: index.size, - line: ((index.line || 0) + 1) - } /*:cp_index_item*/)); - } - if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } - index.size += msgBin.length; - - // handle the next element in the queue - storeQueuedMessage(ctx, queue, id); - })); - }); - }; + if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } + index.size += msgBin.length; - const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { - const id = channel.id; - - const msgBin = new Buffer(msg + '\n', 'utf8'); - if (Array.isArray(storageQueues[id])) { - return void storageQueues[id].push({ - msg: msgBin, - hash: optionalMessageHash, - isCp: isCp, + // handle the next element in the queue + next(); + })); }); - } - - const queue = storageQueues[id] = (storageQueues[id] || [{ - msg: msgBin, - hash: optionalMessageHash, - }]); - storeQueuedMessage(ctx, queue, id); + }); }; var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; diff --git a/lib/write-queue.js b/lib/write-queue.js new file mode 100644 index 000000000..c1b64ebaf --- /dev/null +++ b/lib/write-queue.js @@ -0,0 +1,40 @@ +/* +var q = Queue(); +q(id, function (next) { + // whatever you need to do.... + + // when you're done + next(); +}); +*/ + +var fix1 = function (f, x) { + return function () { f(x); }; +}; + +module.exports = function () { + var map = {}; + + var next = function (id) { + if (map[id] && map[id].length === 0) { return void delete map[id]; } + var task = map[id].shift(); + task(fix1(next, id)); + }; + + return function (id, task) { + // support initialization with just a function + if (typeof(id) === 'function' && typeof(task) === 'undefined') { + task = id; + id = ''; + } + // ...but you really need to pass a function + if (typeof(task) !== 'function') { throw new Error("Expected function"); } + + // if the intended queue already has tasks in progress, add this one to the end of the queue + if (map[id]) { return void map[id].push(task); } + + // otherwise create a queue containing the given task + map[id] = [task]; + next(id); + }; +}; diff --git a/rpc.js b/rpc.js index fd0c11f35..70250beca 100644 --- a/rpc.js +++ b/rpc.js @@ -18,7 +18,7 @@ const nThen = require("nthen"); const getFolderSize = require("get-folder-size"); const Pins = require("./lib/pins"); const Meta = require("./lib/metadata"); - +const WriteQueue = require("./lib/write-queue"); var RPC = module.exports; @@ -340,8 +340,7 @@ var getMetadata = function (Env, channel, cb) { value: value } */ -// XXX global saferphore may cause issues here, a queue "per channel" is probably better -var metadataSem = Saferphore.create(1); +var queueMetadata = WriteQueue(); var setMetadata = function (Env, data, unsafeKey, cb) { var channel = data.channel; var command = data.command; @@ -349,16 +348,15 @@ var setMetadata = function (Env, data, unsafeKey, cb) { if (!command || typeof (command) !== 'string') { return void cb ('INVALID_COMMAND'); } if (Meta.commands.indexOf(command) === -1) { return void('UNSUPPORTED_COMMAND'); } - metadataSem.take(function (give) { - var g = give(); + queueMetadata(channel, function (next) { getMetadata(Env, channel, function (err, metadata) { if (err) { - g(); - return void cb(err); + cb(err); + return void next(); } if (!(metadata && Array.isArray(metadata.owners))) { - g(); - return void cb('E_NO_OWNERS'); + cb('E_NO_OWNERS'); + return void next(); } // Confirm that the channel is owned by the user in question @@ -372,13 +370,13 @@ var setMetadata = function (Env, data, unsafeKey, cb) { || !Array.isArray(data.value) || data.value.length !== 1 || data.value[0] !== unsafeKey) { - g(); - return void cb('INSUFFICIENT_PERMISSIONS'); + cb('INSUFFICIENT_PERMISSIONS'); + return void next(); } } else if (metadata.owners.indexOf(unsafeKey) === -1) { - g(); - return void cb('INSUFFICIENT_PERMISSIONS'); + cb('INSUFFICIENT_PERMISSIONS'); + return void next(); } // Add the new metadata line @@ -387,22 +385,23 @@ var setMetadata = function (Env, data, unsafeKey, cb) { try { changed = Meta.handleCommand(metadata, line); } catch (e) { - g(); - return void cb(e); + cb(e); + return void next(); } // if your command is valid but it didn't result in any change to the metadata, // call back now and don't write any "useless" line to the log if (!changed) { - g(); - return void cb(void 0, metadata); + cb(void 0, metadata); + return void next(); } Env.msgStore.writeMetadata(channel, JSON.stringify(line), function (e) { - g(); if (e) { - return void cb(e); + cb(e); + return void next(); } cb(void 0, metadata); + next(); }); }); });