@ -5,6 +5,7 @@ var Fs = require("fs");
var Fse = require("fs-extra");
var Path = require("path");
var nThen = require("nthen");
var Semaphore = require("saferphore");
const ToPull = require('stream-to-pull-stream');
const Pull = require('pull-stream');
@ -14,10 +15,18 @@ const isValidChannelId = function (id) {
// 511 -> octal 777
// read, write, execute permissions flag
const PERMISSIVE = 511;
var mkPath = function (env, channelId) {
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
var mkArchivePath = function (env, channelId) {
return Path.join(env.archiveRoot, channelId.slice(0, 2), channelId) + '.ndjson';
var getMetadataAtPath = function (Env, path, cb) {
var remainder = '';
var stream = Fs.createReadStream(path, { encoding: 'utf8' });
@ -68,7 +77,7 @@ var closeChannel = function (env, channelName, cb) {
var clearChannel = function (env, channelId, cb) { // FIXME deletion
var clearChannel = function (env, channelId, cb) {
var path = mkPath(env, channelId);
getMetadataAtPath(env, path, function (e, metadata) {
if (e) { return cb(new Error(e)); }
@ -189,8 +198,7 @@ var checkPath = function (path, callback) {
// 511 -> octal 777
Fse.mkdirp(Path.dirname(path), 511, function (err) {
Fse.mkdirp(Path.dirname(path), PERMISSIVE, function (err) {
if (err && err.code !== 'EEXIST') {
@ -200,11 +208,99 @@ var checkPath = function (path, callback) {
var removeChannel = function (env, channelName, cb) { // FIXME deletion
var removeChannel = function (env, channelName, cb) {
var filename = mkPath(env, channelName);
Fs.unlink(filename, cb);
var removeArchivedChannel = function (env, channelName, cb) {
var filename = mkArchivePath(env, channelName);
Fs.unlink(filename, cb);
var listChannels = function (root, handler, cb) {
// do twenty things at a time
var sema = Semaphore.create(20);
var dirList = [];
nThen(function (w) {
// the root of your datastore contains nested directories...
Fs.readdir(root, w(function (err, list) {
if (err) {
// TODO check if we normally return strings or errors
return void cb(err);
dirList = list;
}).nThen(function (w) {
// search inside the nested directories
// stream it so you don't put unnecessary data in memory
var wait = w();
dirList.forEach(function (dir) {
sema.take(function (give) {
var nestedDirPath = Path.join(root, dir);
Fs.readdir(nestedDirPath, w(give(function (err, list) {
if (err) { return void handler(err); } // Is this correct?
list.forEach(function (item) {
// ignore things that don't match the naming pattern
if (/^\./.test(item) || !/[0-9a-fA-F]{32,}\.ndjson$/.test(item)) { return; }
var filepath = Path.join(nestedDirPath, item);
var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, '');
if ([32, 34].indexOf(channel.length) === -1) { return; }
// otherwise throw it on the pile
sema.take(function (give) {
Fs.stat(filepath, w(give(function (err, stats) {
if (err) {
return void handler(err);
handler(void 0, {
channel: channel,
atime: stats.atime,
mtime: stats.mtime,
ctime: stats.ctime,
size: stats.size,
}).nThen(function () {
// move a channel's log file from its current location
// to an equivalent location in the cold storage directory
var archiveChannel = function (env, channelName, cb) {
if (!env.retainData) {
return void cb("ARCHIVES_DISABLED");
// ctime is the most reliable indicator of when a file was archived
// because it is used to indicate changes to the files metadata
// and not its contents
// if we find that this is not reliable in production, we can update it manually
// https://nodejs.org/api/fs.html#fs_fs_utimes_path_atime_mtime_callback
// check what the channel's path should be (in its current location)
var currentPath = mkPath(env, channelName);
// construct a parallel path in the new location
var archivePath = mkArchivePath(env, channelName);
// use Fse.move to move it, Fse makes paths to the directory when you use it.
// https://github.com/jprichardson/node-fs-extra/blob/HEAD/docs/move.md
Fse.move(currentPath, archivePath, { overwrite: true }, cb);
var flushUnusedChannels = function (env, cb, frame) {
var currentTime = +new Date();
@ -413,19 +509,30 @@ module.exports.create = function (
) {
var env = {
root: conf.filePath || './datastore',
archiveRoot: conf.archivePath || './data/archive',
retainData: conf.retainData,
channels: { },
channelExpirationMs: conf.channelExpirationMs || 30000,
verbose: conf.verbose,
openFiles: 0,
openFileLimit: conf.openFileLimit || 2048,
// 0x1ff -> 777
var it;
Fse.mkdirp(env.root, 0x1ff, function (err) {
nThen(function (w) {
// make sure the store's directory exists
Fse.mkdirp(env.root, PERMISSIVE, w(function (err) {
if (err && err.code !== 'EEXIST') {
// TODO: somehow return a nice error
throw err;
// make sure the cold storage directory exists
Fse.mkdirp(env.archiveRoot, PERMISSIVE, w(function (err) {
if (err && err.code !== 'EEXIST') {
throw err;
}).nThen(function () {
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
@ -449,6 +556,10 @@ module.exports.create = function (
removeArchivedChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
removeArchivedChannel(env, channelName, cb);
closeChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
closeChannel(env, channelName, cb);
@ -468,6 +579,16 @@ module.exports.create = function (
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
clearChannel(env, channelName, cb);
listChannels: function (handler, cb) {
listChannels(env.root, handler, cb);
listArchivedChannels: function (handler, cb) {
listChannels(env.archiveRoot, handler, cb);
archiveChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
archiveChannel(env, channelName, cb);
log: function (channelName, content, cb) {
message(env, channelName, content, cb);