diff --git a/.gitignore b/.gitignore index 2d0cd09d9..354096b87 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +datastore www/bower_components/* node_modules /config.js diff --git a/NetfluxWebsocketSrv.js b/NetfluxWebsocketSrv.js index d92510749..e9fffd2ed 100644 --- a/NetfluxWebsocketSrv.js +++ b/NetfluxWebsocketSrv.js @@ -33,7 +33,11 @@ const sendChannelMessage = function (ctx, channel, msgStruct) { } }); if (USE_HISTORY_KEEPER && msgStruct[2] === 'MSG') { - ctx.store.message(channel.id, JSON.stringify(msgStruct), function () { }); + ctx.store.message(channel.id, JSON.stringify(msgStruct), function (err) { + if (err) { + console.log("Error writing message: " + err); + } + }); } }; @@ -90,7 +94,11 @@ const getHistory = function (ctx, channelName, handler, cb) { var messageBuf = []; ctx.store.getMessages(channelName, function (msgStr) { messageBuf.push(JSON.parse(msgStr)); - }, function () { + }, function (err) { + if (err) { + console.log("Error getting messages " + err.stack); + // TODO: handle this better + } var startPoint; var cpCount = 0; var msgBuff2 = []; diff --git a/storage/file.js b/storage/file.js index 9cfeffc17..df391db7b 100644 --- a/storage/file.js +++ b/storage/file.js @@ -1,128 +1,167 @@ var Fs = require("fs"); var Path = require("path"); +var nThen = require("nthen"); -//function will check if a directory exists, and create it if it doesn't -var checkDir = function (dir, cb) { - Fs.stat(dir, function(err, stats) { - //Check if error defined and the error code is "not exists" - if (err) { - //Create the directory, call the callback. - Fs.mkdir(dir, cb); - } else { - //just in case there was a different error: - cb(err); - } +var mkPath = function (env, channelId) { + return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson'; +}; + +var readMessages = function (path, msgHandler, cb) { + var remainder = ''; + var stream = Fs.createReadStream(path, 'utf8'); + var complete = function (err) { + var _cb = cb; + cb = undefined; + if (_cb) { _cb(err); } + }; + stream.on('data', function (chunk) { + var lines = chunk.split('\n'); + lines[0] = remainder + lines[0]; + remainder = lines.pop(); + lines.forEach(msgHandler); + }); + stream.on('end', function () { + msgHandler(remainder); + complete(); }); + stream.on('error', function (e) { complete(e); }); }; -var checkFile = function (path, cb) { +var checkPath = function (path, callback) { Fs.stat(path, function (err, stats) { - if (err) { - if (err.code === 'ENOENT') { - return cb(null, false); - } else { - return cb(err); - } + if (!err) { + callback(undefined, true); + return; + } + if (err.code !== 'ENOENT') { + callback(err); + return; } - return cb(null, stats.isFile()); + var dirPath = path.replace(/\/[^\/]*$/, '/'); + Fs.mkdir(dirPath, function (err) { + if (err && err !== 'EEXIST') { + callback(err); + return; + } + callback(undefined, false); + }); }); }; -var separate = function (channel) { - return { - first: channel.slice(0, 2), - rest: channel.slice(2), +var getChannel = function (env, id, callback) { + if (env.channels[id]) { + var chan = env.channels[id]; + if (chan.whenLoaded) { + chan.whenLoaded.push(callback); + } else { + callback(undefined, chan); + } + return; + } + var channel = env.channels[id] = { + atime: +new Date(), + messages: [], + writeStream: undefined, + whenLoaded: [ callback ], + onError: [ ] }; -}; - -var Channel = function (env, id, filepath, cb) { - if (!env.channels[id]) { - return (env.channels[id] = { - atime: +new Date(), - queue: [], - stream: Fs.createWriteStream(filepath, { - flags: 'a' - }).on('open', function () { - cb(null, env.channels[id]); - }).on('error', function (err) { - cb(err); - }) - }); + var complete = function (err) { + var whenLoaded = channel.whenLoaded; + // no guarantee stream.on('error') will not cause this to be called multiple times + if (!whenLoaded) { return; } + channel.whenLoaded = undefined; + if (err) { + delete env.channels[id]; + } + whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); } - cb(null, env.channels[id]); + var path = mkPath(env, id); + var fileExists; + nThen(function (waitFor) { + checkPath(path, waitFor(function (err, exists) { + if (err) { + waitFor.abort(); + complete(err); + return; + } + fileExists = exists; + })); + }).nThen(function (waitFor) { + if (!fileExists) { return; } + readMessages(path, function (msg) { + channel.messages.push(msg); + }, waitFor(function (err) { + if (err) { + waitFor.abort(); + complete(err); + } + })); + }).nThen(function (waitFor) { + var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); + stream.on('open', waitFor()); + stream.on('error', function (err) { + // this might be called after this nThen block closes. + if (channel.whenLoaded) { + complete(err); + } else { + channel.onError.forEach(function (handler) { + handler(err); + }); + } + }); + }).nThen(function (waitFor) { + complete(); + }); }; -var insert = function (env, channelName, content, cb) { - var parts = separate(channelName); - - var dirpath = Path.join(env.root, parts.first); - checkDir(dirpath, function (e) { - if (e) { throw new Error(e); } - - var filepath = Path.join(env.root, parts.first, parts.rest); - checkFile(filepath, function (err, isFile) { - Channel(env, channelName, filepath, function (err, channel) { - if (err) { - console.error(err); - return cb(); - } - - var doIt = function () { - channel.locked = true; - channel.atime = +new Date(); - channel.stream.write(JSON.stringify(content) + '\n'); - - if (!channel.queue.length) { - channel.locked = false; - cb(); - return; - } - - channel.queue.shift()(); - cb(); - }; - - if (channel.locked) { - channel.queue.push(doIt); - } else { - doIt(); - } - }); +var message = function (env, chanName, msg, cb) { + getChannel(env, chanName, function (err, chan) { + if (err) { + cb(err); + return; + } + var complete = function (err) { + var _cb = cb; + cb = undefined; + if (_cb) { _cb(err); } + }; + chan.onError.push(complete); + chan.writeStream.write(msg + '\n', function () { + chan.onError.splice(chan.onError.indexOf(complete) - 1, 1); + if (!cb) { return; } + chan.messages.push(msg); + chan.atime = +new Date(); + complete(); }); }); }; -var getMessages = function (env, channelName, msgHandler, cb) { - var parts = separate(channelName); - - var filepath = Path.join(env.root, parts.first, parts.rest); - - var remainder = ''; - var newlines = /[\n\r]+/; - - var stream = Fs.createReadStream(filepath, 'utf-8') - .on('data', function (chunk) { - var lines = chunk.split(newlines); - lines[0] = remainder + lines[0]; - remainder = lines.pop(); - lines.forEach(function (line) { - msgHandler(JSON.parse(line)); - }); - }) - .on('end', function () { cb(); }) - .on('error', function (e) { cb(); }); +var getMessages = function (env, chanName, handler, cb) { + getChannel(env, chanName, function (err, chan) { + if (err) { + cb(err); + return; + } + chan.messages.forEach(handler); + chan.atime = +new Date(); + cb(); + }); }; module.exports.create = function (conf, cb) { var env = { - root: conf.filePath, + root: conf.filePath || './datastore', channels: { }, }; - - checkDir(env.root, function (e, data) { + console.log('storing data in ' + env.root); + Fs.mkdir(env.root, function (err) { + if (err && err.code !== 'EEXIST') { + // TODO: somehow return a nice error + throw err; + } cb({ message: function (channelName, content, cb) { - insert(env, channelName, content, cb); + message(env, channelName, content, cb); }, getMessages: function (channelName, msgHandler, cb) { getMessages(env, channelName, msgHandler, cb); @@ -132,17 +171,5 @@ module.exports.create = function (conf, cb) { cb(); }, }); - - setInterval(function () { - var now = +new Date(); - Object.keys(env.channels).forEach(function (id) { - var channel = env.channels[id]; - if (now - channel.atime > (1000 * 60)) { - //console.log("Cleaning up idle channel [%s]", id); - channel.stream.close(); - delete env.channels[id]; - } - }); - }, 60 * 1000); }); };