diff --git a/storage/file.js b/storage/file.js index 6a7e0a17f..c54db236d 100644 --- a/storage/file.js +++ b/storage/file.js @@ -84,12 +84,6 @@ var getMetadataAtPath = function (Env, path, cb) { stream.on('error', function (e) { complete(e); }); }; -// FIXME METADATA -var getChannelMetadata = function (Env, channelId, cb) { - var path = mkPath(Env, channelId); - getMetadataAtPath(Env, path, cb); -}; - var closeChannel = function (env, channelName, cb) { if (!env.channels[channelName]) { return void cb(); } try { @@ -104,7 +98,6 @@ var closeChannel = function (env, channelName, cb) { var clearChannel = function (env, channelId, cb) { var path = mkPath(env, channelId); - // FIXME METADATA getMetadataAtPath(env, path, function (e, metadata) { if (e) { return cb(new Error(e)); } if (!metadata) { @@ -153,6 +146,77 @@ var readMessages = function (path, msgHandler, cb) { stream.on('error', function (e) { complete(e); }); }; +// FIXME METADATA +// XXX deprecate this everywhere in favour of the new method +var getChannelMetadata = function (Env, channelId, cb) { + var path = mkPath(Env, channelId); + + // gets metadata embedded in a file + getMetadataAtPath(Env, path, cb); +}; + +var readMetadata = function (env, channelId, handler, cb) { +/* + +Possibilities + + 1. there is no metadata because it's an old channel + 2. there is metadata in the first line of the channel, but nowhere else + 3. there is metadata in the first line of the channel as well as in a dedicated log + 4. there is no metadata in the first line of the channel. Everything is in the dedicated log + +How to proceed + + 1. load the first line of the channel and treat it as a metadata message if applicable + 2. load the dedicated log and treat it as an update + +*/ + + var CB = Once(cb); + + var index = 0; + nThen(function (w) { + // returns the first line of a channel, parsed... + getChannelMetadata(env, channelId, w(function (err, data) { + if (err) { + // 'INVALID_METADATA' if it can't parse + // stream errors if anything goes wrong at a lower level + // ENOENT (no channel here) + return void handler(err, undefined, index++); + } + // disregard anything that isn't a map + if (!data || typeof(data) !== 'object' || Array.isArray(data)) { return; } + + // otherwise it's good. + handler(null, data, index++); + })); + }).nThen(function (w) { + var metadataPath = mkMetadataPath(env, channelId); + readMessages(metadataPath, function (line) { + if (!line) { return; } + try { + var parsed = JSON.parse(line); + handler(null, parsed, index++); + } catch (e) { + handler(e, line, index++); + } + }, w(function (err) { + if (err) { + // ENOENT => there is no metadata log + if (err.code === 'ENOENT') { return void CB(); } + // otherwise stream errors? + CB(err); + } + CB(); + })); + }); +}; + +var writeMetadata = function (env, channelId, data, cb) { + cb = cb; + // XXX +}; + const NEWLINE_CHR = ('\n').charCodeAt(0); const mkBufferSplit = () => { let remainder = null; @@ -232,20 +296,62 @@ var checkPath = function (path, callback) { }); }; -// FIXME METADATA -// remove associated metadata as well +var labelError = function (label, err) { + return label + (err.code ? "_" + err.code: ''); +}; + var removeChannel = function (env, channelName, cb) { - var filename = mkPath(env, channelName); - Fs.unlink(filename, cb); + var channelPath = mkPath(env, channelName); + var metadataPath = mkMetadataPath(env, channelName); + + var CB = Once(cb); + + nThen(function (w) { + Fs.unlink(channelPath, w(function (err) { + if (err) { + w.abort(); + CB(labelError("E_CHANNEL_REMOVAL", err)); + } + })); + Fs.unlink(metadataPath, w(function (err) { + if (err) { + if (err.code === 'ENOENT') { return; } // proceed if there's no metadata to delete + w.abort(); + CB(labelError("E_METADATA_REMOVAL", err)); + } + })); + }).nThen(function () { + CB(); + }); }; -// FIXME -// remove associated metadata as well var removeArchivedChannel = function (env, channelName, cb) { - var filename = mkArchivePath(env, channelName); - Fs.unlink(filename, cb); + var channelPath = mkArchivePath(env, channelName); + var metadataPath = mkArchiveMetadataPath(env, channelName); + + var CB = Once(cb); + + nThen(function (w) { + Fs.unlink(channelPath, w(function (err) { + if (err) { + w.abort(); + CB(labelError("E_ARCHIVED_CHANNEL_REMOVAL", err)); + } + })); + Fs.unlink(metadataPath, w(function (err) { + if (err) { + if (err.code === "ENOENT") { return; } + w.abort(); + CB(labelError("E_ARCHIVED_METADATA_REMOVAL", err)); + } + })); + }).nThen(function () { + CB(); + }); }; +// TODO implement a method of removing metadata that doesn't have a corresponding channel + // TODO confirm that we're ignoring metadata files var listChannels = function (root, handler, cb) { // do twenty things at a time @@ -352,7 +458,7 @@ var archiveChannel = function (env, channelName, cb) { // there was an error archiving the metadata if (err) { - return void cb("E_METADATA_ARCHIVAL" + (err.code ? "_" + err.code: '')); + return void cb(labelError("E_METADATA_ARCHIVAL", err)); } // it was archived successfully @@ -420,7 +526,7 @@ var unarchiveChannel = function (env, channelName, cb) { // call back with an error if something goes wrong if (err) { w.abort(); - return void CB("E_METADATA_RESTORATION" + (err.code ? "_" + err.code: "")); + return void CB(labelError("E_METADATA_RESTORATION", err)); } // otherwise it was moved successfully CB(); @@ -458,7 +564,6 @@ var channelBytes = function (env, chanName, cb) { }); }; -// FIXME METADATA // implement metadata bytes as well? /*:: export type ChainPadServer_ChannelInternal_t = { @@ -662,80 +767,118 @@ module.exports.create = function ( })); }).nThen(function () { cb({ - readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { - if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - readMessagesBin(env, channelName, start, asyncMsgHandler, cb); - }, + // OLDER METHODS + // write a new message to a log message: function (channelName, content, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } message(env, channelName, content, cb); }, + // iterate over all the messages in a log + getMessages: function (channelName, msgHandler, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + getMessages(env, channelName, msgHandler, cb); + }, + + // NEWER IMPLEMENTATIONS OF THE SAME THING + // write a new message to a log messageBin: (channelName, content, cb) => { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } messageBin(env, channelName, content, cb); }, - getMessages: function (channelName, msgHandler, cb) { + // iterate over the messages in a log + readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - getMessages(env, channelName, msgHandler, cb); + readMessagesBin(env, channelName, start, asyncMsgHandler, cb); }, + + // METHODS for deleting data + // remove a channel and its associated metadata log if present removeChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } removeChannel(env, channelName, function (err) { cb(err); }); }, + // remove a channel and its associated metadata log from the archive directory removeArchivedChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } removeArchivedChannel(env, channelName, cb); }, - closeChannel: function (channelName, cb) { - if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - closeChannel(env, channelName, cb); - }, - flushUnusedChannels: function (cb) { - flushUnusedChannels(env, cb); - }, - getChannelSize: function (channelName, cb) { - if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - channelBytes(env, channelName, cb); - }, - getChannelMetadata: function (channelName, cb) { // FIXME METADATA - if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } - getChannelMetadata(env, channelName, cb); - }, + // clear all data for a channel but preserve its metadata clearChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } clearChannel(env, channelName, cb); }, - listChannels: function (handler, cb) { - listChannels(env.root, handler, cb); - }, + + // check if a channel exists in the database isChannelAvailable: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } // construct the path var filepath = mkPath(env, channelName); channelExists(filepath, cb); }, + // check if a channel exists in the archive isChannelArchived: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } // construct the path var filepath = mkArchivePath(env, channelName); channelExists(filepath, cb); }, - listArchivedChannels: function (handler, cb) { - listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb); - }, + // move a channel from the database to the archive, along with its metadata archiveChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } archiveChannel(env, channelName, cb); }, + // restore a channel from the archive to the database, along with its metadata restoreArchivedChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } unarchiveChannel(env, channelName, cb); }, + + // METADATA METHODS + // fetch the metadata for a channel + getChannelMetadata: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + getChannelMetadata(env, channelName, cb); + }, + // iterate over multiple lines of metadata changes + readChannelMetadata: function (channelName, handler, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + readMetadata(env, channelName, handler, cb); + }, + // write a new line to a metadata log + writeMetadata: function (channelName, data, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + writeMetadata(env, channelName, data, cb); + }, + + // CHANNEL ITERATION + listChannels: function (handler, cb) { + listChannels(env.root, handler, cb); + }, + listArchivedChannels: function (handler, cb) { + listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb); + }, + + getChannelSize: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + channelBytes(env, channelName, cb); + }, + // OTHER DATABASE FUNCTIONALITY + // remove a particular channel from the cache + closeChannel: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + closeChannel(env, channelName, cb); + }, + // iterate over open channels and close any that are not active + flushUnusedChannels: function (cb) { + flushUnusedChannels(env, cb); + }, + // write to a log file log: function (channelName, content, cb) { message(env, channelName, content, cb); }, + // shut down the database shutdown: function () { clearInterval(it); }