diff --git a/storage/file.js b/storage/file.js index 0c6b6734c..cdd4dbb88 100644 --- a/storage/file.js +++ b/storage/file.js @@ -1,22 +1,149 @@ var Fs = require("fs"); +var Path = require("path"); -var insert = function (env, channel, content, cb) { +//function will check if a directory exists, and create it if it doesn't +var checkDir = function (dir, cb) { + Fs.stat(dir, function(err, stats) { + //Check if error defined and the error code is "not exists" + if (err) { + //Create the directory, call the callback. + Fs.mkdir(dir, cb); + } else { + //just in case there was a different error: + cb(err); + } + }); +}; +var checkFile = function (path, cb) { + Fs.stat(path, function (err, stats) { + if (err) { + if (err.code === 'ENOENT') { + return cb(null, false); + } else { + return cb(err); + } + } + return cb(null, stats.isFile()); + }); +}; + +var separate = function (channel) { + return { + first: channel.slice(0, 2), + rest: channel.slice(2), + }; +}; + +var Channel = function (env, id, filepath, cb) { + if (!env.channels[id]) { + return (env.channels[id] = { + atime: +new Date(), + queue: [], + stream: Fs.createWriteStream(filepath, { + flags: 'a' + }).on('open', function () { + cb(null, env.channels[id]); + }).on('error', function (err) { + cb(err); + }) + }); + } + cb(null, env.channels[id]); +}; + +var insert = function (env, channelName, content, cb) { + var parts = separate(channelName); + + var dirpath = Path.join(env.root, parts.first); + checkDir(dirpath, function (e) { + if (e) { throw new Error(e); } + + var filepath = Path.join(env.root, parts.first, parts.rest); + checkFile(filepath, function (err, isFile) { + Channel(env, channelName, filepath, function (err, channel) { + if (err) { + console.error(err); + return cb(); + } + + var doIt = function () { + channel.locked = true; + channel.atime = +new Date(); + channel.stream.write(JSON.stringify(content) + '\n'); + + if (!channel.queue.length) { + channel.locked = false; + cb(); + return; + } + + channel.queue.shift()(); + cb(); + }; + + if (channel.locked) { + channel.queue.push(doIt); + } else { + doIt(); + } + }); + }); + }); }; var getMessages = function (env, channelName, msgHandler, cb) { + var parts = separate(channelName); + var filepath = Path.join(env.root, parts.first, parts.rest); + + var remainder = ''; + var newlines = /[\n\r]+/; + + var stream = Fs.createReadStream(filepath, 'utf-8') + .on('data', function (chunk) { + var lines = chunk.split(newlines); + lines[0] = remainder + lines[0]; + remainder = lines.pop(); + lines.forEach(function (line) { + msgHandler(JSON.parse(line)); + }); + }) + .on('end', function () { cb(); }) + .on('error', function (e) { cb(); }); }; module.exports.create = function (conf, cb) { - var env = {}; + var env = { + root: conf.filePath, + channels: { }, + }; - cb({ - message: function (channelName, content, cb) { - insert(env, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler, cb) { - getMessages(env, channelName, msgHandler, cb); - }, + checkDir(env.root, function (e, data) { + cb({ + message: function (channelName, content, cb) { + insert(env, channelName, content, cb); + }, + getMessages: function (channelName, msgHandler, cb) { + getMessages(env, channelName, msgHandler, cb); + }, + removeChannel: function (channelName, cb) { + console.log("[storage/file.removeChannel()] Not implemented"); + cb(); + }, + }); + + setInterval(function () { + var now = +new Date(); + Object.keys(env.channels).forEach(function (id) { + var channel = env.channels[id]; + if (now - channel.atime > (1000 * 60)) { + console.log("Cleaning up channel [%s]", id); + + channel.stream.close(); + delete env.channels[id]; + } + }); + }, 60 * 1000); }); };