From 7b55df5931f47d3f8e2cf7d2f876d2ca67645543 Mon Sep 17 00:00:00 2001
From: ansuz <ansuz@transitiontech.ca>
Date: Tue, 23 Apr 2019 13:26:45 +0200
Subject: [PATCH] rewrite task execution as API instead of a script

---
 scripts/expire-channels.js | 136 +++++++-------------------
 storage/tasks.js           | 191 +++++++++++++++++++++++++++++++++++--
 2 files changed, 218 insertions(+), 109 deletions(-)

diff --git a/scripts/expire-channels.js b/scripts/expire-channels.js
index 5c87c7cea..2479fc193 100644
--- a/scripts/expire-channels.js
+++ b/scripts/expire-channels.js
@@ -1,113 +1,43 @@
-var Fs = require("fs");
-var Path = require("path");
-
 var nThen = require("nthen");
+var Tasks = require("../storage/tasks");
+var Logger = require("../lib/log");
 
 var config = require("../lib/load-config");
-
 var FileStorage = require('../' + config.storage || './storage/file');
-var root = Path.resolve('../' + config.taskPath || './tasks');
 
-var dirs;
-var nt;
-var store;
-
-var queue = function (f) {
-    nt = nt.nThen(f);
-};
-
-var tryParse = function (s) {
-    try { return JSON.parse(s); }
-    catch (e) { return null; }
-};
-
-var CURRENT = +new Date();
-
-var handleTask = function (str, path, cb) {
-    var task = tryParse(str);
-    if (!Array.isArray(task)) {
-        console.error('invalid task: not array');
-        return cb();
-    }
-    if (task.length < 2) {
-        console.error('invalid task: too small');
-        return cb();
-    }
-
-    var time = task[0];
-    var command = task[1];
-    var args = task.slice(2);
-
-    if (time > CURRENT) {
-        // not time for this task yet
-        console.log('not yet time');
-        return cb();
-    }
-
-    nThen(function (waitFor) {
-        switch (command) {
-            case 'EXPIRE':
-                // FIXME noisy!
-                console.log("expiring: %s", args[0]);
-                store.removeChannel(args[0], waitFor());
-                break;
-            default:
-                // FIXME noisy
-                console.log("unknown command", command);
-        }
-    }).nThen(function () {
-        // remove the task file...
-        Fs.unlink(path, function (err) { // FIXME deletion
-            if (err) { console.error(err); }
-            cb();
-        });
-    });
-};
-
-nt = nThen(function (w) {
-    Fs.readdir(root, w(function (e, list) {
-        if (e) { throw e; }
-        dirs = list;
-        if (dirs.length === 0) {
-            w.abort();
-            return;
-        }
+nThen(function (w) {
+    Logger.create(config, w(function (_log) {
+        config.log = _log;
     }));
-}).nThen(function (waitFor) {
-    FileStorage.create(config, waitFor(function (_store) {
-        store = _store;
+}).nThen(function (w) {
+    FileStorage.create(config, w(function (_store) {
+        config.store = _store;
+
+        // config.taskPath
+        // config.store
+        // config.filePath
+        // config.blobPath
+        // config.coldPath
+
+        // config.enableTaskScheduling
+
+    }));
+}).nThen(function (w) {
+    Tasks.create(config, w(function (err, _tasks) {
+        if (err) {
+            throw err;
+        }
+        config.tasks = _tasks;
+    }));
+}).nThen(function (w) {
+     config.tasks.runAll(w(function (err) {
+        if (err) {
+            // either TASK_CONCURRENCY
+            // or an error from tasks.list
+        }
     }));
 }).nThen(function () {
-    dirs.forEach(function (dir, dIdx) {
-        queue(function (w) {
-            // FIXME noisy!
-            console.log('recursing into %s', dir);
-            Fs.readdir(Path.join(root, dir), w(function (e, list) {
-                list.forEach(function (fn) {
-                    queue(function (w) {
-                        var filePath = Path.join(root, dir, fn);
-                        var cb = w();
-
-                        // FIXME noisy!
-                        console.log("processing file at %s", filePath);
-                        Fs.readFile(filePath, 'utf8', function (e, str) {
-                            if (e) {
-                                console.error(e);
-                                return void cb();
-                            }
-
-                            handleTask(str, filePath, cb);
-                        });
-                    });
-                });
-                if (dIdx === (dirs.length - 1)) {
-                    queue(function () {
-                        store.shutdown();
-                    });
-                }
-            }));
-        });
-    });
+    config.store.shutdown();
+    config.log.shutdown();
 });
 
-
diff --git a/storage/tasks.js b/storage/tasks.js
index 5a999998c..0edf068e8 100644
--- a/storage/tasks.js
+++ b/storage/tasks.js
@@ -6,6 +6,11 @@ var nThen = require("nthen");
 
 var Tasks = module.exports;
 
+var tryParse = function (s) {
+    try { return JSON.parse(s); }
+    catch (e) { return null; }
+};
+
 var encode = function (time, command, args) {
     if (typeof(time) !== 'number') { return null; }
     if (typeof(command) !== 'string') { return null; }
@@ -73,25 +78,199 @@ var write = function (env, task, cb) {
     });
 };
 
-// TODO implement a standard API for removing tasks
-// currently they are deleted manually in 'expire-channels.js'
-// var remove = function (env, id, cb) { };
+var list = Tasks.list = function (env, cb) {
+    var rootDirs;
+
+    nThen(function (w) {
+        // read the root directory
+        Fs.readdir(env.root, w(function (e, list) {
+            if (e) {
+                env.log.error("TASK_ROOT_DIR", {
+                    root: env.root,
+                    error: e,
+                });
+                return void cb(e);
+            }
+            if (list.length === 0) {
+                w.abort();
+                return void cb(void 0, []);
+            }
+            rootDirs = list;
+        }));
+    }).nThen(function () {
+        // schedule the nested directories for exploration
+        // return a list of paths to tasks
+        var queue = nThen(function () {});
+
+        var allPaths = [];
+
+        // We prioritize a small footprint over speed, so we
+        // iterate over directories in serial rather than parallel
+        rootDirs.forEach(function (dir) {
+            queue.nThen(function (w) {
+                var subPath = Path.join(env.root, dir);
+                Fs.readdir(subPath, w(function (e, paths) {
+                    if (e) {
+                        env.log.error("TASKS_INVALID_SUBDIR", {
+                            path: subPath,
+                            error: e,
+                        });
+                        return;
+                    }
+                    // concat in place
+                    Array.prototype.push.apply(allPaths, paths.map(function (p) {
+                        return Path.join(subPath, p);
+                    }));
+                }));
+            });
+        });
+
+        queue.nThen(function () {
+            cb(void 0, allPaths);
+        });
+    });
+};
+
+var remove = function (env, path, cb) {
+    Fs.unlink(path, cb);
+};
+
+var read = function (env, filePath, cb) {
+    Fs.readFile(filePath, 'utf8', function (e, str) {
+        if (e) { return void cb(e); }
+
+        var task = tryParse(str);
+        if (!Array.isArray(task) || task.length < 2) {
+            env.log("INVALID_TASK", {
+                path: filePath,
+                task: task,
+            });
+            return cb(new Error('INVALID_TASK'));
+        }
+        cb(void 0, task);
+    });
+};
+
+var run = Tasks.run = function (env, path, cb) {
+    var CURRENT = +new Date();
+
+    var Log = env.log;
+    var task, time, command, args;
+
+    nThen(function (w) {
+        read(env, path, w(function (err, _task) {
+            if (err) {
+                w.abort();
+                // there was a file but it wasn't valid?
+                return void cb(err);
+            }
+            task = _task;
+            time = task[0];
+
+            if (time > CURRENT) {
+                w.abort();
+                return cb();
+            }
+
+            command = task[1];
+            args = task.slice(2);
+        }));
+    }).nThen(function (w) {
+        switch (command) {
+            case 'EXPIRE':
+                Log.info('DELETION_SCHEDULED_EXPIRATION', {
+                    task: task,
+                });
+                env.store.removeChannel(args[0], w());
+                break;
+            default:
+                Log.warn("TASKS_UNKNOWN_COMMAND", task);
+        }
+    }).nThen(function () {
+        // remove the task file...
+        remove(env, path, function (err) {
+            if (err) {
+                Log.error('TASKS_RECORD_REMOVAL', {
+                    path: path,
+                    err: err,
+                });
+            }
+            cb();
+        });
+    });
+};
+
+var runAll = function (env, cb) {
+    // check if already running and bail out if so
+    if (env.running) {
+        return void cb("TASK_CONCURRENCY");
+    }
+
+    // if not, set a flag to block concurrency and proceed
+    env.running = true;
+
+    var paths;
+    nThen(function (w) {
+        list(env, w(function (err, _paths) {
+            if (err) {
+                w.abort();
+                env.running = false;
+                return void cb(err);
+            }
+            paths = _paths;
+        }));
+    }).nThen(function (w) {
+        var done = w();
+        var nt = nThen(function () {});
+        paths.forEach(function (path) {
+            nt.nThen(function (w) {
+                run(env, path, w(function (err) {
+                    if (err) {
+                        // Any errors are already logged in 'run'
+                        // the admin will need to review the logs and clean up
+                    }
+                }));
+            });
+        });
+        nt.nThen(function () {
+            done();
+        });
+    }).nThen(function (/*w*/) {
+        env.running = false;
+        cb();
+    });
+};
 
 Tasks.create = function (config, cb) {
+    if (!config.store) { throw new Error("E_STORE_REQUIRED"); }
+    if (!config.log) { throw new Error("E_LOG_REQUIRED"); }
+
     var env = {
         root: config.taskPath || './tasks',
+        log: config.log,
+        store: config.store,
     };
 
     // make sure the path exists...
     Fse.mkdirp(env.root, 0x1ff, function (err) {
-        if (err && err.code !== 'EEXIST') {
-            throw err;
-        }
+        if (err) { return void cb(err); }
         cb(void 0, {
             write: function (time, command, args, cb) {
                 var task = encode(time, command, args);
                 write(env, task, cb);
             },
+            list: function (olderThan, cb) {
+                list(env, olderThan, cb);
+            },
+            remove: function (id, cb) {
+                remove(env, id, cb);
+            },
+            run: function (id, cb) {
+                run(env, id, cb);
+            },
+            runAll: function (cb) {
+                runAll(env, cb);
+            },
         });
     });
 };