|
|
@ -6,7 +6,8 @@ var Fse = require("fs-extra");
|
|
|
|
var Path = require("path");
|
|
|
|
var Path = require("path");
|
|
|
|
var nThen = require("nthen");
|
|
|
|
var nThen = require("nthen");
|
|
|
|
var Semaphore = require("saferphore");
|
|
|
|
var Semaphore = require("saferphore");
|
|
|
|
var Once = require("../lib/once");
|
|
|
|
var Util = require("../lib/common-util");
|
|
|
|
|
|
|
|
const Readline = require("readline");
|
|
|
|
const ToPull = require('stream-to-pull-stream');
|
|
|
|
const ToPull = require('stream-to-pull-stream');
|
|
|
|
const Pull = require('pull-stream');
|
|
|
|
const Pull = require('pull-stream');
|
|
|
|
|
|
|
|
|
|
|
@ -52,37 +53,54 @@ var channelExists = function (filepath, cb) {
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// reads classic metadata from a channel log and aborts
|
|
|
|
// reads classic metadata from a channel log and aborts
|
|
|
|
var getMetadataAtPath = function (Env, path, cb) {
|
|
|
|
// returns undefined if the first message was not an object (not an array)
|
|
|
|
var remainder = '';
|
|
|
|
var getMetadataAtPath = function (Env, path, _cb) {
|
|
|
|
var stream = Fs.createReadStream(path, { encoding: 'utf8' });
|
|
|
|
var stream;
|
|
|
|
var complete = function (err, data) {
|
|
|
|
|
|
|
|
var _cb = cb;
|
|
|
|
|
|
|
|
cb = undefined;
|
|
|
|
|
|
|
|
if (_cb) { _cb(err, data); }
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
stream.on('data', function (chunk) {
|
|
|
|
|
|
|
|
if (!/\n/.test(chunk)) {
|
|
|
|
|
|
|
|
remainder += chunk;
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
stream.close();
|
|
|
|
|
|
|
|
var metadata = chunk.split('\n')[0];
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var parsed = null;
|
|
|
|
// cb implicitly destroys the stream, if it exists
|
|
|
|
|
|
|
|
// and calls back asynchronously no more than once
|
|
|
|
|
|
|
|
var cb = Util.once(Util.both(function () {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
parsed = JSON.parse(metadata);
|
|
|
|
stream.destroy();
|
|
|
|
complete(undefined, parsed);
|
|
|
|
} catch (err) {
|
|
|
|
}
|
|
|
|
return err;
|
|
|
|
catch (e) {
|
|
|
|
|
|
|
|
console.log("getMetadataAtPath");
|
|
|
|
|
|
|
|
console.error(e);
|
|
|
|
|
|
|
|
complete('INVALID_METADATA', metadata);
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}, Util.mkAsync(_cb)));
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
// stream creation can throw... probably ENOENT
|
|
|
|
|
|
|
|
stream = Fs.createReadStream(path, { encoding: 'utf8' });
|
|
|
|
|
|
|
|
} catch (err) {
|
|
|
|
|
|
|
|
return void cb(err);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// stream lines
|
|
|
|
|
|
|
|
const rl = Readline.createInterface({
|
|
|
|
|
|
|
|
input: stream,
|
|
|
|
});
|
|
|
|
});
|
|
|
|
stream.on('end', function () {
|
|
|
|
|
|
|
|
complete();
|
|
|
|
var i = 0;
|
|
|
|
});
|
|
|
|
rl
|
|
|
|
stream.on('error', function (e) { complete(e); });
|
|
|
|
.on('line', function (line) {
|
|
|
|
|
|
|
|
// metadata should always be on the first line or not exist in the channel at all
|
|
|
|
|
|
|
|
if (i++ > 0) { return void cb(); }
|
|
|
|
|
|
|
|
var metadata;
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
metadata = JSON.parse(line);
|
|
|
|
|
|
|
|
// if it parses, is a truthy object, and is not an array
|
|
|
|
|
|
|
|
// then it's what you were looking for
|
|
|
|
|
|
|
|
if (metadata && typeof(metadata) === 'object' && !Array.isArray(metadata)) {
|
|
|
|
|
|
|
|
return void cb(void 0, metadata);
|
|
|
|
|
|
|
|
} else { // it parsed, but isn't metadata
|
|
|
|
|
|
|
|
return void cb(); // call back without an error or metadata
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} catch (err) {
|
|
|
|
|
|
|
|
// if you can't parse, that's bad
|
|
|
|
|
|
|
|
return void cb("INVALID_METADATA");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
.on('end', cb)
|
|
|
|
|
|
|
|
.on('error', cb);
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
var closeChannel = function (env, channelName, cb) {
|
|
|
|
var closeChannel = function (env, channelName, cb) {
|
|
|
@ -98,18 +116,14 @@ var closeChannel = function (env, channelName, cb) {
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
// truncates a file to the end of its metadata line
|
|
|
|
// truncates a file to the end of its metadata line
|
|
|
|
var clearChannel = function (env, channelId, cb) {
|
|
|
|
// TODO write the metadata in a dedicated file
|
|
|
|
|
|
|
|
var clearChannel = function (env, channelId, _cb) {
|
|
|
|
|
|
|
|
var cb = Util.once(Util.mkAsync(_cb));
|
|
|
|
|
|
|
|
|
|
|
|
var path = mkPath(env, channelId);
|
|
|
|
var path = mkPath(env, channelId);
|
|
|
|
getMetadataAtPath(env, path, function (e, metadata) {
|
|
|
|
getMetadataAtPath(env, path, function (e, metadata) {
|
|
|
|
if (e) { return cb(new Error(e)); }
|
|
|
|
if (e) { return cb(new Error(e)); }
|
|
|
|
if (!metadata) {
|
|
|
|
if (!metadata) { return void Fs.truncate(path, 0, cb); }
|
|
|
|
return void Fs.truncate(path, 0, function (err) {
|
|
|
|
|
|
|
|
if (err) {
|
|
|
|
|
|
|
|
return cb(err);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
cb(void 0);
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var len = JSON.stringify(metadata).length + 1;
|
|
|
|
var len = JSON.stringify(metadata).length + 1;
|
|
|
|
|
|
|
|
|
|
|
@ -214,7 +228,7 @@ How to proceed
|
|
|
|
// 'INVALID_METADATA' if it can't parse
|
|
|
|
// 'INVALID_METADATA' if it can't parse
|
|
|
|
// stream errors if anything goes wrong at a lower level
|
|
|
|
// stream errors if anything goes wrong at a lower level
|
|
|
|
// ENOENT (no channel here)
|
|
|
|
// ENOENT (no channel here)
|
|
|
|
return void handler(err);
|
|
|
|
return void handler(err, data);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// disregard anything that isn't a map
|
|
|
|
// disregard anything that isn't a map
|
|
|
|
if (!data || typeof(data) !== 'object' || Array.isArray(data)) { return; }
|
|
|
|
if (!data || typeof(data) !== 'object' || Array.isArray(data)) { return; }
|
|
|
@ -347,7 +361,7 @@ var removeChannel = function (env, channelName, cb) {
|
|
|
|
var channelPath = mkPath(env, channelName);
|
|
|
|
var channelPath = mkPath(env, channelName);
|
|
|
|
var metadataPath = mkMetadataPath(env, channelName);
|
|
|
|
var metadataPath = mkMetadataPath(env, channelName);
|
|
|
|
|
|
|
|
|
|
|
|
var CB = Once(cb);
|
|
|
|
var CB = Util.once(cb);
|
|
|
|
|
|
|
|
|
|
|
|
var errors = 0;
|
|
|
|
var errors = 0;
|
|
|
|
nThen(function (w) {
|
|
|
|
nThen(function (w) {
|
|
|
@ -387,7 +401,7 @@ var removeArchivedChannel = function (env, channelName, cb) {
|
|
|
|
var channelPath = mkArchivePath(env, channelName);
|
|
|
|
var channelPath = mkArchivePath(env, channelName);
|
|
|
|
var metadataPath = mkArchiveMetadataPath(env, channelName);
|
|
|
|
var metadataPath = mkArchiveMetadataPath(env, channelName);
|
|
|
|
|
|
|
|
|
|
|
|
var CB = Once(cb);
|
|
|
|
var CB = Util.once(cb);
|
|
|
|
|
|
|
|
|
|
|
|
nThen(function (w) {
|
|
|
|
nThen(function (w) {
|
|
|
|
Fs.unlink(channelPath, w(function (err) {
|
|
|
|
Fs.unlink(channelPath, w(function (err) {
|
|
|
@ -602,7 +616,7 @@ var unarchiveChannel = function (env, channelName, cb) {
|
|
|
|
var metadataPath = mkMetadataPath(env, channelName);
|
|
|
|
var metadataPath = mkMetadataPath(env, channelName);
|
|
|
|
|
|
|
|
|
|
|
|
// don't call the callback multiple times
|
|
|
|
// don't call the callback multiple times
|
|
|
|
var CB = Once(cb);
|
|
|
|
var CB = Util.once(cb);
|
|
|
|
|
|
|
|
|
|
|
|
// if a file exists in the unarchived path, you probably don't want to clobber its data
|
|
|
|
// if a file exists in the unarchived path, you probably don't want to clobber its data
|
|
|
|
// so unlike 'archiveChannel' we won't overwrite.
|
|
|
|
// so unlike 'archiveChannel' we won't overwrite.
|
|
|
@ -690,7 +704,7 @@ var channelBytes = function (env, chanName, cb) {
|
|
|
|
var channelPath = mkPath(env, chanName);
|
|
|
|
var channelPath = mkPath(env, chanName);
|
|
|
|
var dataPath = mkMetadataPath(env, chanName);
|
|
|
|
var dataPath = mkMetadataPath(env, chanName);
|
|
|
|
|
|
|
|
|
|
|
|
var CB = Once(cb);
|
|
|
|
var CB = Util.once(cb);
|
|
|
|
|
|
|
|
|
|
|
|
var channelSize = 0;
|
|
|
|
var channelSize = 0;
|
|
|
|
var dataSize = 0;
|
|
|
|
var dataSize = 0;
|
|
|
@ -820,6 +834,7 @@ const messageBin = (env, chanName, msgBin, cb) => {
|
|
|
|
chan.onError.push(complete);
|
|
|
|
chan.onError.push(complete);
|
|
|
|
chan.writeStream.write(msgBin, function () {
|
|
|
|
chan.writeStream.write(msgBin, function () {
|
|
|
|
/*::if (!chan) { throw new Error("Flow unreachable"); }*/
|
|
|
|
/*::if (!chan) { throw new Error("Flow unreachable"); }*/
|
|
|
|
|
|
|
|
// TODO replace ad hoc queuing with WriteQueue
|
|
|
|
chan.onError.splice(chan.onError.indexOf(complete), 1);
|
|
|
|
chan.onError.splice(chan.onError.indexOf(complete), 1);
|
|
|
|
chan.atime = +new Date();
|
|
|
|
chan.atime = +new Date();
|
|
|
|
if (!cb) { return; }
|
|
|
|
if (!cb) { return; }
|
|
|
|