|
|
@ -236,8 +236,7 @@ How to proceed
|
|
|
|
// writeMetadata appends to the dedicated log of metadata amendments
|
|
|
|
// writeMetadata appends to the dedicated log of metadata amendments
|
|
|
|
var writeMetadata = function (env, channelId, data, cb) {
|
|
|
|
var writeMetadata = function (env, channelId, data, cb) {
|
|
|
|
var path = mkMetadataPath(env, channelId);
|
|
|
|
var path = mkMetadataPath(env, channelId);
|
|
|
|
// XXX appendFile isn't great
|
|
|
|
// TODO see if we can make this any faster by using something other than appendFile
|
|
|
|
// but this is a simple way to get things working
|
|
|
|
|
|
|
|
Fs.appendFile(path, data + '\n', cb);
|
|
|
|
Fs.appendFile(path, data + '\n', cb);
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
@ -290,7 +289,11 @@ const mkOffsetCounter = () => {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// XXX write some docs for this magic
|
|
|
|
// readMessagesBin asynchronously iterates over the messages in a channel log
|
|
|
|
|
|
|
|
// the handler for each message must call back to read more, which should mean
|
|
|
|
|
|
|
|
// that this function has a lower memory profile than our classic method
|
|
|
|
|
|
|
|
// of reading logs line by line.
|
|
|
|
|
|
|
|
// it also allows the handler to abort reading at any time
|
|
|
|
const readMessagesBin = (env, id, start, msgHandler, cb) => {
|
|
|
|
const readMessagesBin = (env, id, start, msgHandler, cb) => {
|
|
|
|
const stream = Fs.createReadStream(mkPath(env, id), { start: start });
|
|
|
|
const stream = Fs.createReadStream(mkPath(env, id), { start: start });
|
|
|
|
let keepReading = true;
|
|
|
|
let keepReading = true;
|
|
|
@ -341,23 +344,33 @@ var removeChannel = function (env, channelName, cb) {
|
|
|
|
|
|
|
|
|
|
|
|
var CB = Once(cb);
|
|
|
|
var CB = Once(cb);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var errors = 0;
|
|
|
|
nThen(function (w) {
|
|
|
|
nThen(function (w) {
|
|
|
|
Fs.unlink(channelPath, w(function (err) {
|
|
|
|
Fs.unlink(channelPath, w(function (err) {
|
|
|
|
if (err) {
|
|
|
|
if (err) {
|
|
|
|
// XXX handle ENOENT and only return an error
|
|
|
|
if (err.code === 'ENOENT') {
|
|
|
|
// if both channel and metadata did not exist...
|
|
|
|
errors++;
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
w.abort();
|
|
|
|
w.abort();
|
|
|
|
CB(labelError("E_CHANNEL_REMOVAL", err));
|
|
|
|
CB(labelError("E_CHANNEL_REMOVAL", err));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}));
|
|
|
|
}));
|
|
|
|
Fs.unlink(metadataPath, w(function (err) {
|
|
|
|
Fs.unlink(metadataPath, w(function (err) {
|
|
|
|
if (err) {
|
|
|
|
if (err) {
|
|
|
|
if (err.code === 'ENOENT') { return; } // proceed if there's no metadata to delete
|
|
|
|
if (err.code === 'ENOENT') {
|
|
|
|
|
|
|
|
errors++;
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
} // proceed if there's no metadata to delete
|
|
|
|
w.abort();
|
|
|
|
w.abort();
|
|
|
|
CB(labelError("E_METADATA_REMOVAL", err));
|
|
|
|
CB(labelError("E_METADATA_REMOVAL", err));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}));
|
|
|
|
}));
|
|
|
|
}).nThen(function () {
|
|
|
|
}).nThen(function () {
|
|
|
|
|
|
|
|
if (errors === 2) {
|
|
|
|
|
|
|
|
return void CB(labelError('E_REMOVE_CHANNEL', new Error("ENOENT")));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
CB();
|
|
|
|
CB();
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
};
|
|
|
@ -421,12 +434,23 @@ var listChannels = function (root, handler, cb) {
|
|
|
|
if (err) { return void handler(err); } // Is this correct?
|
|
|
|
if (err) { return void handler(err); } // Is this correct?
|
|
|
|
|
|
|
|
|
|
|
|
list.forEach(function (item) {
|
|
|
|
list.forEach(function (item) {
|
|
|
|
// ignore things that don't match the naming pattern
|
|
|
|
// ignore hidden files
|
|
|
|
// XXX don't ignore metadata files if there is no corresponding channel
|
|
|
|
if (/^\./.test(item)) { return; }
|
|
|
|
// since you probably want to clean those up
|
|
|
|
// ignore anything that isn't channel or metadata
|
|
|
|
if (/^\./.test(item) || !/[0-9a-fA-F]{32}\.ndjson$/.test(item)) { return; }
|
|
|
|
if (!/^[0-9a-fA-F]{32}(\.metadata?)*\.ndjson$/.test(item)) {
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!/^[0-9a-fA-F]{32}\.ndjson$/.test(item)) {
|
|
|
|
|
|
|
|
// this will catch metadata, which we want to ignore if
|
|
|
|
|
|
|
|
// the corresponding channel is present
|
|
|
|
|
|
|
|
if (list.indexOf(item.replace(/\.metadata/, '')) !== -1) { return; }
|
|
|
|
|
|
|
|
// otherwise fall through
|
|
|
|
|
|
|
|
}
|
|
|
|
var filepath = Path.join(nestedDirPath, item);
|
|
|
|
var filepath = Path.join(nestedDirPath, item);
|
|
|
|
var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, '');
|
|
|
|
var channel = filepath
|
|
|
|
|
|
|
|
.replace(/\.ndjson$/, '')
|
|
|
|
|
|
|
|
.replace(/\.metadata/, '')
|
|
|
|
|
|
|
|
.replace(/.*\//, '');
|
|
|
|
if ([32, 34].indexOf(channel.length) === -1) { return; }
|
|
|
|
if ([32, 34].indexOf(channel.length) === -1) { return; }
|
|
|
|
|
|
|
|
|
|
|
|
// otherwise throw it on the pile
|
|
|
|
// otherwise throw it on the pile
|
|
|
|