/*@flow*/
/* jshint esversion: 6 */
/* global Buffer */
var Fs = require("fs");
var Path = require("path");
var nThen = require("nthen");
const ToPull = require('stream-to-pull-stream');
const Pull = require('pull-stream');

// Channel files live under <root>/<first two chars of the id>/<id>.ndjson
var mkPath = function (env, channelId) {
    return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
};

var getMetadataAtPath = function (Env, path, cb) {
    var remainder = '';
    var stream = Fs.createReadStream(path, { encoding: 'utf8' });
    var complete = function (err, data) {
        var _cb = cb;
        cb = undefined;
        if (_cb) { _cb(err, data); }
    };
    stream.on('data', function (chunk) {
        if (!/\n/.test(chunk)) {
            remainder += chunk;
            return;
        }
        stream.close();
        // include anything buffered from earlier chunks, in case the first
        // line was split across reads
        var metadata = (remainder + chunk).split('\n')[0];
        var parsed = null;
        try {
            parsed = JSON.parse(metadata);
            complete(undefined, parsed);
        } catch (e) {
            console.log("getMetadataAtPath");
            console.error(e);
            complete('INVALID_METADATA');
        }
    });
    stream.on('end', function () {
        complete();
    });
    stream.on('error', function (e) { complete(e); });
};

var getChannelMetadata = function (Env, channelId, cb) {
    var path = mkPath(Env, channelId);
    getMetadataAtPath(Env, path, cb);
};

var closeChannel = function (env, channelName, cb) {
    if (!env.channels[channelName]) { return void cb(); }
    try {
        env.channels[channelName].writeStream.close();
        delete env.channels[channelName];
        env.openFiles--;
        cb();
    } catch (err) {
        cb(err);
    }
};

var clearChannel = function (env, channelId, cb) {
    var path = mkPath(env, channelId);
    getMetadataAtPath(env, path, function (e, metadata) {
        if (e) { return cb(new Error(e)); }
        if (!metadata) {
            return void Fs.truncate(path, 0, function (err) {
                if (err) { return cb(err); }
                cb(void 0);
            });
        }

        var len = JSON.stringify(metadata).length + 1;

        // as long as closeChannel is synchronous, this should not cause
        // any race conditions. truncate ought to return faster than a channel
        // can be opened and read by another user. if that turns out not to be
        // the case, we'll need to implement locking.
        closeChannel(env, channelId, function (err) {
            if (err) { return void cb(err); }
            Fs.truncate(path, len, function (err) {
                if (err) { return cb(err); }
                cb();
            });
        });
    });
};

var readMessages = function (path, msgHandler, cb) {
    var remainder = '';
    var stream = Fs.createReadStream(path, { encoding: 'utf8' });
    var complete = function (err) {
        var _cb = cb;
        cb = undefined;
        if (_cb) { _cb(err); }
    };
    stream.on('data', function (chunk) {
        var lines = chunk.split('\n');
        lines[0] = remainder + lines[0];
        remainder = lines.pop();
        lines.forEach(msgHandler);
    });
    stream.on('end', function () {
        msgHandler(remainder);
        complete();
    });
    stream.on('error', function (e) { complete(e); });
};
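/* The binary read path below is built from pull-streams: the file's read
 * stream is adapted with ToPull.read(), mkBufferSplit() cuts the byte stream
 * on newline characters, and mkOffsetCounter() annotates each line with its
 * byte offset in the file so callers of readMessagesBin() can resume reading
 * from a known position. */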
const NEWLINE_CHR = ('\n').charCodeAt(0);
const mkBufferSplit = () => {
    let remainder = null;
    return Pull((read) => {
        return (abort, cb) => {
            read(abort, function (end, data) {
                if (end) {
                    cb(end, remainder ? [remainder, data] : [data]);
                    remainder = null;
                    return;
                }
                const queue = [];
                for (;;) {
                    const offset = data.indexOf(NEWLINE_CHR);
                    if (offset < 0) {
                        remainder = remainder ? Buffer.concat([remainder, data]) : data;
                        break;
                    }
                    let subArray = data.slice(0, offset);
                    if (remainder) {
                        subArray = Buffer.concat([remainder, subArray]);
                        remainder = null;
                    }
                    queue.push(subArray);
                    data = data.slice(offset + 1);
                }
                cb(end, queue);
            });
        };
    }, Pull.flatten());
};

const mkOffsetCounter = () => {
    let offset = 0;
    return Pull.map((buff) => {
        const out = { offset: offset, buff: buff };
        // +1 for the eaten newline
        offset += buff.length + 1;
        return out;
    });
};

const readMessagesBin = (env, id, start, msgHandler, cb) => {
    const stream = Fs.createReadStream(mkPath(env, id), { start: start });
    let keepReading = true;
    Pull(
        ToPull.read(stream),
        mkBufferSplit(),
        mkOffsetCounter(),
        Pull.asyncMap((data, moreCb) => {
            msgHandler(data, moreCb, () => {
                keepReading = false;
                moreCb();
            });
        }),
        Pull.drain(() => (keepReading), cb)
    );
};

var checkPath = function (path, callback) {
    // TODO check if we actually need to use stat at all
    Fs.stat(path, function (err) {
        if (!err) {
            callback(undefined, true);
            return;
        }
        if (err.code !== 'ENOENT') {
            callback(err);
            return;
        }
        // 511 -> octal 777
        Fs.mkdir(Path.dirname(path), 511, function (err) {
            if (err && err.code !== 'EEXIST') {
                callback(err);
                return;
            }
            callback(undefined, false);
        });
    });
};

var removeChannel = function (env, channelName, cb) {
    var filename = mkPath(env, channelName);
    Fs.unlink(filename, cb);
};

var flushUnusedChannels = function (env, cb, frame) {
    var currentTime = +new Date();
    var expiration = typeof(frame) === 'undefined' ? env.channelExpirationMs : frame;
    Object.keys(env.channels).forEach(function (chanId) {
        var chan = env.channels[chanId];
        if (typeof(chan.atime) !== 'number') { return; }
        if (currentTime >= expiration + chan.atime) {
            closeChannel(env, chanId, function (err) {
                if (err) {
                    console.error(err);
                    return;
                }
                if (env.verbose) {
                    console.log("Closed channel [%s]", chanId);
                }
            });
        }
    });
    cb();
};

var channelBytes = function (env, chanName, cb) {
    var path = mkPath(env, chanName);
    Fs.stat(path, function (err, stats) {
        if (err) { return void cb(err); }
        cb(undefined, stats.size);
    });
};

/*::
export type ChainPadServer_ChannelInternal_t = {
    atime: number,
    writeStream: typeof(process.stdout),
    whenLoaded: ?Array<(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void>,
    onError: Array<(?Error)=>void>,
    path: string
};
*/
var getChannel = function (
    env,
    id,
    callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/
) {
    if (env.channels[id]) {
        var chan = env.channels[id];
        chan.atime = +new Date();
        if (chan.whenLoaded) {
            chan.whenLoaded.push(callback);
        } else {
            callback(undefined, chan);
        }
        return;
    }
    if (env.openFiles >= env.openFileLimit) {
        // if you're running out of open files, asynchronously clean up expired files
        // do it on a shorter timeframe, though (half of normal)
        setTimeout(function () {
            flushUnusedChannels(env, function () {
                if (env.verbose) {
                    console.log("Approaching open file descriptor limit. Cleaning up");
                }
            }, env.channelExpirationMs / 2);
        });
    }
    var path = mkPath(env, id);
    var channel /*:ChainPadServer_ChannelInternal_t*/ = env.channels[id] = {
        atime: +new Date(),
        writeStream: (undefined /*:any*/),
        whenLoaded: [ callback ],
        onError: [ ],
        path: path
    };
    var complete = function (err) {
        var whenLoaded = channel.whenLoaded;
        // no guarantee stream.on('error') will not cause this to be called multiple times
        if (!whenLoaded) { return; }
        channel.whenLoaded = undefined;
        if (err) {
            delete env.channels[id];
        }
        if (!channel.writeStream) {
            throw new Error("getChannel() complete called without channel writeStream");
        }
        whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); });
    };
    var fileExists;
    var errorState;
    nThen(function (waitFor) {
        checkPath(path, waitFor(function (err, exists) {
            if (err) {
                errorState = true;
                complete(err);
                return;
            }
            fileExists = exists;
        }));
    }).nThen(function (waitFor) {
        if (errorState) { return; }
        var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
        env.openFiles++;
        stream.on('open', waitFor());
        stream.on('error', function (err /*:?Error*/) {
            env.openFiles--;
            // this might be called after this nThen block closes.
            if (channel.whenLoaded) {
                complete(err);
            } else {
                channel.onError.forEach(function (handler) {
                    handler(err);
                });
            }
        });
    }).nThen(function () {
        if (errorState) { return; }
        complete();
    });
};
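/* Writes go through a single cached, append-only write stream per channel
 * (see getChannel above). messageBin() appends a raw buffer as-is, while
 * message() adds the trailing newline itself, so every record in the
 * .ndjson file is newline-delimited; the first line of a channel file is
 * treated as its metadata by getMetadataAtPath() and clearChannel(). */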
const messageBin = (env, chanName, msgBin, cb) => {
    getChannel(env, chanName, function (err, chan) {
        if (!chan) {
            cb(err);
            return;
        }
        let called = false;
        var complete = function (err) {
            if (called) { return; }
            called = true;
            cb(err);
        };
        chan.onError.push(complete);
        chan.writeStream.write(msgBin, function () {
            /*::if (!chan) { throw new Error("Flow unreachable"); }*/
            chan.onError.splice(chan.onError.indexOf(complete), 1);
            if (!cb) { return; }
            //chan.messages.push(msg);
            chan.atime = +new Date();
            complete();
        });
    });
};

var message = function (env, chanName, msg, cb) {
    messageBin(env, chanName, Buffer.from(msg + '\n', 'utf8'), cb);
};

var getMessages = function (env, chanName, handler, cb) {
    getChannel(env, chanName, function (err, chan) {
        if (!chan) {
            cb(err);
            return;
        }
        var errorState = false;
        readMessages(chan.path, function (msg) {
            if (!msg || errorState) { return; }
            //console.log(msg);
            try {
                handler(msg);
            } catch (e) {
                errorState = true;
                // report the handler's error rather than the (empty) getChannel error
                return void cb(e);
            }
        }, function (err) {
            if (err) {
                errorState = true;
                return void cb(err);
            }
            if (!chan) { throw new Error("impossible, flow checking"); }
            chan.atime = +new Date();
            cb();
        });
    });
};

/*::
export type ChainPadServer_MessageObj_t = { buff: Buffer, offset: number };
export type ChainPadServer_Storage_t = {
    readMessagesBin: (
        channelName:string,
        start:number,
        asyncMsgHandler:(msg:ChainPadServer_MessageObj_t, moreCb:()=>void, abortCb:()=>void)=>void,
        cb:(err:?Error)=>void
    )=>void,
    message: (channelName:string, content:string, cb:(err:?Error)=>void)=>void,
    messageBin: (channelName:string, content:Buffer, cb:(err:?Error)=>void)=>void,
    getMessages: (channelName:string, msgHandler:(msg:string)=>void, cb:(err:?Error)=>void)=>void,
    removeChannel: (channelName:string, cb:(err:?Error)=>void)=>void,
    closeChannel: (channelName:string, cb:(err:?Error)=>void)=>void,
    flushUnusedChannels: (cb:()=>void)=>void,
    getChannelSize: (channelName:string, cb:(err:?Error, size:?number)=>void)=>void,
    getChannelMetadata: (channelName:string, cb:(err:?Error|string, data:?any)=>void)=>void,
    clearChannel: (channelName:string, (err:?Error)=>void)=>void
};
const flow_Config = require('../config.example.js');
type Config_t = typeof(flow_Config);
*/
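/* create() wires everything together: it builds the storage environment from
 * the provided config (filePath, channelExpirationMs, verbose, openFileLimit),
 * makes sure the datastore directory exists, hands the caller the public
 * ChainPadServer_Storage_t API, and closes idle channels every five seconds. */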
module.exports.create = function (
    conf /*:Config_t*/,
    cb /*:(store:ChainPadServer_Storage_t)=>void*/
) {
    var env = {
        root: conf.filePath || './datastore',
        channels: { },
        channelExpirationMs: conf.channelExpirationMs || 30000,
        verbose: conf.verbose,
        openFiles: 0,
        openFileLimit: conf.openFileLimit || 2048,
    };
    // 0x1ff -> 777
    Fs.mkdir(env.root, 0x1ff, function (err) {
        if (err && err.code !== 'EEXIST') {
            // TODO: somehow return a nice error
            throw err;
        }
        cb({
            readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
                readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
            },
            message: function (channelName, content, cb) {
                message(env, channelName, content, cb);
            },
            messageBin: (channelName, content, cb) => {
                messageBin(env, channelName, content, cb);
            },
            getMessages: function (channelName, msgHandler, cb) {
                getMessages(env, channelName, msgHandler, cb);
            },
            removeChannel: function (channelName, cb) {
                removeChannel(env, channelName, function (err) {
                    cb(err);
                });
            },
            closeChannel: function (channelName, cb) {
                closeChannel(env, channelName, cb);
            },
            flushUnusedChannels: function (cb) {
                flushUnusedChannels(env, cb);
            },
            getChannelSize: function (chanName, cb) {
                channelBytes(env, chanName, cb);
            },
            getChannelMetadata: function (channelName, cb) {
                getChannelMetadata(env, channelName, cb);
            },
            clearChannel: function (channelName, cb) {
                clearChannel(env, channelName, cb);
            },
        });
    });
    setInterval(function () {
        flushUnusedChannels(env, function () { });
    }, 5000);
};
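/*
Example usage (an illustrative sketch; the require path, channel id, and
config values shown here are assumptions, not part of this module):

    const Storage = require('./storage/file');
    Storage.create({
        filePath: './datastore',
        channelExpirationMs: 30000,
        verbose: true,
        openFileLimit: 2048,
    }, function (store) {
        // append one message, then stream the whole channel back
        store.message('abcdef0123456789', JSON.stringify(['MSG', 'hello']), function (err) {
            if (err) { return void console.error(err); }
            store.getMessages('abcdef0123456789', function (line) {
                console.log(line);
            }, function (err) {
                if (err) { console.error(err); }
            });
        });
    });
*/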