|
|
|
@ -84,12 +84,6 @@ var getMetadataAtPath = function (Env, path, cb) {
|
|
|
|
|
stream.on('error', function (e) { complete(e); });
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// FIXME METADATA
|
|
|
|
|
var getChannelMetadata = function (Env, channelId, cb) {
|
|
|
|
|
var path = mkPath(Env, channelId);
|
|
|
|
|
getMetadataAtPath(Env, path, cb);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
var closeChannel = function (env, channelName, cb) {
|
|
|
|
|
if (!env.channels[channelName]) { return void cb(); }
|
|
|
|
|
try {
|
|
|
|
@ -104,7 +98,6 @@ var closeChannel = function (env, channelName, cb) {
|
|
|
|
|
|
|
|
|
|
var clearChannel = function (env, channelId, cb) {
|
|
|
|
|
var path = mkPath(env, channelId);
|
|
|
|
|
// FIXME METADATA
|
|
|
|
|
getMetadataAtPath(env, path, function (e, metadata) {
|
|
|
|
|
if (e) { return cb(new Error(e)); }
|
|
|
|
|
if (!metadata) {
|
|
|
|
@ -153,6 +146,77 @@ var readMessages = function (path, msgHandler, cb) {
|
|
|
|
|
stream.on('error', function (e) { complete(e); });
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// FIXME METADATA
|
|
|
|
|
// XXX deprecate this everywhere in favour of the new method
|
|
|
|
|
var getChannelMetadata = function (Env, channelId, cb) {
|
|
|
|
|
var path = mkPath(Env, channelId);
|
|
|
|
|
|
|
|
|
|
// gets metadata embedded in a file
|
|
|
|
|
getMetadataAtPath(Env, path, cb);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
var readMetadata = function (env, channelId, handler, cb) {
|
|
|
|
|
/*
|
|
|
|
|
|
|
|
|
|
Possibilities
|
|
|
|
|
|
|
|
|
|
1. there is no metadata because it's an old channel
|
|
|
|
|
2. there is metadata in the first line of the channel, but nowhere else
|
|
|
|
|
3. there is metadata in the first line of the channel as well as in a dedicated log
|
|
|
|
|
4. there is no metadata in the first line of the channel. Everything is in the dedicated log
|
|
|
|
|
|
|
|
|
|
How to proceed
|
|
|
|
|
|
|
|
|
|
1. load the first line of the channel and treat it as a metadata message if applicable
|
|
|
|
|
2. load the dedicated log and treat it as an update
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
var CB = Once(cb);
|
|
|
|
|
|
|
|
|
|
var index = 0;
|
|
|
|
|
nThen(function (w) {
|
|
|
|
|
// returns the first line of a channel, parsed...
|
|
|
|
|
getChannelMetadata(env, channelId, w(function (err, data) {
|
|
|
|
|
if (err) {
|
|
|
|
|
// 'INVALID_METADATA' if it can't parse
|
|
|
|
|
// stream errors if anything goes wrong at a lower level
|
|
|
|
|
// ENOENT (no channel here)
|
|
|
|
|
return void handler(err, undefined, index++);
|
|
|
|
|
}
|
|
|
|
|
// disregard anything that isn't a map
|
|
|
|
|
if (!data || typeof(data) !== 'object' || Array.isArray(data)) { return; }
|
|
|
|
|
|
|
|
|
|
// otherwise it's good.
|
|
|
|
|
handler(null, data, index++);
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function (w) {
|
|
|
|
|
var metadataPath = mkMetadataPath(env, channelId);
|
|
|
|
|
readMessages(metadataPath, function (line) {
|
|
|
|
|
if (!line) { return; }
|
|
|
|
|
try {
|
|
|
|
|
var parsed = JSON.parse(line);
|
|
|
|
|
handler(null, parsed, index++);
|
|
|
|
|
} catch (e) {
|
|
|
|
|
handler(e, line, index++);
|
|
|
|
|
}
|
|
|
|
|
}, w(function (err) {
|
|
|
|
|
if (err) {
|
|
|
|
|
// ENOENT => there is no metadata log
|
|
|
|
|
if (err.code === 'ENOENT') { return void CB(); }
|
|
|
|
|
// otherwise stream errors?
|
|
|
|
|
CB(err);
|
|
|
|
|
}
|
|
|
|
|
CB();
|
|
|
|
|
}));
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
var writeMetadata = function (env, channelId, data, cb) {
|
|
|
|
|
cb = cb;
|
|
|
|
|
// XXX
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const NEWLINE_CHR = ('\n').charCodeAt(0);
|
|
|
|
|
const mkBufferSplit = () => {
|
|
|
|
|
let remainder = null;
|
|
|
|
@ -232,20 +296,62 @@ var checkPath = function (path, callback) {
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// FIXME METADATA
|
|
|
|
|
// remove associated metadata as well
|
|
|
|
|
var labelError = function (label, err) {
|
|
|
|
|
return label + (err.code ? "_" + err.code: '');
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
var removeChannel = function (env, channelName, cb) {
|
|
|
|
|
var filename = mkPath(env, channelName);
|
|
|
|
|
Fs.unlink(filename, cb);
|
|
|
|
|
var channelPath = mkPath(env, channelName);
|
|
|
|
|
var metadataPath = mkMetadataPath(env, channelName);
|
|
|
|
|
|
|
|
|
|
var CB = Once(cb);
|
|
|
|
|
|
|
|
|
|
nThen(function (w) {
|
|
|
|
|
Fs.unlink(channelPath, w(function (err) {
|
|
|
|
|
if (err) {
|
|
|
|
|
w.abort();
|
|
|
|
|
CB(labelError("E_CHANNEL_REMOVAL", err));
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
Fs.unlink(metadataPath, w(function (err) {
|
|
|
|
|
if (err) {
|
|
|
|
|
if (err.code === 'ENOENT') { return; } // proceed if there's no metadata to delete
|
|
|
|
|
w.abort();
|
|
|
|
|
CB(labelError("E_METADATA_REMOVAL", err));
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
CB();
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// FIXME
|
|
|
|
|
// remove associated metadata as well
|
|
|
|
|
var removeArchivedChannel = function (env, channelName, cb) {
|
|
|
|
|
var filename = mkArchivePath(env, channelName);
|
|
|
|
|
Fs.unlink(filename, cb);
|
|
|
|
|
var channelPath = mkArchivePath(env, channelName);
|
|
|
|
|
var metadataPath = mkArchiveMetadataPath(env, channelName);
|
|
|
|
|
|
|
|
|
|
var CB = Once(cb);
|
|
|
|
|
|
|
|
|
|
nThen(function (w) {
|
|
|
|
|
Fs.unlink(channelPath, w(function (err) {
|
|
|
|
|
if (err) {
|
|
|
|
|
w.abort();
|
|
|
|
|
CB(labelError("E_ARCHIVED_CHANNEL_REMOVAL", err));
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
Fs.unlink(metadataPath, w(function (err) {
|
|
|
|
|
if (err) {
|
|
|
|
|
if (err.code === "ENOENT") { return; }
|
|
|
|
|
w.abort();
|
|
|
|
|
CB(labelError("E_ARCHIVED_METADATA_REMOVAL", err));
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
CB();
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// TODO implement a method of removing metadata that doesn't have a corresponding channel
|
|
|
|
|
|
|
|
|
|
// TODO confirm that we're ignoring metadata files
|
|
|
|
|
var listChannels = function (root, handler, cb) {
|
|
|
|
|
// do twenty things at a time
|
|
|
|
@ -352,7 +458,7 @@ var archiveChannel = function (env, channelName, cb) {
|
|
|
|
|
|
|
|
|
|
// there was an error archiving the metadata
|
|
|
|
|
if (err) {
|
|
|
|
|
return void cb("E_METADATA_ARCHIVAL" + (err.code ? "_" + err.code: ''));
|
|
|
|
|
return void cb(labelError("E_METADATA_ARCHIVAL", err));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// it was archived successfully
|
|
|
|
@ -420,7 +526,7 @@ var unarchiveChannel = function (env, channelName, cb) {
|
|
|
|
|
// call back with an error if something goes wrong
|
|
|
|
|
if (err) {
|
|
|
|
|
w.abort();
|
|
|
|
|
return void CB("E_METADATA_RESTORATION" + (err.code ? "_" + err.code: ""));
|
|
|
|
|
return void CB(labelError("E_METADATA_RESTORATION", err));
|
|
|
|
|
}
|
|
|
|
|
// otherwise it was moved successfully
|
|
|
|
|
CB();
|
|
|
|
@ -458,7 +564,6 @@ var channelBytes = function (env, chanName, cb) {
|
|
|
|
|
});
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// FIXME METADATA
|
|
|
|
|
// implement metadata bytes as well?
|
|
|
|
|
/*::
|
|
|
|
|
export type ChainPadServer_ChannelInternal_t = {
|
|
|
|
@ -662,80 +767,118 @@ module.exports.create = function (
|
|
|
|
|
}));
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
cb({
|
|
|
|
|
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
|
|
|
|
|
},
|
|
|
|
|
// OLDER METHODS
|
|
|
|
|
// write a new message to a log
|
|
|
|
|
message: function (channelName, content, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
message(env, channelName, content, cb);
|
|
|
|
|
},
|
|
|
|
|
// iterate over all the messages in a log
|
|
|
|
|
getMessages: function (channelName, msgHandler, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
getMessages(env, channelName, msgHandler, cb);
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// NEWER IMPLEMENTATIONS OF THE SAME THING
|
|
|
|
|
// write a new message to a log
|
|
|
|
|
messageBin: (channelName, content, cb) => {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
messageBin(env, channelName, content, cb);
|
|
|
|
|
},
|
|
|
|
|
getMessages: function (channelName, msgHandler, cb) {
|
|
|
|
|
// iterate over the messages in a log
|
|
|
|
|
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
getMessages(env, channelName, msgHandler, cb);
|
|
|
|
|
readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// METHODS for deleting data
|
|
|
|
|
// remove a channel and its associated metadata log if present
|
|
|
|
|
removeChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
removeChannel(env, channelName, function (err) {
|
|
|
|
|
cb(err);
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
// remove a channel and its associated metadata log from the archive directory
|
|
|
|
|
removeArchivedChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
removeArchivedChannel(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
closeChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
closeChannel(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
flushUnusedChannels: function (cb) {
|
|
|
|
|
flushUnusedChannels(env, cb);
|
|
|
|
|
},
|
|
|
|
|
getChannelSize: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
channelBytes(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
getChannelMetadata: function (channelName, cb) { // FIXME METADATA
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
getChannelMetadata(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
// clear all data for a channel but preserve its metadata
|
|
|
|
|
clearChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
clearChannel(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
listChannels: function (handler, cb) {
|
|
|
|
|
listChannels(env.root, handler, cb);
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// check if a channel exists in the database
|
|
|
|
|
isChannelAvailable: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
// construct the path
|
|
|
|
|
var filepath = mkPath(env, channelName);
|
|
|
|
|
channelExists(filepath, cb);
|
|
|
|
|
},
|
|
|
|
|
// check if a channel exists in the archive
|
|
|
|
|
isChannelArchived: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
// construct the path
|
|
|
|
|
var filepath = mkArchivePath(env, channelName);
|
|
|
|
|
channelExists(filepath, cb);
|
|
|
|
|
},
|
|
|
|
|
listArchivedChannels: function (handler, cb) {
|
|
|
|
|
listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb);
|
|
|
|
|
},
|
|
|
|
|
// move a channel from the database to the archive, along with its metadata
|
|
|
|
|
archiveChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
archiveChannel(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
// restore a channel from the archive to the database, along with its metadata
|
|
|
|
|
restoreArchivedChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
unarchiveChannel(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// METADATA METHODS
|
|
|
|
|
// fetch the metadata for a channel
|
|
|
|
|
getChannelMetadata: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
getChannelMetadata(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
// iterate over multiple lines of metadata changes
|
|
|
|
|
readChannelMetadata: function (channelName, handler, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
readMetadata(env, channelName, handler, cb);
|
|
|
|
|
},
|
|
|
|
|
// write a new line to a metadata log
|
|
|
|
|
writeMetadata: function (channelName, data, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
writeMetadata(env, channelName, data, cb);
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
// CHANNEL ITERATION
|
|
|
|
|
listChannels: function (handler, cb) {
|
|
|
|
|
listChannels(env.root, handler, cb);
|
|
|
|
|
},
|
|
|
|
|
listArchivedChannels: function (handler, cb) {
|
|
|
|
|
listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb);
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
getChannelSize: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
channelBytes(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
// OTHER DATABASE FUNCTIONALITY
|
|
|
|
|
// remove a particular channel from the cache
|
|
|
|
|
closeChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
closeChannel(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
// iterate over open channels and close any that are not active
|
|
|
|
|
flushUnusedChannels: function (cb) {
|
|
|
|
|
flushUnusedChannels(env, cb);
|
|
|
|
|
},
|
|
|
|
|
// write to a log file
|
|
|
|
|
log: function (channelName, content, cb) {
|
|
|
|
|
message(env, channelName, content, cb);
|
|
|
|
|
},
|
|
|
|
|
// shut down the database
|
|
|
|
|
shutdown: function () {
|
|
|
|
|
clearInterval(it);
|
|
|
|
|
}
|
|
|
|
|