diff --git a/storage/file.js b/storage/file.js index eca3d76be..b2899619f 100644 --- a/storage/file.js +++ b/storage/file.js @@ -8,7 +8,7 @@ var nThen = require("nthen"); var Semaphore = require("saferphore"); var Util = require("../lib/common-util"); -const WriteQueue = require("../lib/write-queue"); +const Schedule = require("../lib/schedule"); const Readline = require("readline"); const ToPull = require('stream-to-pull-stream'); const Pull = require('pull-stream'); @@ -171,7 +171,6 @@ var readMessages = function (path, msgHandler, cb) { could have been amended */ var getChannelMetadata = function (Env, channelId, cb) { - // XXX queue var path = mkPath(Env, channelId); // gets metadata embedded in a file @@ -180,7 +179,6 @@ var getChannelMetadata = function (Env, channelId, cb) { // low level method for getting just the dedicated metadata channel var getDedicatedMetadata = function (env, channelId, handler, cb) { - // XXX queue var metadataPath = mkMetadataPath(env, channelId); readMessages(metadataPath, function (line) { if (!line) { return; } @@ -223,7 +221,6 @@ How to proceed */ - // XXX queue nThen(function (w) { // returns the first line of a channel, parsed... 
getChannelMetadata(env, channelId, w(function (err, data) { @@ -268,7 +265,6 @@ var writeMetadata = function (env, channelId, data, cb) { // TODO see if we could improve performance by using libnewline const NEWLINE_CHR = ('\n').charCodeAt(0); const mkBufferSplit = () => { - // XXX queue lock let remainder = null; return Pull((read) => { return (abort, cb) => { @@ -320,7 +316,6 @@ const mkOffsetCounter = () => { const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); let keepReading = true; - // XXX queue lock Pull( ToPull.read(stream), mkBufferSplit(), @@ -846,7 +841,6 @@ var message = function (env, chanName, msg, cb) { // stream messages from a channel log var getMessages = function (env, chanName, handler, cb) { - // XXX queue lock getChannel(env, chanName, function (err, chan) { if (!chan) { cb(err); @@ -888,9 +882,26 @@ module.exports.create = function (conf, cb) { openFiles: 0, openFileLimit: conf.openFileLimit || 2048, }; - var queue = env.queue = WriteQueue(); var it; + /* our scheduler prioritizes and executes tasks with respect + to all other tasks invoked with an identical key + (typically the id of the concerned channel) + + it assumes that all tasks can be categorized into three types + + 1. unordered tasks such as streaming reads which can take + a long time to complete. + + 2. ordered tasks such as appending to a file which does not + take very long, but where priority is important. + + 3. blocking tasks such as rewriting a file where it would be + dangerous to perform any other task concurrently. 
+ + */ + var schedule = env.schedule = Schedule(); + nThen(function (w) { // make sure the store's directory exists Fse.mkdirp(env.root, PERMISSIVE, w(function (err) { @@ -910,61 +921,112 @@ module.exports.create = function (conf, cb) { // write a new message to a log message: function (channelName, content, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { + schedule.ordered(channelName, function (next) { message(env, channelName, content, Util.both(cb, next)); }); }, // iterate over all the messages in a log getMessages: function (channelName, msgHandler, cb) { - // XXX queue lock if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - getMessages(env, channelName, msgHandler, cb); + schedule.unordered(channelName, function (next) { + getMessages(env, channelName, msgHandler, Util.both(cb, next)); + }); }, // NEWER IMPLEMENTATIONS OF THE SAME THING // write a new message to a log messageBin: (channelName, content, cb) => { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { + schedule.ordered(channelName, function (next) { messageBin(env, channelName, content, Util.both(cb, next)); }); }, // iterate over the messages in a log readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { - // XXX queue lock if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - readMessagesBin(env, channelName, start, asyncMsgHandler, cb); +// XXX there is a race condition here +// historyKeeper reads the file to find the byte offset of the first interesting message +// then calls this function again to read from that point. +// If this task is in the queue already when the file is read again +// then that byte offset will have been invalidated +// and the resulting stream probably won't align with message boundaries. +// We can evict the cache in the callback but by that point it will be too late. 
+// Presumably we'll need to bury some of historyKeeper's logic into a filestore method +// in order to make index/read sequences atomic. +// Otherwise, we can add a new task type to the scheduler to take invalidation into account... +// either method introduces significant complexity. + schedule.unordered(channelName, function (next) { + readMessagesBin(env, channelName, start, asyncMsgHandler, Util.both(cb, next)); + }); }, // METHODS for deleting data // remove a channel and its associated metadata log if present removeChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { +// XXX there's another race condition here... +// when a remove and an append are scheduled in that order +// the remove will delete the channel's metadata (including its validateKey) +// then the append will recreate the channel and insert a message. +// clients that are connected to the channel via historyKeeper should be kicked out +// however, anyone that connects to that channel in the future will be able to read the +// signed message, but will not find its validate key... 
+// resulting in a junk/unusable document + schedule.ordered(channelName, function (next) { removeChannel(env, channelName, Util.both(cb, next)); }); }, // remove a channel and its associated metadata log from the archive directory removeArchivedChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { + schedule.ordered(channelName, function (next) { removeArchivedChannel(env, channelName, Util.both(cb, next)); }); }, // clear all data for a channel but preserve its metadata clearChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { + schedule.ordered(channelName, function (next) { clearChannel(env, channelName, Util.both(cb, next)); }); }, + trimChannel: function (channelName, hash, cb) { + // XXX ansuz + // XXX queue lock + /* block any reads from the metadata and log files + until this whole process has finished + close the file descriptor if it is open + derive temporary file paths for metadata and log buffers + compute metadata state and write to metadata buffer + scan through log file and begin copying lines to the log buffer + once you recognize the first line by the hash the user provided + archive the file and current metadata once both buffers are copied + move the metadata and log buffers into place + return the lock on reads + call back + + in case of an error, remove the buffer files + */ + schedule.blocking(channelName, function (next) { + cb("E_NOT_IMPLEMENTED"); + next(); + }); + }, // check if a channel exists in the database isChannelAvailable: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } // construct the path var filepath = mkPath(env, channelName); - queue(channelName, function (next) { +// (ansuz) I'm uncertain whether this task should be unordered or ordered. 
+// there's a round trip to the client (and possibly the user) before they decide +// to act on the information of whether there is already content present in this channel. +// so it's practically impossible to avoid race conditions where someone else creates +// some content before you. +// if that's the case, it's basically impossible that you'd generate the same signing key, +// and thus historykeeper should reject the signed messages of whoever loses the race. +// thus 'unordered' seems appropriate. + schedule.unordered(channelName, function (next) { channelExists(filepath, Util.both(cb, next)); }); }, @@ -973,21 +1035,30 @@ module.exports.create = function (conf, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } // construct the path var filepath = mkArchivePath(env, channelName); - queue(channelName, function (next) { +// as with the method above, somebody might remove, restore, or overwrite an archive +// in the time that it takes to answer this query and to execute whatever follows. +// since it's impossible to win the race every time let's just make this 'unordered' + schedule.unordered(channelName, function (next) { channelExists(filepath, Util.both(cb, next)); }); }, // move a channel from the database to the archive, along with its metadata archiveChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { +// again, the semantics around archiving and appending are really muddy. 
+// so I'm calling this 'unordered' again + schedule.unordered(channelName, function (next) { archiveChannel(env, channelName, Util.both(cb, next)); }); }, // restore a channel from the archive to the database, along with its metadata restoreArchivedChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { +// archive restoration will fail if either a file or its metadata exists in the live db. +// so I'm calling this 'ordered' to give writes a chance to flush out. +// accidental conflicts are extremely unlikely since clients check the status +// of a previously known channel before joining. + schedule.ordered(channelName, function (next) { unarchiveChannel(env, channelName, Util.both(cb, next)); }); }, @@ -996,14 +1067,17 @@ module.exports.create = function (conf, cb) { // fetch the metadata for a channel getChannelMetadata: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { +// The only things that can invalidate this method's results are channel archival, removal, or trimming. +// We want it to be fast, so let's make it unordered. 
+ schedule.unordered(channelName, function (next) { getChannelMetadata(env, channelName, Util.both(cb, next)); }); }, // iterate over lines of metadata changes from a dedicated log readDedicatedMetadata: function (channelName, handler, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { +// Everything that modifies metadata also updates clients, so this can be 'unordered' + schedule.unordered(channelName, function (next) { getDedicatedMetadata(env, channelName, handler, Util.both(cb, next)); }); }, @@ -1011,13 +1085,16 @@ module.exports.create = function (conf, cb) { // iterate over multiple lines of metadata changes readChannelMetadata: function (channelName, handler, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - // XXX queue - readMetadata(env, channelName, handler, cb); +// same logic as 'readDedicatedMetadata' + schedule.unordered(channelName, function (next) { + readMetadata(env, channelName, handler, Util.both(cb, next)); + }); }, // write a new line to a metadata log writeMetadata: function (channelName, data, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { +// metadata writes are fast and should be applied in order + schedule.ordered(channelName, function (next) { writeMetadata(env, channelName, data, Util.both(cb, next)); }); }, @@ -1032,7 +1109,9 @@ module.exports.create = function (conf, cb) { getChannelSize: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { +// this method should be really fast and it probably doesn't matter much +// if we get the size slightly before or after somebody writes a few hundred bytes to it. 
+ schedule.ordered(channelName, function (next) { channelBytes(env, channelName, Util.both(cb, next)); }); }, @@ -1040,7 +1119,10 @@ module.exports.create = function (conf, cb) { // remove a particular channel from the cache closeChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - queue(channelName, function (next) { +// It is most likely the case that the channel is inactive if we are trying to close it, +// thus it doesn't make much difference whether it's ordered or not. +// In any case, it will be re-opened if anyone tries to write to it. + schedule.ordered(channelName, function (next) { closeChannel(env, channelName, Util.both(cb, next)); }); }, @@ -1050,7 +1132,10 @@ module.exports.create = function (conf, cb) { }, // write to a log file log: function (channelName, content, cb) { - message(env, channelName, content, cb); +// you probably want the events in your log to be in the correct order. + schedule.ordered(channelName, function (next) { + message(env, channelName, content, Util.both(cb, next)); + }); }, // shut down the database shutdown: function () {