add comments for lots of functions
parent
d6d7f4f601
commit
a3a7bcbe35
|
@ -51,6 +51,7 @@ var channelExists = function (filepath, cb) {
|
|||
});
|
||||
};
|
||||
|
||||
// reads classic metadata from a channel log and aborts
|
||||
var getMetadataAtPath = function (Env, path, cb) {
|
||||
var remainder = '';
|
||||
var stream = Fs.createReadStream(path, { encoding: 'utf8' });
|
||||
|
@ -96,6 +97,7 @@ var closeChannel = function (env, channelName, cb) {
|
|||
}
|
||||
};
|
||||
|
||||
// truncates a file to the end of its metadata line
|
||||
var clearChannel = function (env, channelId, cb) {
|
||||
var path = mkPath(env, channelId);
|
||||
getMetadataAtPath(env, path, function (e, metadata) {
|
||||
|
@ -125,6 +127,9 @@ var clearChannel = function (env, channelId, cb) {
|
|||
});
|
||||
};
|
||||
|
||||
/* readMessages is our classic method of reading messages from the disk
|
||||
notably doesn't provide a means of aborting if you finish early
|
||||
*/
|
||||
var readMessages = function (path, msgHandler, cb) {
|
||||
var remainder = '';
|
||||
var stream = Fs.createReadStream(path, { encoding: 'utf8' });
|
||||
|
@ -146,8 +151,11 @@ var readMessages = function (path, msgHandler, cb) {
|
|||
stream.on('error', function (e) { complete(e); });
|
||||
};
|
||||
|
||||
// FIXME METADATA
|
||||
// XXX deprecate this everywhere in favour of the new method
|
||||
/* getChannelMetadata
|
||||
reads only the metadata embedded in the first line of a channel log.
|
||||
does not necessarily provide the most up to date metadata, as it
|
||||
could have been amended
|
||||
*/
|
||||
var getChannelMetadata = function (Env, channelId, cb) {
|
||||
var path = mkPath(Env, channelId);
|
||||
|
||||
|
@ -177,6 +185,11 @@ var getDedicatedMetadata = function (env, channelId, handler, cb) {
|
|||
});
|
||||
};
|
||||
|
||||
/* readMetadata
|
||||
fetches the classic format of the metadata from the channel log
|
||||
if it is present, otherwise load the log of metadata amendments.
|
||||
Requires a handler to process successive lines.
|
||||
*/
|
||||
var readMetadata = function (env, channelId, handler, cb) {
|
||||
/*
|
||||
|
||||
|
@ -220,6 +233,7 @@ How to proceed
|
|||
});
|
||||
};
|
||||
|
||||
// writeMetadata appends to the dedicated log of metadata amendments
|
||||
var writeMetadata = function (env, channelId, data, cb) {
|
||||
var path = mkMetadataPath(env, channelId);
|
||||
// XXX appendFile isn't great
|
||||
|
@ -227,6 +241,10 @@ var writeMetadata = function (env, channelId, data, cb) {
|
|||
Fs.appendFile(path, data + '\n', cb);
|
||||
};
|
||||
|
||||
|
||||
// transform a stream of arbitrarily divided data
|
||||
// into a stream of buffers divided by newlines in the source stream
|
||||
// TODO see if we could improve performance by using libnewline
|
||||
const NEWLINE_CHR = ('\n').charCodeAt(0);
|
||||
const mkBufferSplit = () => {
|
||||
let remainder = null;
|
||||
|
@ -260,6 +278,8 @@ const mkBufferSplit = () => {
|
|||
}, Pull.flatten());
|
||||
};
|
||||
|
||||
// return a streaming function which transforms buffers into objects
|
||||
// containing the buffer and the offset from the start of the stream
|
||||
const mkOffsetCounter = () => {
|
||||
let offset = 0;
|
||||
return Pull.map((buff) => {
|
||||
|
@ -287,6 +307,7 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
|
|||
);
|
||||
};
|
||||
|
||||
// check if a file exists at $path
|
||||
var checkPath = function (path, callback) {
|
||||
Fs.stat(path, function (err) {
|
||||
if (!err) {
|
||||
|
@ -311,6 +332,9 @@ var labelError = function (label, err) {
|
|||
return label + (err.code ? "_" + err.code: '');
|
||||
};
|
||||
|
||||
/* removeChannel
|
||||
fully deletes a channel log and any associated metadata
|
||||
*/
|
||||
var removeChannel = function (env, channelName, cb) {
|
||||
var channelPath = mkPath(env, channelName);
|
||||
var metadataPath = mkMetadataPath(env, channelName);
|
||||
|
@ -320,6 +344,8 @@ var removeChannel = function (env, channelName, cb) {
|
|||
nThen(function (w) {
|
||||
Fs.unlink(channelPath, w(function (err) {
|
||||
if (err) {
|
||||
// XXX handle ENOENT and only return an error
|
||||
// if both channel and metadata did not exist...
|
||||
w.abort();
|
||||
CB(labelError("E_CHANNEL_REMOVAL", err));
|
||||
}
|
||||
|
@ -336,6 +362,9 @@ var removeChannel = function (env, channelName, cb) {
|
|||
});
|
||||
};
|
||||
|
||||
/* removeArchivedChannel
|
||||
fully removes an archived channel log and any associated metadata
|
||||
*/
|
||||
var removeArchivedChannel = function (env, channelName, cb) {
|
||||
var channelPath = mkArchivePath(env, channelName);
|
||||
var metadataPath = mkArchiveMetadataPath(env, channelName);
|
||||
|
@ -362,8 +391,6 @@ var removeArchivedChannel = function (env, channelName, cb) {
|
|||
};
|
||||
|
||||
// TODO implement a method of removing metadata that doesn't have a corresponding channel
|
||||
|
||||
// TODO confirm that we're ignoring metadata files
|
||||
var listChannels = function (root, handler, cb) {
|
||||
// do twenty things at a time
|
||||
var sema = Semaphore.create(20);
|
||||
|
@ -395,6 +422,8 @@ var listChannels = function (root, handler, cb) {
|
|||
|
||||
list.forEach(function (item) {
|
||||
// ignore things that don't match the naming pattern
|
||||
// XXX don't ignore metadata files if there is no corresponding channel
|
||||
// since you probably want to clean those up
|
||||
if (/^\./.test(item) || !/[0-9a-fA-F]{32}\.ndjson$/.test(item)) { return; }
|
||||
var filepath = Path.join(nestedDirPath, item);
|
||||
var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, '');
|
||||
|
@ -691,6 +720,7 @@ var getChannel = function (
|
|||
});
|
||||
};
|
||||
|
||||
// write a message to the disk as raw bytes
|
||||
const messageBin = (env, chanName, msgBin, cb) => {
|
||||
getChannel(env, chanName, function (err, chan) {
|
||||
if (!chan) {
|
||||
|
@ -714,10 +744,12 @@ const messageBin = (env, chanName, msgBin, cb) => {
|
|||
});
|
||||
};
|
||||
|
||||
// append a string to a channel's log as a new line
|
||||
var message = function (env, chanName, msg, cb) {
|
||||
messageBin(env, chanName, new Buffer(msg + '\n', 'utf8'), cb);
|
||||
};
|
||||
|
||||
// stream messages from a channel log
|
||||
var getMessages = function (env, chanName, handler, cb) {
|
||||
getChannel(env, chanName, function (err, chan) {
|
||||
if (!chan) {
|
||||
|
@ -739,6 +771,9 @@ var getMessages = function (env, chanName, handler, cb) {
|
|||
errorState = true;
|
||||
return void cb(err);
|
||||
}
|
||||
// is it really, though? what if we hit the limit of open channels
|
||||
// and 'clean up' in the middle of reading a massive file?
|
||||
// certainly unlikely
|
||||
if (!chan) { throw new Error("impossible, flow checking"); }
|
||||
chan.atime = +new Date();
|
||||
cb();
|
||||
|
|
Loading…
Reference in New Issue