handle all the simple cases where operations on channels should be queued

pull/1/head
ansuz 5 years ago
parent 10eed5c46d
commit c89595c7bb

@ -7,6 +7,8 @@ var Path = require("path");
var nThen = require("nthen"); var nThen = require("nthen");
var Semaphore = require("saferphore"); var Semaphore = require("saferphore");
var Util = require("../lib/common-util"); var Util = require("../lib/common-util");
const WriteQueue = require("../lib/write-queue");
const Readline = require("readline"); const Readline = require("readline");
const ToPull = require('stream-to-pull-stream'); const ToPull = require('stream-to-pull-stream');
const Pull = require('pull-stream'); const Pull = require('pull-stream');
@ -169,6 +171,7 @@ var readMessages = function (path, msgHandler, cb) {
could have been amended could have been amended
*/ */
var getChannelMetadata = function (Env, channelId, cb) { var getChannelMetadata = function (Env, channelId, cb) {
// XXX queue
var path = mkPath(Env, channelId); var path = mkPath(Env, channelId);
// gets metadata embedded in a file // gets metadata embedded in a file
@ -177,6 +180,7 @@ var getChannelMetadata = function (Env, channelId, cb) {
// low level method for getting just the dedicated metadata channel // low level method for getting just the dedicated metadata channel
var getDedicatedMetadata = function (env, channelId, handler, cb) { var getDedicatedMetadata = function (env, channelId, handler, cb) {
// XXX queue
var metadataPath = mkMetadataPath(env, channelId); var metadataPath = mkMetadataPath(env, channelId);
readMessages(metadataPath, function (line) { readMessages(metadataPath, function (line) {
if (!line) { return; } if (!line) { return; }
@ -219,6 +223,7 @@ How to proceed
*/ */
// XXX queue
nThen(function (w) { nThen(function (w) {
// returns the first line of a channel, parsed... // returns the first line of a channel, parsed...
getChannelMetadata(env, channelId, w(function (err, data) { getChannelMetadata(env, channelId, w(function (err, data) {
@ -263,6 +268,7 @@ var writeMetadata = function (env, channelId, data, cb) {
// TODO see if we could improve performance by using libnewline // TODO see if we could improve performance by using libnewline
const NEWLINE_CHR = ('\n').charCodeAt(0); const NEWLINE_CHR = ('\n').charCodeAt(0);
const mkBufferSplit = () => { const mkBufferSplit = () => {
// XXX queue lock
let remainder = null; let remainder = null;
return Pull((read) => { return Pull((read) => {
return (abort, cb) => { return (abort, cb) => {
@ -314,6 +320,7 @@ const mkOffsetCounter = () => {
const readMessagesBin = (env, id, start, msgHandler, cb) => { const readMessagesBin = (env, id, start, msgHandler, cb) => {
const stream = Fs.createReadStream(mkPath(env, id), { start: start }); const stream = Fs.createReadStream(mkPath(env, id), { start: start });
let keepReading = true; let keepReading = true;
// XXX queue lock
Pull( Pull(
ToPull.read(stream), ToPull.read(stream),
mkBufferSplit(), mkBufferSplit(),
@ -837,6 +844,7 @@ var message = function (env, chanName, msg, cb) {
// stream messages from a channel log // stream messages from a channel log
var getMessages = function (env, chanName, handler, cb) { var getMessages = function (env, chanName, handler, cb) {
// XXX queue lock
getChannel(env, chanName, function (err, chan) { getChannel(env, chanName, function (err, chan) {
if (!chan) { if (!chan) {
cb(err); cb(err);
@ -878,6 +886,7 @@ module.exports.create = function (conf, cb) {
openFiles: 0, openFiles: 0,
openFileLimit: conf.openFileLimit || 2048, openFileLimit: conf.openFileLimit || 2048,
}; };
var queue = env.queue = WriteQueue();
var it; var it;
nThen(function (w) { nThen(function (w) {
@ -899,10 +908,13 @@ module.exports.create = function (conf, cb) {
// write a new message to a log // write a new message to a log
message: function (channelName, content, cb) { message: function (channelName, content, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
message(env, channelName, content, cb); queue(channelName, function (next) {
message(env, channelName, content, Util.both(cb, next));
});
}, },
// iterate over all the messages in a log // iterate over all the messages in a log
getMessages: function (channelName, msgHandler, cb) { getMessages: function (channelName, msgHandler, cb) {
// XXX queue lock
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
getMessages(env, channelName, msgHandler, cb); getMessages(env, channelName, msgHandler, cb);
}, },
@ -911,10 +923,13 @@ module.exports.create = function (conf, cb) {
// write a new message to a log // write a new message to a log
messageBin: (channelName, content, cb) => { messageBin: (channelName, content, cb) => {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
messageBin(env, channelName, content, cb); queue(channelName, function (next) {
messageBin(env, channelName, content, Util.both(cb, next));
});
}, },
// iterate over the messages in a log // iterate over the messages in a log
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
// XXX queue lock
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
readMessagesBin(env, channelName, start, asyncMsgHandler, cb); readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
}, },
@ -923,19 +938,23 @@ module.exports.create = function (conf, cb) {
// remove a channel and its associated metadata log if present // remove a channel and its associated metadata log if present
removeChannel: function (channelName, cb) { removeChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
removeChannel(env, channelName, function (err) { queue(channelName, function (next) {
cb(err); removeChannel(env, channelName, Util.both(cb, next));
}); });
}, },
// remove a channel and its associated metadata log from the archive directory // remove a channel and its associated metadata log from the archive directory
removeArchivedChannel: function (channelName, cb) { removeArchivedChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
removeArchivedChannel(env, channelName, cb); queue(channelName, function (next) {
removeArchivedChannel(env, channelName, Util.both(cb, next));
});
}, },
// clear all data for a channel but preserve its metadata // clear all data for a channel but preserve its metadata
clearChannel: function (channelName, cb) { clearChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
clearChannel(env, channelName, cb); queue(channelName, function (next) {
clearChannel(env, channelName, Util.both(cb, next));
});
}, },
// check if a channel exists in the database // check if a channel exists in the database
@ -943,47 +962,62 @@ module.exports.create = function (conf, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
// construct the path // construct the path
var filepath = mkPath(env, channelName); var filepath = mkPath(env, channelName);
channelExists(filepath, cb); queue(channelName, function (next) {
channelExists(filepath, Util.both(cb, next));
});
}, },
// check if a channel exists in the archive // check if a channel exists in the archive
isChannelArchived: function (channelName, cb) { isChannelArchived: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
// construct the path // construct the path
var filepath = mkArchivePath(env, channelName); var filepath = mkArchivePath(env, channelName);
channelExists(filepath, cb); queue(channelName, function (next) {
channelExists(filepath, Util.both(cb, next));
});
}, },
// move a channel from the database to the archive, along with its metadata // move a channel from the database to the archive, along with its metadata
archiveChannel: function (channelName, cb) { archiveChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
archiveChannel(env, channelName, cb); queue(channelName, function (next) {
archiveChannel(env, channelName, Util.both(cb, next));
});
}, },
// restore a channel from the archive to the database, along with its metadata // restore a channel from the archive to the database, along with its metadata
restoreArchivedChannel: function (channelName, cb) { restoreArchivedChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
unarchiveChannel(env, channelName, cb); queue(channelName, function (next) {
unarchiveChannel(env, channelName, Util.both(cb, next));
});
}, },
// METADATA METHODS // METADATA METHODS
// fetch the metadata for a channel // fetch the metadata for a channel
getChannelMetadata: function (channelName, cb) { getChannelMetadata: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
getChannelMetadata(env, channelName, cb); queue(channelName, function (next) {
getChannelMetadata(env, channelName, Util.both(cb, next));
});
}, },
// iterate over lines of metadata changes from a dedicated log // iterate over lines of metadata changes from a dedicated log
readDedicatedMetadata: function (channelName, handler, cb) { readDedicatedMetadata: function (channelName, handler, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
getDedicatedMetadata(env, channelName, handler, cb); queue(channelName, function (next) {
getDedicatedMetadata(env, channelName, handler, Util.both(cb, next));
});
}, },
// iterate over multiple lines of metadata changes // iterate over multiple lines of metadata changes
readChannelMetadata: function (channelName, handler, cb) { readChannelMetadata: function (channelName, handler, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
// XXX queue
readMetadata(env, channelName, handler, cb); readMetadata(env, channelName, handler, cb);
}, },
// write a new line to a metadata log // write a new line to a metadata log
writeMetadata: function (channelName, data, cb) { writeMetadata: function (channelName, data, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
writeMetadata(env, channelName, data, cb); queue(channelName, function (next) {
writeMetadata(env, channelName, data, Util.both(cb, next));
});
}, },
// CHANNEL ITERATION // CHANNEL ITERATION
@ -996,13 +1030,17 @@ module.exports.create = function (conf, cb) {
getChannelSize: function (channelName, cb) { getChannelSize: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
channelBytes(env, channelName, cb); queue(channelName, function (next) {
channelBytes(env, channelName, Util.both(cb, next));
});
}, },
// OTHER DATABASE FUNCTIONALITY // OTHER DATABASE FUNCTIONALITY
// remove a particular channel from the cache // remove a particular channel from the cache
closeChannel: function (channelName, cb) { closeChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
closeChannel(env, channelName, cb); queue(channelName, function (next) {
closeChannel(env, channelName, Util.both(cb, next));
});
}, },
// iterate over open channels and close any that are not active // iterate over open channels and close any that are not active
flushUnusedChannels: function (cb) { flushUnusedChannels: function (cb) {

Loading…
Cancel
Save