Improvements to the file storage format

pull/1/head
Caleb James DeLisle 8 years ago
parent 4b64f00cc0
commit 2021bf6702

1
.gitignore vendored

@ -1,3 +1,4 @@
datastore
www/bower_components/* www/bower_components/*
node_modules node_modules
/config.js /config.js

@ -33,7 +33,11 @@ const sendChannelMessage = function (ctx, channel, msgStruct) {
} }
}); });
if (USE_HISTORY_KEEPER && msgStruct[2] === 'MSG') { if (USE_HISTORY_KEEPER && msgStruct[2] === 'MSG') {
ctx.store.message(channel.id, JSON.stringify(msgStruct), function () { }); ctx.store.message(channel.id, JSON.stringify(msgStruct), function (err) {
if (err) {
console.log("Error writing message: " + err);
}
});
} }
}; };
@ -90,7 +94,11 @@ const getHistory = function (ctx, channelName, handler, cb) {
var messageBuf = []; var messageBuf = [];
ctx.store.getMessages(channelName, function (msgStr) { ctx.store.getMessages(channelName, function (msgStr) {
messageBuf.push(JSON.parse(msgStr)); messageBuf.push(JSON.parse(msgStr));
}, function () { }, function (err) {
if (err) {
console.log("Error getting messages " + err.stack);
// TODO: handle this better
}
var startPoint; var startPoint;
var cpCount = 0; var cpCount = 0;
var msgBuff2 = []; var msgBuff2 = [];

@ -1,128 +1,167 @@
var Fs = require("fs"); var Fs = require("fs");
var Path = require("path"); var Path = require("path");
var nThen = require("nthen");
//function will check if a directory exists, and create it if it doesn't var mkPath = function (env, channelId) {
var checkDir = function (dir, cb) { return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
Fs.stat(dir, function(err, stats) { };
//Check if error defined and the error code is "not exists"
if (err) { var readMessages = function (path, msgHandler, cb) {
//Create the directory, call the callback. var remainder = '';
Fs.mkdir(dir, cb); var stream = Fs.createReadStream(path, 'utf8');
} else { var complete = function (err) {
//just in case there was a different error: var _cb = cb;
cb(err); cb = undefined;
} if (_cb) { _cb(err); }
};
stream.on('data', function (chunk) {
var lines = chunk.split('\n');
lines[0] = remainder + lines[0];
remainder = lines.pop();
lines.forEach(msgHandler);
});
stream.on('end', function () {
msgHandler(remainder);
complete();
}); });
stream.on('error', function (e) { complete(e); });
}; };
var checkFile = function (path, cb) { var checkPath = function (path, callback) {
Fs.stat(path, function (err, stats) { Fs.stat(path, function (err, stats) {
if (err) { if (!err) {
if (err.code === 'ENOENT') { callback(undefined, true);
return cb(null, false); return;
} else { }
return cb(err); if (err.code !== 'ENOENT') {
} callback(err);
return;
} }
return cb(null, stats.isFile()); var dirPath = path.replace(/\/[^\/]*$/, '/');
Fs.mkdir(dirPath, function (err) {
if (err && err !== 'EEXIST') {
callback(err);
return;
}
callback(undefined, false);
});
}); });
}; };
var separate = function (channel) { var getChannel = function (env, id, callback) {
return { if (env.channels[id]) {
first: channel.slice(0, 2), var chan = env.channels[id];
rest: channel.slice(2), if (chan.whenLoaded) {
chan.whenLoaded.push(callback);
} else {
callback(undefined, chan);
}
return;
}
var channel = env.channels[id] = {
atime: +new Date(),
messages: [],
writeStream: undefined,
whenLoaded: [ callback ],
onError: [ ]
}; };
}; var complete = function (err) {
var whenLoaded = channel.whenLoaded;
var Channel = function (env, id, filepath, cb) { // no guarantee stream.on('error') will not cause this to be called multiple times
if (!env.channels[id]) { if (!whenLoaded) { return; }
return (env.channels[id] = { channel.whenLoaded = undefined;
atime: +new Date(), if (err) {
queue: [], delete env.channels[id];
stream: Fs.createWriteStream(filepath, { }
flags: 'a' whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); });
}).on('open', function () {
cb(null, env.channels[id]);
}).on('error', function (err) {
cb(err);
})
});
} }
cb(null, env.channels[id]); var path = mkPath(env, id);
var fileExists;
nThen(function (waitFor) {
checkPath(path, waitFor(function (err, exists) {
if (err) {
waitFor.abort();
complete(err);
return;
}
fileExists = exists;
}));
}).nThen(function (waitFor) {
if (!fileExists) { return; }
readMessages(path, function (msg) {
channel.messages.push(msg);
}, waitFor(function (err) {
if (err) {
waitFor.abort();
complete(err);
}
}));
}).nThen(function (waitFor) {
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
stream.on('open', waitFor());
stream.on('error', function (err) {
// this might be called after this nThen block closes.
if (channel.whenLoaded) {
complete(err);
} else {
channel.onError.forEach(function (handler) {
handler(err);
});
}
});
}).nThen(function (waitFor) {
complete();
});
}; };
var insert = function (env, channelName, content, cb) { var message = function (env, chanName, msg, cb) {
var parts = separate(channelName); getChannel(env, chanName, function (err, chan) {
if (err) {
var dirpath = Path.join(env.root, parts.first); cb(err);
checkDir(dirpath, function (e) { return;
if (e) { throw new Error(e); } }
var complete = function (err) {
var filepath = Path.join(env.root, parts.first, parts.rest); var _cb = cb;
checkFile(filepath, function (err, isFile) { cb = undefined;
Channel(env, channelName, filepath, function (err, channel) { if (_cb) { _cb(err); }
if (err) { };
console.error(err); chan.onError.push(complete);
return cb(); chan.writeStream.write(msg + '\n', function () {
} chan.onError.splice(chan.onError.indexOf(complete) - 1, 1);
if (!cb) { return; }
var doIt = function () { chan.messages.push(msg);
channel.locked = true; chan.atime = +new Date();
channel.atime = +new Date(); complete();
channel.stream.write(JSON.stringify(content) + '\n');
if (!channel.queue.length) {
channel.locked = false;
cb();
return;
}
channel.queue.shift()();
cb();
};
if (channel.locked) {
channel.queue.push(doIt);
} else {
doIt();
}
});
}); });
}); });
}; };
var getMessages = function (env, channelName, msgHandler, cb) { var getMessages = function (env, chanName, handler, cb) {
var parts = separate(channelName); getChannel(env, chanName, function (err, chan) {
if (err) {
var filepath = Path.join(env.root, parts.first, parts.rest); cb(err);
return;
var remainder = ''; }
var newlines = /[\n\r]+/; chan.messages.forEach(handler);
chan.atime = +new Date();
var stream = Fs.createReadStream(filepath, 'utf-8') cb();
.on('data', function (chunk) { });
var lines = chunk.split(newlines);
lines[0] = remainder + lines[0];
remainder = lines.pop();
lines.forEach(function (line) {
msgHandler(JSON.parse(line));
});
})
.on('end', function () { cb(); })
.on('error', function (e) { cb(); });
}; };
module.exports.create = function (conf, cb) { module.exports.create = function (conf, cb) {
var env = { var env = {
root: conf.filePath, root: conf.filePath || './datastore',
channels: { }, channels: { },
}; };
console.log('storing data in ' + env.root);
checkDir(env.root, function (e, data) { Fs.mkdir(env.root, function (err) {
if (err && err.code !== 'EEXIST') {
// TODO: somehow return a nice error
throw err;
}
cb({ cb({
message: function (channelName, content, cb) { message: function (channelName, content, cb) {
insert(env, channelName, content, cb); message(env, channelName, content, cb);
}, },
getMessages: function (channelName, msgHandler, cb) { getMessages: function (channelName, msgHandler, cb) {
getMessages(env, channelName, msgHandler, cb); getMessages(env, channelName, msgHandler, cb);
@ -132,17 +171,5 @@ module.exports.create = function (conf, cb) {
cb(); cb();
}, },
}); });
setInterval(function () {
var now = +new Date();
Object.keys(env.channels).forEach(function (id) {
var channel = env.channels[id];
if (now - channel.atime > (1000 * 60)) {
//console.log("Cleaning up idle channel [%s]", id);
channel.stream.close();
delete env.channels[id];
}
});
}, 60 * 1000);
}); });
}; };

Loading…
Cancel
Save