start using the scheduler for all relevant database methods

...and describe the rationale for using particular scheduler semantics
pull/1/head
ansuz 5 years ago
parent 7072fe4fa4
commit 15ca855f22

@ -8,7 +8,7 @@ var nThen = require("nthen");
var Semaphore = require("saferphore"); var Semaphore = require("saferphore");
var Util = require("../lib/common-util"); var Util = require("../lib/common-util");
const WriteQueue = require("../lib/write-queue"); const Schedule = require("../lib/schedule");
const Readline = require("readline"); const Readline = require("readline");
const ToPull = require('stream-to-pull-stream'); const ToPull = require('stream-to-pull-stream');
const Pull = require('pull-stream'); const Pull = require('pull-stream');
@ -171,7 +171,6 @@ var readMessages = function (path, msgHandler, cb) {
could have been amended could have been amended
*/ */
var getChannelMetadata = function (Env, channelId, cb) { var getChannelMetadata = function (Env, channelId, cb) {
// XXX queue
var path = mkPath(Env, channelId); var path = mkPath(Env, channelId);
// gets metadata embedded in a file // gets metadata embedded in a file
@ -180,7 +179,6 @@ var getChannelMetadata = function (Env, channelId, cb) {
// low level method for getting just the dedicated metadata channel // low level method for getting just the dedicated metadata channel
var getDedicatedMetadata = function (env, channelId, handler, cb) { var getDedicatedMetadata = function (env, channelId, handler, cb) {
// XXX queue
var metadataPath = mkMetadataPath(env, channelId); var metadataPath = mkMetadataPath(env, channelId);
readMessages(metadataPath, function (line) { readMessages(metadataPath, function (line) {
if (!line) { return; } if (!line) { return; }
@ -223,7 +221,6 @@ How to proceed
*/ */
// XXX queue
nThen(function (w) { nThen(function (w) {
// returns the first line of a channel, parsed... // returns the first line of a channel, parsed...
getChannelMetadata(env, channelId, w(function (err, data) { getChannelMetadata(env, channelId, w(function (err, data) {
@ -268,7 +265,6 @@ var writeMetadata = function (env, channelId, data, cb) {
// TODO see if we could improve performance by using libnewline // TODO see if we could improve performance by using libnewline
const NEWLINE_CHR = ('\n').charCodeAt(0); const NEWLINE_CHR = ('\n').charCodeAt(0);
const mkBufferSplit = () => { const mkBufferSplit = () => {
// XXX queue lock
let remainder = null; let remainder = null;
return Pull((read) => { return Pull((read) => {
return (abort, cb) => { return (abort, cb) => {
@ -320,7 +316,6 @@ const mkOffsetCounter = () => {
const readMessagesBin = (env, id, start, msgHandler, cb) => { const readMessagesBin = (env, id, start, msgHandler, cb) => {
const stream = Fs.createReadStream(mkPath(env, id), { start: start }); const stream = Fs.createReadStream(mkPath(env, id), { start: start });
let keepReading = true; let keepReading = true;
// XXX queue lock
Pull( Pull(
ToPull.read(stream), ToPull.read(stream),
mkBufferSplit(), mkBufferSplit(),
@ -846,7 +841,6 @@ var message = function (env, chanName, msg, cb) {
// stream messages from a channel log // stream messages from a channel log
var getMessages = function (env, chanName, handler, cb) { var getMessages = function (env, chanName, handler, cb) {
// XXX queue lock
getChannel(env, chanName, function (err, chan) { getChannel(env, chanName, function (err, chan) {
if (!chan) { if (!chan) {
cb(err); cb(err);
@ -888,9 +882,26 @@ module.exports.create = function (conf, cb) {
openFiles: 0, openFiles: 0,
openFileLimit: conf.openFileLimit || 2048, openFileLimit: conf.openFileLimit || 2048,
}; };
var queue = env.queue = WriteQueue();
var it; var it;
/* our scheduler prioritizes and executes tasks with respect
to all other tasks invoked with an identical key
(typically the id of the concerned channel)
it assumes that all tasks can be categorized into three types
1. unordered tasks such as streaming reads which can take
a long time to complete.
2. ordered tasks such as appending to a file which does not
take very long, but where priority is important.
3. blocking tasks such as rewriting a file where it would be
dangerous to perform any other task concurrently.
*/
var schedule = env.schedule = Schedule();
nThen(function (w) { nThen(function (w) {
// make sure the store's directory exists // make sure the store's directory exists
Fse.mkdirp(env.root, PERMISSIVE, w(function (err) { Fse.mkdirp(env.root, PERMISSIVE, w(function (err) {
@ -910,61 +921,112 @@ module.exports.create = function (conf, cb) {
// write a new message to a log // write a new message to a log
message: function (channelName, content, cb) { message: function (channelName, content, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { schedule.ordered(channelName, function (next) {
message(env, channelName, content, Util.both(cb, next)); message(env, channelName, content, Util.both(cb, next));
}); });
}, },
// iterate over all the messages in a log // iterate over all the messages in a log
getMessages: function (channelName, msgHandler, cb) { getMessages: function (channelName, msgHandler, cb) {
// XXX queue lock
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
getMessages(env, channelName, msgHandler, cb); schedule.unordered(channelName, function (next) {
getMessages(env, channelName, msgHandler, Util.both(cb, next));
});
}, },
// NEWER IMPLEMENTATIONS OF THE SAME THING // NEWER IMPLEMENTATIONS OF THE SAME THING
// write a new message to a log // write a new message to a log
messageBin: (channelName, content, cb) => { messageBin: (channelName, content, cb) => {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { schedule.ordered(channelName, function (next) {
messageBin(env, channelName, content, Util.both(cb, next)); messageBin(env, channelName, content, Util.both(cb, next));
}); });
}, },
// iterate over the messages in a log // iterate over the messages in a log
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
// XXX queue lock
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
readMessagesBin(env, channelName, start, asyncMsgHandler, cb); // XXX there is a race condition here
// historyKeeper reads the file to find the byte offset of the first interesting message
// then calls this function again to read from that point.
// If this task is in the queue already when the file is read again
// then that byte offset will have been invalidated
// and the resulting stream probably won't align with message boundaries.
// We can evict the cache in the callback but by that point it will be too late.
// Presumably we'll need to bury some of historyKeeper's logic into a filestore method
// in order to make index/read sequences atomic.
// Otherwise, we can add a new task type to the scheduler to take invalidation into account...
// either method introduces significant complexity.
schedule.unordered(channelName, function (next) {
readMessagesBin(env, channelName, start, asyncMsgHandler, Util.both(cb, next));
});
}, },
// METHODS for deleting data // METHODS for deleting data
// remove a channel and its associated metadata log if present // remove a channel and its associated metadata log if present
removeChannel: function (channelName, cb) { removeChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { // XXX there's another race condition here...
// when a remove and an append are scheduled in that order
// the remove will delete the channel's metadata (including its validateKey)
// then the append will recreate the channel and insert a message.
// clients that are connected to the channel via historyKeeper should be kicked out
// however, anyone that connects to that channel in the future will be able to read the
// signed message, but will not find its validate key...
// resulting in a junk/unusable document
schedule.ordered(channelName, function (next) {
removeChannel(env, channelName, Util.both(cb, next)); removeChannel(env, channelName, Util.both(cb, next));
}); });
}, },
// remove a channel and its associated metadata log from the archive directory // remove a channel and its associated metadata log from the archive directory
removeArchivedChannel: function (channelName, cb) { removeArchivedChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { schedule.ordered(channelName, function (next) {
removeArchivedChannel(env, channelName, Util.both(cb, next)); removeArchivedChannel(env, channelName, Util.both(cb, next));
}); });
}, },
// clear all data for a channel but preserve its metadata // clear all data for a channel but preserve its metadata
clearChannel: function (channelName, cb) { clearChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { schedule.ordered(channelName, function (next) {
clearChannel(env, channelName, Util.both(cb, next)); clearChannel(env, channelName, Util.both(cb, next));
}); });
}, },
trimChannel: function (channelName, hash, cb) {
// XXX ansuz
// XXX queue lock
/* block any reads from the metadata and log files
until this whole process has finished
close the file descriptor if it is open
derive temporary file paths for metadata and log buffers
compute metadata state and write to metadata buffer
scan through log file and begin copying lines to the log buffer
once you recognize the first line by the hash the user provided
archive the file and current metadata once both buffers are copied
move the metadata and log buffers into place
return the lock on reads
call back
in case of an error, remove the buffer files
*/
schedule.blocking(channelName, function (next) {
cb("E_NOT_IMPLEMENTED");
next();
});
},
// check if a channel exists in the database // check if a channel exists in the database
isChannelAvailable: function (channelName, cb) { isChannelAvailable: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
// construct the path // construct the path
var filepath = mkPath(env, channelName); var filepath = mkPath(env, channelName);
queue(channelName, function (next) { // (ansuz) I'm uncertain whether this task should be unordered or ordered.
// there's a round trip to the client (and possibly the user) before they decide
// to act on the information of whether there is already content present in this channel.
// so it's practically impossible to avoid race conditions where someone else creates
// some content before you.
// if that's the case, it's basically impossible that you'd generate the same signing key,
// and thus historykeeper should reject the signed messages of whoever loses the race.
// thus 'unordered' seems appropriate.
schedule.unordered(channelName, function (next) {
channelExists(filepath, Util.both(cb, next)); channelExists(filepath, Util.both(cb, next));
}); });
}, },
@ -973,21 +1035,30 @@ module.exports.create = function (conf, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
// construct the path // construct the path
var filepath = mkArchivePath(env, channelName); var filepath = mkArchivePath(env, channelName);
queue(channelName, function (next) { // as with the method above, somebody might remove, restore, or overwrite an archive
// in the time that it takes to answer this query and to execute whatever follows.
// since it's impossible to win the race every time let's just make this 'unordered'
schedule.unordered(channelName, function (next) {
channelExists(filepath, Util.both(cb, next)); channelExists(filepath, Util.both(cb, next));
}); });
}, },
// move a channel from the database to the archive, along with its metadata // move a channel from the database to the archive, along with its metadata
archiveChannel: function (channelName, cb) { archiveChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { // again, the semantics around archiving and appending are really muddy.
// so I'm calling this 'unordered' again
schedule.unordered(channelName, function (next) {
archiveChannel(env, channelName, Util.both(cb, next)); archiveChannel(env, channelName, Util.both(cb, next));
}); });
}, },
// restore a channel from the archive to the database, along with its metadata // restore a channel from the archive to the database, along with its metadata
restoreArchivedChannel: function (channelName, cb) { restoreArchivedChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { // archive restoration will fail if either a file or its metadata exists in the live db.
// so I'm calling this 'ordered' to give writes a chance to flush out.
// accidental conflicts are extremely unlikely since clients check the status
// of a previously known channel before joining.
schedule.ordered(channelName, function (next) {
unarchiveChannel(env, channelName, Util.both(cb, next)); unarchiveChannel(env, channelName, Util.both(cb, next));
}); });
}, },
@ -996,14 +1067,17 @@ module.exports.create = function (conf, cb) {
// fetch the metadata for a channel // fetch the metadata for a channel
getChannelMetadata: function (channelName, cb) { getChannelMetadata: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { // The only thing that can invalid this method's results are channel archival, removal, or trimming.
// We want it to be fast, so let's make it unordered.
schedule.unordered(channelName, function (next) {
getChannelMetadata(env, channelName, Util.both(cb, next)); getChannelMetadata(env, channelName, Util.both(cb, next));
}); });
}, },
// iterate over lines of metadata changes from a dedicated log // iterate over lines of metadata changes from a dedicated log
readDedicatedMetadata: function (channelName, handler, cb) { readDedicatedMetadata: function (channelName, handler, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { // Everything that modifies metadata also updates clients, so this can be 'unordered'
schedule.unordered(channelName, function (next) {
getDedicatedMetadata(env, channelName, handler, Util.both(cb, next)); getDedicatedMetadata(env, channelName, handler, Util.both(cb, next));
}); });
}, },
@ -1011,13 +1085,16 @@ module.exports.create = function (conf, cb) {
// iterate over multiple lines of metadata changes // iterate over multiple lines of metadata changes
readChannelMetadata: function (channelName, handler, cb) { readChannelMetadata: function (channelName, handler, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
// XXX queue // same logic as 'readDedicatedMetadata
readMetadata(env, channelName, handler, cb); schedule.unordered(channelName, function (next) {
readMetadata(env, channelName, handler, Util.both(cb, next));
});
}, },
// write a new line to a metadata log // write a new line to a metadata log
writeMetadata: function (channelName, data, cb) { writeMetadata: function (channelName, data, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { // metadata writes are fast and should be applied in order
schedule.ordered(channelName, function (next) {
writeMetadata(env, channelName, data, Util.both(cb, next)); writeMetadata(env, channelName, data, Util.both(cb, next));
}); });
}, },
@ -1032,7 +1109,9 @@ module.exports.create = function (conf, cb) {
getChannelSize: function (channelName, cb) { getChannelSize: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { // this method should be really fast and it probably doesn't matter much
// if we get the size slightly before or after somebody writes a few hundred bytes to it.
schedule.ordered(channelName, function (next) {
channelBytes(env, channelName, Util.both(cb, next)); channelBytes(env, channelName, Util.both(cb, next));
}); });
}, },
@ -1040,7 +1119,10 @@ module.exports.create = function (conf, cb) {
// remove a particular channel from the cache // remove a particular channel from the cache
closeChannel: function (channelName, cb) { closeChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
queue(channelName, function (next) { // It is most likely the case that the channel is inactive if we are trying to close it,
// thus it doesn't make much difference whether it's ordered or not.
// In any case, it will be re-opened if anyone tries to write to it.
schedule.ordered(channelName, function (next) {
closeChannel(env, channelName, Util.both(cb, next)); closeChannel(env, channelName, Util.both(cb, next));
}); });
}, },
@ -1050,7 +1132,10 @@ module.exports.create = function (conf, cb) {
}, },
// write to a log file // write to a log file
log: function (channelName, content, cb) { log: function (channelName, content, cb) {
message(env, channelName, content, cb); // you probably want the events in your log to be in the correct order.
schedule.ordered(channelName, function (next) {
message(env, channelName, content, Util.both(cb, next));
});
}, },
// shut down the database // shut down the database
shutdown: function () { shutdown: function () {

Loading…
Cancel
Save