From 4902554a613e81651d409b7e913e31ce198f4f25 Mon Sep 17 00:00:00 2001
From: ansuz <ansuz@transitiontech.ca>
Date: Wed, 4 Sep 2019 11:51:33 +0200
Subject: [PATCH] implement per-channel fifo queues for metadata and channel
 writes

---
 historyKeeper.js   | 126 ++++++++++++++++++---------------------------
 lib/write-queue.js |  40 ++++++++++++++
 rpc.js             |  37 +++++++------
 3 files changed, 107 insertions(+), 96 deletions(-)
 create mode 100644 lib/write-queue.js

diff --git a/historyKeeper.js b/historyKeeper.js
index 5ceeb174c..0b4ce7ba1 100644
--- a/historyKeeper.js
+++ b/historyKeeper.js
@@ -7,6 +7,7 @@ const Nacl = require('tweetnacl');
 const Crypto = require('crypto');
 const Once = require("./lib/once");
 const Meta = require("./lib/metadata");
+const WriteQueue = require("./lib/write-queue");
 
 let Log;
 const now = function () { return (new Date()).getTime(); };
@@ -302,88 +303,59 @@ module.exports.create = function (cfg) {
         * the fix is to use callbacks and implement queueing for writes
           * to guarantee that offset computation is always atomic with writes
     */
-    const storageQueues = {};
-
-    const storeQueuedMessage = function (ctx, queue, id) {
-        if (queue.length === 0) {
-            delete storageQueues[id];
-            return;
-        }
-
-        const first = queue.shift();
-
-        const msgBin = first.msg;
-        const optionalMessageHash = first.hash;
-        const isCp = first.isCp;
-
-        // Store the message first, and update the index only once it's stored.
-        // store.messageBin can be async so updating the index first may
-        // result in a wrong cpIndex
-        nThen((waitFor) => {
-            store.messageBin(id, msgBin, waitFor(function (err) {
-                if (err) {
-                    waitFor.abort();
-                    Log.error("HK_STORE_MESSAGE_ERROR", err.message);
-
-                    // this error is critical, but there's not much we can do at the moment
-                    // proceed with more messages, but they'll probably fail too
-                    // at least you won't have a memory leak
-
-                    // TODO make it possible to respond to clients with errors so they know
-                    // their message wasn't stored
-                    storeQueuedMessage(ctx, queue, id);
-                    return;
-                }
-            }));
-        }).nThen((waitFor) => {
-            getIndex(ctx, id, waitFor((err, index) => {
-                if (err) {
-                    Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
-                    // non-critical, we'll be able to get the channel index later
-
-                    // proceed to the next message in the queue
-                    storeQueuedMessage(ctx, queue, id);
-                    return;
-                }
-                if (typeof (index.line) === "number") { index.line++; }
-                if (isCp) {
-                    index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
-                    for (let k in index.offsetByHash) {
-                        if (index.offsetByHash[k] < index.cpIndex[0]) {
-                            delete index.offsetByHash[k];
-                        }
-                    }
-                    index.cpIndex.push(({
-                        offset: index.size,
-                        line: ((index.line || 0) + 1)
-                    } /*:cp_index_item*/));
-                }
-                if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
-                index.size += msgBin.length;
-
-                // handle the next element in the queue
-                storeQueuedMessage(ctx, queue, id);
-            }));
-        });
-    };
+    const queueStorage = WriteQueue();
 
     const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) {
         const id = channel.id;
-
         const msgBin = new Buffer(msg + '\n', 'utf8');
-        if (Array.isArray(storageQueues[id])) {
-            return void storageQueues[id].push({
-                msg: msgBin,
-                hash: optionalMessageHash,
-                isCp: isCp,
-            });
-        }
 
-        const queue = storageQueues[id] = (storageQueues[id] || [{
-            msg: msgBin,
-            hash: optionalMessageHash,
-        }]);
-        storeQueuedMessage(ctx, queue, id);
+        queueStorage(id, function (next) {
+            // Store the message first, and update the index only once it's stored.
+            // store.messageBin can be async so updating the index first may
+            // result in a wrong cpIndex
+            nThen((waitFor) => {
+                store.messageBin(id, msgBin, waitFor(function (err) {
+                    if (err) {
+                        waitFor.abort();
+                        Log.error("HK_STORE_MESSAGE_ERROR", err.message);
+
+                        // this error is critical, but there's not much we can do at the moment
+                        // proceed with more messages, but they'll probably fail too
+                        // at least you won't have a memory leak
+
+                        // TODO make it possible to respond to clients with errors so they know
+                        // their message wasn't stored
+                        return void next();
+                    }
+                }));
+            }).nThen((waitFor) => {
+                getIndex(ctx, id, waitFor((err, index) => {
+                    if (err) {
+                        Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
+                        // non-critical, we'll be able to get the channel index later
+                        return void next();
+                    }
+                    if (typeof (index.line) === "number") { index.line++; }
+                    if (isCp) {
+                        index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
+                        for (let k in index.offsetByHash) {
+                            if (index.offsetByHash[k] < index.cpIndex[0]) {
+                                delete index.offsetByHash[k];
+                            }
+                        }
+                        index.cpIndex.push(({
+                            offset: index.size,
+                            line: ((index.line || 0) + 1)
+                        } /*:cp_index_item*/));
+                    }
+                    if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
+                    index.size += msgBin.length;
+
+                    // handle the next element in the queue
+                    next();
+                }));
+            });
+        });
     };
 
     var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
diff --git a/lib/write-queue.js b/lib/write-queue.js
new file mode 100644
index 000000000..c1b64ebaf
--- /dev/null
+++ b/lib/write-queue.js
@@ -0,0 +1,40 @@
+/*
+var q = Queue();
+q(id, function (next) {
+    // whatever you need to do....
+
+    // when you're done
+    next();
+});
+*/
+
+var fix1 = function (f, x) {
+    return function () { f(x); };
+};
+
+module.exports = function () {
+    var map = {};
+
+    var next = function (id) {
+        if (map[id] && map[id].length === 0) { return void delete map[id]; }
+        var task = map[id].shift();
+        task(fix1(next, id));
+    };
+
+    return function (id, task) {
+        // support initialization with just a function
+        if (typeof(id) === 'function' && typeof(task) === 'undefined') {
+            task = id;
+            id = '';
+        }
+        // ...but you really need to pass a function
+        if (typeof(task) !== 'function') { throw new Error("Expected function"); }
+
+        // if the intended queue already has tasks in progress, add this one to the end of the queue
+        if (map[id]) { return void map[id].push(task); }
+
+        // otherwise create a queue containing the given task
+        map[id] = [task];
+        next(id);
+    };
+};
diff --git a/rpc.js b/rpc.js
index fd0c11f35..70250beca 100644
--- a/rpc.js
+++ b/rpc.js
@@ -18,7 +18,7 @@ const nThen = require("nthen");
 const getFolderSize = require("get-folder-size");
 const Pins = require("./lib/pins");
 const Meta = require("./lib/metadata");
-
+const WriteQueue = require("./lib/write-queue");
 
 var RPC = module.exports;
 
@@ -340,8 +340,7 @@ var getMetadata = function (Env, channel, cb) {
         value: value
     }
 */
-// XXX global saferphore may cause issues here, a queue "per channel" is probably better
-var metadataSem = Saferphore.create(1);
+var queueMetadata = WriteQueue();
 var setMetadata = function (Env, data, unsafeKey, cb) {
     var channel = data.channel;
     var command = data.command;
@@ -349,16 +348,15 @@ var setMetadata = function (Env, data, unsafeKey, cb) {
     if (!command || typeof (command) !== 'string') { return void cb ('INVALID_COMMAND'); }
     if (Meta.commands.indexOf(command) === -1) { return void('UNSUPPORTED_COMMAND'); }
 
-    metadataSem.take(function (give) {
-        var g = give();
+    queueMetadata(channel, function (next) {
         getMetadata(Env, channel, function (err, metadata) {
             if (err) {
-                g();
-                return void cb(err);
+                cb(err);
+                return void next();
             }
             if (!(metadata && Array.isArray(metadata.owners))) {
-                g();
-                return void cb('E_NO_OWNERS');
+                cb('E_NO_OWNERS');
+                return void next();
             }
 
             // Confirm that the channel is owned by the user in question
@@ -372,13 +370,13 @@ var setMetadata = function (Env, data, unsafeKey, cb) {
                         || !Array.isArray(data.value)
                         || data.value.length !== 1
                         || data.value[0] !== unsafeKey) {
-                    g();
-                    return void cb('INSUFFICIENT_PERMISSIONS');
+                    cb('INSUFFICIENT_PERMISSIONS');
+                    return void next();
                 }
 
             } else if (metadata.owners.indexOf(unsafeKey) === -1) {
-                g();
-                return void cb('INSUFFICIENT_PERMISSIONS');
+                cb('INSUFFICIENT_PERMISSIONS');
+                return void next();
             }
 
             // Add the new metadata line
@@ -387,22 +385,23 @@ var setMetadata = function (Env, data, unsafeKey, cb) {
             try {
                 changed = Meta.handleCommand(metadata, line);
             } catch (e) {
-                g();
-                return void cb(e);
+                cb(e);
+                return void next();
             }
 
             // if your command is valid but it didn't result in any change to the metadata,
             // call back now and don't write any "useless" line to the log
             if (!changed) {
-                g();
-                return void cb(void 0, metadata);
+                cb(void 0, metadata);
+                return void next();
             }
             Env.msgStore.writeMetadata(channel, JSON.stringify(line), function (e) {
-                g();
                 if (e) {
-                    return void cb(e);
+                    cb(e);
+                    return void next();
                 }
                 cb(void 0, metadata);
+                next();
             });
         });
     });