diff --git a/storage/file.js b/storage/file.js index c930390a4..def17e622 100644 --- a/storage/file.js +++ b/storage/file.js @@ -51,6 +51,7 @@ var channelExists = function (filepath, cb) { }); }; +// reads classic metadata from a channel log and aborts var getMetadataAtPath = function (Env, path, cb) { var remainder = ''; var stream = Fs.createReadStream(path, { encoding: 'utf8' }); @@ -96,6 +97,7 @@ var closeChannel = function (env, channelName, cb) { } }; +// truncates a file to the end of its metadata line var clearChannel = function (env, channelId, cb) { var path = mkPath(env, channelId); getMetadataAtPath(env, path, function (e, metadata) { @@ -125,6 +127,9 @@ var clearChannel = function (env, channelId, cb) { }); }; +/* readMessages is our classic method of reading messages from the disk + notably doesn't provide a means of aborting if you finish early +*/ var readMessages = function (path, msgHandler, cb) { var remainder = ''; var stream = Fs.createReadStream(path, { encoding: 'utf8' }); @@ -146,8 +151,11 @@ var readMessages = function (path, msgHandler, cb) { stream.on('error', function (e) { complete(e); }); }; -// FIXME METADATA -// XXX deprecate this everywhere in favour of the new method +/* getChannelMetadata + reads only the metadata embedded in the first line of a channel log. + does not necessarily provide the most up to date metadata, as it + could have been amended +*/ var getChannelMetadata = function (Env, channelId, cb) { var path = mkPath(Env, channelId); @@ -177,6 +185,11 @@ var getDedicatedMetadata = function (env, channelId, handler, cb) { }); }; +/* readMetadata + fetches the classic format of the metadata from the channel log + if it is present, otherwise loads the log of metadata amendments. + Requires a handler to process successive lines. 
+*/ var readMetadata = function (env, channelId, handler, cb) { /* @@ -220,6 +233,7 @@ How to proceed }); }; +// writeMetadata appends to the dedicated log of metadata amendments var writeMetadata = function (env, channelId, data, cb) { var path = mkMetadataPath(env, channelId); // XXX appendFile isn't great @@ -227,6 +241,10 @@ var writeMetadata = function (env, channelId, data, cb) { Fs.appendFile(path, data + '\n', cb); }; + +// transform a stream of arbitrarily divided data +// into a stream of buffers divided by newlines in the source stream +// TODO see if we could improve performance by using libnewline const NEWLINE_CHR = ('\n').charCodeAt(0); const mkBufferSplit = () => { let remainder = null; @@ -260,6 +278,8 @@ const mkBufferSplit = () => { }, Pull.flatten()); }; +// return a streaming function which transforms buffers into objects +// containing the buffer and the offset from the start of the stream const mkOffsetCounter = () => { let offset = 0; return Pull.map((buff) => { @@ -287,6 +307,7 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => { ); }; +// check if a file exists at $path var checkPath = function (path, callback) { Fs.stat(path, function (err) { if (!err) { @@ -311,6 +332,9 @@ var labelError = function (label, err) { return label + (err.code ? "_" + err.code: ''); }; +/* removeChannel + fully deletes a channel log and any associated metadata +*/ var removeChannel = function (env, channelName, cb) { var channelPath = mkPath(env, channelName); var metadataPath = mkMetadataPath(env, channelName); @@ -320,6 +344,8 @@ var removeChannel = function (env, channelName, cb) { nThen(function (w) { Fs.unlink(channelPath, w(function (err) { if (err) { + // XXX handle ENOENT and only return an error + // if both channel and metadata did not exist... 
w.abort(); CB(labelError("E_CHANNEL_REMOVAL", err)); } @@ -336,6 +362,9 @@ var removeChannel = function (env, channelName, cb) { }); }; +/* removeArchivedChannel + fully removes an archived channel log and any associated metadata +*/ var removeArchivedChannel = function (env, channelName, cb) { var channelPath = mkArchivePath(env, channelName); var metadataPath = mkArchiveMetadataPath(env, channelName); @@ -362,8 +391,6 @@ var removeArchivedChannel = function (env, channelName, cb) { }; // TODO implement a method of removing metadata that doesn't have a corresponding channel - -// TODO confirm that we're ignoring metadata files var listChannels = function (root, handler, cb) { // do twenty things at a time var sema = Semaphore.create(20); @@ -395,6 +422,8 @@ var listChannels = function (root, handler, cb) { list.forEach(function (item) { // ignore things that don't match the naming pattern + // XXX don't ignore metadata files if there is no corresponding channel + // since you probably want to clean those up if (/^\./.test(item) || !/[0-9a-fA-F]{32}\.ndjson$/.test(item)) { return; } var filepath = Path.join(nestedDirPath, item); var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, ''); @@ -691,6 +720,7 @@ var getChannel = function ( }); }; +// write a message to the disk as raw bytes const messageBin = (env, chanName, msgBin, cb) => { getChannel(env, chanName, function (err, chan) { if (!chan) { @@ -714,10 +744,12 @@ const messageBin = (env, chanName, msgBin, cb) => { }); }; +// append a string to a channel's log as a new line var message = function (env, chanName, msg, cb) { messageBin(env, chanName, new Buffer(msg + '\n', 'utf8'), cb); }; +// stream messages from a channel log var getMessages = function (env, chanName, handler, cb) { getChannel(env, chanName, function (err, chan) { if (!chan) { @@ -739,6 +771,9 @@ var getMessages = function (env, chanName, handler, cb) { errorState = true; return void cb(err); } + // is it really, though? 
what if we hit the limit of open channels + // and 'clean up' in the middle of reading a massive file? + // certainly unlikely if (!chan) { throw new Error("impossible, flow checking"); } chan.atime = +new Date(); cb();