diff --git a/storage/file.js b/storage/file.js index 74f794cca..2a8802c8c 100644 --- a/storage/file.js +++ b/storage/file.js @@ -7,6 +7,8 @@ var Path = require("path"); var nThen = require("nthen"); var Semaphore = require("saferphore"); var Util = require("../lib/common-util"); + +const WriteQueue = require("../lib/write-queue"); const Readline = require("readline"); const ToPull = require('stream-to-pull-stream'); const Pull = require('pull-stream'); @@ -169,6 +171,7 @@ var readMessages = function (path, msgHandler, cb) { could have been amended */ var getChannelMetadata = function (Env, channelId, cb) { + // XXX queue var path = mkPath(Env, channelId); // gets metadata embedded in a file @@ -177,6 +180,7 @@ var getChannelMetadata = function (Env, channelId, cb) { // low level method for getting just the dedicated metadata channel var getDedicatedMetadata = function (env, channelId, handler, cb) { + // XXX queue var metadataPath = mkMetadataPath(env, channelId); readMessages(metadataPath, function (line) { if (!line) { return; } @@ -219,6 +223,7 @@ How to proceed */ + // XXX queue nThen(function (w) { // returns the first line of a channel, parsed... getChannelMetadata(env, channelId, w(function (err, data) { @@ -263,6 +268,7 @@ var writeMetadata = function (env, channelId, data, cb) { // TODO see if we could improve performance by using libnewline const NEWLINE_CHR = ('\n').charCodeAt(0); const mkBufferSplit = () => { + // XXX queue lock let remainder = null; return Pull((read) => { return (abort, cb) => { @@ -314,6 +320,7 @@ const mkOffsetCounter = () => { const readMessagesBin = (env, id, start, msgHandler, cb) => { const stream = Fs.createReadStream(mkPath(env, id), { start: start }); let keepReading = true; + // XXX queue lock Pull( ToPull.read(stream), mkBufferSplit(), @@ -837,6 +844,7 @@ var message = function (env, chanName, msg, cb) { // stream messages from a channel log var getMessages = function (env, chanName, handler, cb) { + // XXX queue lock getChannel(env, chanName, function (err, chan) { if (!chan) { cb(err); @@ -878,6 +886,7 @@ module.exports.create = function (conf, cb) { openFiles: 0, openFileLimit: conf.openFileLimit || 2048, }; + var queue = env.queue = WriteQueue(); var it; nThen(function (w) { @@ -899,10 +908,13 @@ module.exports.create = function (conf, cb) { // write a new message to a log message: function (channelName, content, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - message(env, channelName, content, cb); + queue(channelName, function (next) { + message(env, channelName, content, Util.both(cb, next)); + }); }, // iterate over all the messages in a log getMessages: function (channelName, msgHandler, cb) { + // XXX queue lock if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } getMessages(env, channelName, msgHandler, cb); }, @@ -911,10 +923,13 @@ module.exports.create = function (conf, cb) { // write a new message to a log messageBin: (channelName, content, cb) => { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - messageBin(env, channelName, content, cb); + queue(channelName, function (next) { + messageBin(env, channelName, content, Util.both(cb, next)); + }); }, // iterate over the messages in a log readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { + // XXX queue lock if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } readMessagesBin(env, channelName, start, asyncMsgHandler, cb); }, @@ -923,19 +938,23 @@ module.exports.create = function (conf, cb) { // remove a channel and its associated metadata log if present removeChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - removeChannel(env, channelName, function (err) { - cb(err); + queue(channelName, function (next) { + removeChannel(env, channelName, Util.both(cb, next)); }); }, // remove a channel and its associated metadata log from the archive directory removeArchivedChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - removeArchivedChannel(env, channelName, cb); + queue(channelName, function (next) { + removeArchivedChannel(env, channelName, Util.both(cb, next)); + }); }, // clear all data for a channel but preserve its metadata clearChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - clearChannel(env, channelName, cb); + queue(channelName, function (next) { + clearChannel(env, channelName, Util.both(cb, next)); + }); }, // check if a channel exists in the database @@ -943,47 +962,62 @@ module.exports.create = function (conf, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } // construct the path var filepath = mkPath(env, channelName); - channelExists(filepath, cb); + queue(channelName, function (next) { + channelExists(filepath, Util.both(cb, next)); + }); }, // check if a channel exists in the archive isChannelArchived: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } // construct the path var filepath = mkArchivePath(env, channelName); - channelExists(filepath, cb); + queue(channelName, function (next) { + channelExists(filepath, Util.both(cb, next)); + }); }, // move a channel from the database to the archive, along with its metadata archiveChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - archiveChannel(env, channelName, cb); + queue(channelName, function (next) { + archiveChannel(env, channelName, Util.both(cb, next)); + }); }, // restore a channel from the archive to the database, along with its metadata restoreArchivedChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - unarchiveChannel(env, channelName, cb); + queue(channelName, function (next) { + unarchiveChannel(env, channelName, Util.both(cb, next)); + }); }, // METADATA METHODS // fetch the metadata for a channel getChannelMetadata: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - getChannelMetadata(env, channelName, cb); + queue(channelName, function (next) { + getChannelMetadata(env, channelName, Util.both(cb, next)); + }); }, // iterate over lines of metadata changes from a dedicated log readDedicatedMetadata: function (channelName, handler, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - getDedicatedMetadata(env, channelName, handler, cb); + queue(channelName, function (next) { + getDedicatedMetadata(env, channelName, handler, Util.both(cb, next)); + }); }, // iterate over multiple lines of metadata changes readChannelMetadata: function (channelName, handler, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + // XXX queue readMetadata(env, channelName, handler, cb); }, // write a new line to a metadata log writeMetadata: function (channelName, data, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - writeMetadata(env, channelName, data, cb); + queue(channelName, function (next) { + writeMetadata(env, channelName, data, Util.both(cb, next)); + }); }, // CHANNEL ITERATION @@ -996,13 +1030,17 @@ module.exports.create = function (conf, cb) { getChannelSize: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - channelBytes(env, channelName, cb); + queue(channelName, function (next) { + channelBytes(env, channelName, Util.both(cb, next)); + }); }, // OTHER DATABASE FUNCTIONALITY // remove a particular channel from the cache closeChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - closeChannel(env, channelName, cb); + queue(channelName, function (next) { + closeChannel(env, channelName, Util.both(cb, next)); + }); }, // iterate over open channels and close any that are not active flushUnusedChannels: function (cb) {