From be69e2828ceea3f3411c9d6ef030dd5a7047c4f9 Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 13 Sep 2016 11:56:17 +0200 Subject: [PATCH 01/13] implement flat file storage --- storage/file.js | 147 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 137 insertions(+), 10 deletions(-) diff --git a/storage/file.js b/storage/file.js index 0c6b6734c..cdd4dbb88 100644 --- a/storage/file.js +++ b/storage/file.js @@ -1,22 +1,149 @@ var Fs = require("fs"); +var Path = require("path"); -var insert = function (env, channel, content, cb) { +//function will check if a directory exists, and create it if it doesn't +var checkDir = function (dir, cb) { + Fs.stat(dir, function(err, stats) { + //Check if error defined and the error code is "not exists" + if (err) { + //Create the directory, call the callback. + Fs.mkdir(dir, cb); + } else { + //just in case there was a different error: + cb(err); + } + }); +}; + +var checkFile = function (path, cb) { + Fs.stat(path, function (err, stats) { + if (err) { + if (err.code === 'ENOENT') { + return cb(null, false); + } else { + return cb(err); + } + } + return cb(null, stats.isFile()); + }); +}; +var separate = function (channel) { + return { + first: channel.slice(0, 2), + rest: channel.slice(2), + }; +}; + +var Channel = function (env, id, filepath, cb) { + if (!env.channels[id]) { + return (env.channels[id] = { + atime: +new Date(), + queue: [], + stream: Fs.createWriteStream(filepath, { + flags: 'a' + }).on('open', function () { + cb(null, env.channels[id]); + }).on('error', function (err) { + cb(err); + }) + }); + } + cb(null, env.channels[id]); +}; + +var insert = function (env, channelName, content, cb) { + var parts = separate(channelName); + + var dirpath = Path.join(env.root, parts.first); + checkDir(dirpath, function (e) { + if (e) { throw new Error(e); } + + var filepath = Path.join(env.root, parts.first, parts.rest); + checkFile(filepath, function (err, isFile) { + Channel(env, channelName, filepath, function (err, channel) { + if (err) { + console.error(err); + return cb(); + } + + var doIt = function () { + channel.locked = true; + channel.atime = +new Date(); + channel.stream.write(JSON.stringify(content) + '\n'); + + if (!channel.queue.length) { + channel.locked = false; + cb(); + return; + } + + channel.queue.shift()(); + cb(); + }; + + if (channel.locked) { + channel.queue.push(doIt); + } else { + doIt(); + } + }); + }); + }); }; var getMessages = function (env, channelName, msgHandler, cb) { + var parts = separate(channelName); + var filepath = Path.join(env.root, parts.first, parts.rest); + + var remainder = ''; + var newlines = /[\n\r]+/; + + var stream = Fs.createReadStream(filepath, 'utf-8') + .on('data', function (chunk) { + var lines = chunk.split(newlines); + lines[0] = remainder + lines[0]; + remainder = lines.pop(); + lines.forEach(function (line) { + msgHandler(JSON.parse(line)); + }); + }) + .on('end', function () { cb(); }) + .on('error', function (e) { cb(); }); }; module.exports.create = function (conf, cb) { - var env = {}; - - cb({ - message: function (channelName, content, cb) { - insert(env, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler, cb) { - getMessages(env, channelName, msgHandler, cb); - }, + var env = { + root: conf.filePath, + channels: { }, + }; + + checkDir(env.root, function (e, data) { + cb({ + message: function (channelName, content, cb) { + insert(env, channelName, content, cb); + }, + getMessages: function (channelName, msgHandler, cb) { + getMessages(env, channelName, msgHandler, cb); + }, + removeChannel: function (channelName, cb) { + console.log("[storage/file.removeChannel()] Not implemented"); + cb(); + }, + }); + + setInterval(function () { + var now = +new Date(); + Object.keys(env.channels).forEach(function (id) { + var channel = env.channels[id]; + if (now - channel.atime > (1000 * 60)) { + console.log("Cleaning up channel [%s]", id); + + channel.stream.close(); + delete env.channels[id]; + } + }); + }, 60 * 1000); }); }; From 4b64f00cc09d4879cbca2f81a95e6d83ff073ce0 Mon Sep 17 00:00:00 2001 From: ansuz Date: Tue, 13 Sep 2016 12:12:02 +0200 Subject: [PATCH 02/13] don't log when cleaning up idle channels --- storage/file.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/storage/file.js b/storage/file.js index cdd4dbb88..9cfeffc17 100644 --- a/storage/file.js +++ b/storage/file.js @@ -138,8 +138,7 @@ module.exports.create = function (conf, cb) { Object.keys(env.channels).forEach(function (id) { var channel = env.channels[id]; if (now - channel.atime > (1000 * 60)) { - console.log("Cleaning up channel [%s]", id); - + //console.log("Cleaning up idle channel [%s]", id); channel.stream.close(); delete env.channels[id]; } From 2021bf67025eb64c919a346160cd3fe305c489d3 Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Tue, 13 Sep 2016 17:03:46 +0200 Subject: [PATCH 03/13] Improvements to the file storage format --- .gitignore | 1 + NetfluxWebsocketSrv.js | 12 +- storage/file.js | 247 +++++++++++++++++++++++------------------ 3 files changed, 148 insertions(+), 112 deletions(-) diff --git a/.gitignore b/.gitignore index 2d0cd09d9..354096b87 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +datastore www/bower_components/* node_modules /config.js diff --git a/NetfluxWebsocketSrv.js b/NetfluxWebsocketSrv.js index d92510749..e9fffd2ed 100644 --- a/NetfluxWebsocketSrv.js +++ b/NetfluxWebsocketSrv.js @@ -33,7 +33,11 @@ const sendChannelMessage = function (ctx, channel, msgStruct) { } }); if (USE_HISTORY_KEEPER && msgStruct[2] === 'MSG') { - ctx.store.message(channel.id, JSON.stringify(msgStruct), function () { }); + ctx.store.message(channel.id, JSON.stringify(msgStruct), function (err) { + if (err) { + console.log("Error writing message: " + err); + } + }); } }; @@ -90,7 +94,11 @@ const getHistory = function (ctx, channelName, handler, cb) { var messageBuf = []; ctx.store.getMessages(channelName, function (msgStr) { messageBuf.push(JSON.parse(msgStr)); - }, function () { + }, function (err) { + if (err) { + console.log("Error getting messages " + err.stack); + // TODO: handle this better + } var startPoint; var cpCount = 0; var msgBuff2 = []; diff --git a/storage/file.js b/storage/file.js index 9cfeffc17..df391db7b 100644 --- a/storage/file.js +++ b/storage/file.js @@ -1,128 +1,167 @@ var Fs = require("fs"); var Path = require("path"); +var nThen = require("nthen"); -//function will check if a directory exists, and create it if it doesn't -var checkDir = function (dir, cb) { - Fs.stat(dir, function(err, stats) { - //Check if error defined and the error code is "not exists" - if (err) { - //Create the directory, call the callback. - Fs.mkdir(dir, cb); - } else { - //just in case there was a different error: - cb(err); - } +var mkPath = function (env, channelId) { + return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson'; +}; + +var readMessages = function (path, msgHandler, cb) { + var remainder = ''; + var stream = Fs.createReadStream(path, 'utf8'); + var complete = function (err) { + var _cb = cb; + cb = undefined; + if (_cb) { _cb(err); } + }; + stream.on('data', function (chunk) { + var lines = chunk.split('\n'); + lines[0] = remainder + lines[0]; + remainder = lines.pop(); + lines.forEach(msgHandler); + }); + stream.on('end', function () { + msgHandler(remainder); + complete(); }); + stream.on('error', function (e) { complete(e); }); }; -var checkFile = function (path, cb) { +var checkPath = function (path, callback) { Fs.stat(path, function (err, stats) { - if (err) { - if (err.code === 'ENOENT') { - return cb(null, false); - } else { - return cb(err); - } + if (!err) { + callback(undefined, true); + return; + } + if (err.code !== 'ENOENT') { + callback(err); + return; } - return cb(null, stats.isFile()); + var dirPath = path.replace(/\/[^\/]*$/, '/'); + Fs.mkdir(dirPath, function (err) { + if (err && err !== 'EEXIST') { + callback(err); + return; + } + callback(undefined, false); + }); }); }; -var separate = function (channel) { - return { - first: channel.slice(0, 2), - rest: channel.slice(2), +var getChannel = function (env, id, callback) { + if (env.channels[id]) { + var chan = env.channels[id]; + if (chan.whenLoaded) { + chan.whenLoaded.push(callback); + } else { + callback(undefined, chan); + } + return; + } + var channel = env.channels[id] = { + atime: +new Date(), + messages: [], + writeStream: undefined, + whenLoaded: [ callback ], + onError: [ ] }; -}; - -var Channel = function (env, id, filepath, cb) { - if (!env.channels[id]) { - return (env.channels[id] = { - atime: +new Date(), - queue: [], - stream: Fs.createWriteStream(filepath, { - flags: 'a' - }).on('open', function () { - cb(null, env.channels[id]); - }).on('error', function (err) { - cb(err); - }) - }); + var complete = function (err) { + var whenLoaded = channel.whenLoaded; + // no guarantee stream.on('error') will not cause this to be called multiple times + if (!whenLoaded) { return; } + channel.whenLoaded = undefined; + if (err) { + delete env.channels[id]; + } + whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); } - cb(null, env.channels[id]); + var path = mkPath(env, id); + var fileExists; + nThen(function (waitFor) { + checkPath(path, waitFor(function (err, exists) { + if (err) { + waitFor.abort(); + complete(err); + return; + } + fileExists = exists; + })); + }).nThen(function (waitFor) { + if (!fileExists) { return; } + readMessages(path, function (msg) { + channel.messages.push(msg); + }, waitFor(function (err) { + if (err) { + waitFor.abort(); + complete(err); + } + })); + }).nThen(function (waitFor) { + var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); + stream.on('open', waitFor()); + stream.on('error', function (err) { + // this might be called after this nThen block closes. + if (channel.whenLoaded) { + complete(err); + } else { + channel.onError.forEach(function (handler) { + handler(err); + }); + } + }); + }).nThen(function (waitFor) { + complete(); + }); }; -var insert = function (env, channelName, content, cb) { - var parts = separate(channelName); - - var dirpath = Path.join(env.root, parts.first); - checkDir(dirpath, function (e) { - if (e) { throw new Error(e); } - - var filepath = Path.join(env.root, parts.first, parts.rest); - checkFile(filepath, function (err, isFile) { - Channel(env, channelName, filepath, function (err, channel) { - if (err) { - console.error(err); - return cb(); - } - - var doIt = function () { - channel.locked = true; - channel.atime = +new Date(); - channel.stream.write(JSON.stringify(content) + '\n'); - - if (!channel.queue.length) { - channel.locked = false; - cb(); - return; - } - - channel.queue.shift()(); - cb(); - }; - - if (channel.locked) { - channel.queue.push(doIt); - } else { - doIt(); - } - }); +var message = function (env, chanName, msg, cb) { + getChannel(env, chanName, function (err, chan) { + if (err) { + cb(err); + return; + } + var complete = function (err) { + var _cb = cb; + cb = undefined; + if (_cb) { _cb(err); } + }; + chan.onError.push(complete); + chan.writeStream.write(msg + '\n', function () { + chan.onError.splice(chan.onError.indexOf(complete) - 1, 1); + if (!cb) { return; } + chan.messages.push(msg); + chan.atime = +new Date(); + complete(); }); }); }; -var getMessages = function (env, channelName, msgHandler, cb) { - var parts = separate(channelName); - - var filepath = Path.join(env.root, parts.first, parts.rest); - - var remainder = ''; - var newlines = /[\n\r]+/; - - var stream = Fs.createReadStream(filepath, 'utf-8') - .on('data', function (chunk) { - var lines = chunk.split(newlines); - lines[0] = remainder + lines[0]; - remainder = lines.pop(); - lines.forEach(function (line) { - msgHandler(JSON.parse(line)); - }); - }) - .on('end', function () { cb(); }) - .on('error', function (e) { cb(); }); +var getMessages = function (env, chanName, handler, cb) { + getChannel(env, chanName, function (err, chan) { + if (err) { + cb(err); + return; + } + chan.messages.forEach(handler); + chan.atime = +new Date(); + cb(); + }); }; module.exports.create = function (conf, cb) { var env = { - root: conf.filePath, + root: conf.filePath || './datastore', channels: { }, }; - - checkDir(env.root, function (e, data) { + console.log('storing data in ' + env.root); + Fs.mkdir(env.root, function (err) { + if (err && err.code !== 'EEXIST') { + // TODO: somehow return a nice error + throw err; + } cb({ message: function (channelName, content, cb) { - insert(env, channelName, content, cb); + message(env, channelName, content, cb); }, getMessages: function (channelName, msgHandler, cb) { getMessages(env, channelName, msgHandler, cb); @@ -132,17 +171,5 @@ module.exports.create = function (conf, cb) { cb(); }, }); - - setInterval(function () { - var now = +new Date(); - Object.keys(env.channels).forEach(function (id) { - var channel = env.channels[id]; - if (now - channel.atime > (1000 * 60)) { - //console.log("Cleaning up idle channel [%s]", id); - channel.stream.close(); - delete env.channels[id]; - } - }); - }, 60 * 1000); }); }; From 5fe3ffabd1e4d5acdeee9a6e0fb2b77bc3204c56 Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Tue, 13 Sep 2016 17:10:50 +0200 Subject: [PATCH 04/13] stop using waitFor.abort() which is apparently not implemented --- storage/file.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/storage/file.js b/storage/file.js index df391db7b..8de76447d 100644 --- a/storage/file.js +++ b/storage/file.js @@ -77,26 +77,29 @@ var getChannel = function (env, id, callback) { } var path = mkPath(env, id); var fileExists; + var errorState; nThen(function (waitFor) { checkPath(path, waitFor(function (err, exists) { if (err) { - waitFor.abort(); + errorState = true; complete(err); return; } fileExists = exists; })); }).nThen(function (waitFor) { + if (errorState) { return; } if (!fileExists) { return; } readMessages(path, function (msg) { channel.messages.push(msg); }, waitFor(function (err) { if (err) { - waitFor.abort(); + errorState = true; complete(err); } })); }).nThen(function (waitFor) { + if (errorState) { return; } var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); stream.on('open', waitFor()); stream.on('error', function (err) { @@ -110,6 +113,7 @@ var getChannel = function (env, id, callback) { } }); }).nThen(function (waitFor) { + if (errorState) { return; } complete(); }); }; From de9f642cccae4644022975799214a30c995b080c Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Tue, 13 Sep 2016 17:15:19 +0200 Subject: [PATCH 05/13] typo --- storage/file.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/file.js b/storage/file.js index 8de76447d..00a565863 100644 --- a/storage/file.js +++ b/storage/file.js @@ -39,7 +39,7 @@ var checkPath = function (path, callback) { } var dirPath = path.replace(/\/[^\/]*$/, '/'); Fs.mkdir(dirPath, function (err) { - if (err && err !== 'EEXIST') { + if (err && err.code !== 'EEXIST') { callback(err); return; } From 19caac232b70f8adb17321c04406540044094b0d Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 14 Sep 2016 11:11:00 +0200 Subject: [PATCH 06/13] don't try to send invalid messages --- storage/file.js | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/storage/file.js b/storage/file.js index 00a565863..10adb7894 100644 --- a/storage/file.js +++ b/storage/file.js @@ -74,7 +74,7 @@ var getChannel = function (env, id, callback) { delete env.channels[id]; } whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); - } + }; var path = mkPath(env, id); var fileExists; var errorState; @@ -146,7 +146,15 @@ var getMessages = function (env, chanName, handler, cb) { cb(err); return; } - chan.messages.forEach(handler); + try { + chan.messages + .filter(function (x) { return x; }) + .forEach(handler); + } catch (err2) { + console.error(err2); + cb(err2); + return; + } chan.atime = +new Date(); cb(); }); From 349b6799e04f3b70fcfe089f7e1e11dd38dff5e0 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 14 Sep 2016 11:21:59 +0200 Subject: [PATCH 07/13] remove leveldb dependency --- package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/package.json b/package.json index 06d115bab..c61266ee7 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,6 @@ "dependencies": { "express": "~4.10.1", "ws": "^1.0.1", - "level": "~1.4.0", "nthen": "~0.1.0" }, "devDependencies": { From b46f74cd78aa2f5265a0ede76ec104c4162d4ca9 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 14 Sep 2016 11:47:54 +0200 Subject: [PATCH 08/13] remove alternative storage adaptors from core --- storage/amnesia.js | 56 ----------------------------------- storage/kad.js | 51 -------------------------------- storage/lvl.js | 73 ---------------------------------------------- storage/mongo.js | 61 -------------------------------------- storage/sql.js | 58 ------------------------------------ 5 files changed, 299 deletions(-) delete mode 100644 storage/amnesia.js delete mode 100644 storage/kad.js delete mode 100644 storage/lvl.js delete mode 100644 storage/mongo.js delete mode 100644 storage/sql.js diff --git a/storage/amnesia.js b/storage/amnesia.js deleted file mode 100644 index b7c1702e6..000000000 --- a/storage/amnesia.js +++ /dev/null @@ -1,56 +0,0 @@ -/* - As the log statement says, this module does nothing to persist your data - across sessions. If your process crashes for any reason, all pads will die. - - This might be useful if you want to debug other parts of the codebase, if - you want to test out cryptpad without installing mongodb locally, or if - you don't want to rely on a remote db like the one at mongolab.com. - - Maybe you just like the idea of a forgetful pad? To use this module, edit - config.js to include a directive `storage: './storage/amnesia' - - Enjoy! -*/ - -module.exports.create = function(conf, cb){ - console.log("Loading amnesiadb. This is a horrible idea in production,"+ - " as data *will not* persist\n"); - - var db=[], - index=0; - - if (conf.removeChannels) { - console.log("Server is set to remove channels %sms after the last remaining client leaves.", conf.channelRemovalTimeout); - } - - cb({ - message: function(channelName, content, cb){ - var val = { - id:index++, - chan: channelName, - msg: content, - time: new Date().getTime(), - }; - db.push(val); - if (cb) { cb(); } - }, - getMessages: function(channelName, handler, cb){ - db.sort(function(a,b){ - return a.id - b.id; - }); - db.filter(function(val){ - return val.chan === channelName; - }).forEach(function(doc){ - handler(doc.msg); - }); - if (cb) { cb(); } - }, - removeChannel: function (channelName, cb) { - var err = false; - db = db.filter(function (msg) { - return msg.chan !== channelName; - }); - cb(err); - }, - }); -}; diff --git a/storage/kad.js b/storage/kad.js deleted file mode 100644 index 84287c70e..000000000 --- a/storage/kad.js +++ /dev/null @@ -1,51 +0,0 @@ -var kad=require("kad"); -var levelup=require("levelup"); - -/* - THiS FILE IS NOT PRODUCTION READY - DON'T USE IT! -*/ - -module.exports.create=function(conf,cb){ - var dht= kad({ - address:conf.kadAddress, - port:conf.kadPort, - storage:levelup(conf.kadStore), - seeds:conf.kadSeeds, - transport: kad.transports.UDP, - }); - - var getIndex=function(cName,f){ - dht.get(cName+'=>index',function(e,out){ - e && console.error(e) || f(Number(out)); - }); - }; - - cb({ - message:function(cName, content, cb){ - getIndex(cName, function(index){ - index+=1; - dht.put(cName+'=>index', ''+index,function(e){ - e && console.error("ERROR updating index (%s): %s",index,e) || - console.log("PUT SUCCESS: %s", cName+'=>index') - }); - dht.put(cName+'=>'+index, content, function(e){ - e && console.error("ERROR updating value at %s: %s",cName+'=>'+index,e)|| - console.log("PUT SUCCESS: %s", cName+'=>index') - cb(); - }); - }); - }, - getMessages: function(cName, cb){ - getIndex(cName, function(index){ - for(var i=index;i>=0;i--){ - dht.get(cName+'=>'+i,function(e,out){ - if(e) return console.error("DHT GET ERROR: %s",e); - console.log("GET SUCCESS: %s", cName+'=>index') - cb(out); - }); - } - }); - }, - }); -}; diff --git a/storage/lvl.js b/storage/lvl.js deleted file mode 100644 index ff6713a0c..000000000 --- a/storage/lvl.js +++ /dev/null @@ -1,73 +0,0 @@ -var Level = require("level"); -var nThen = require('nthen'); - -var getIndex = function(db, cName, cb) { - db.get(cName+'=>index', function(e, out){ - if (e) { - if (e.notFound) { - cb(-1); - } else { - throw e; - } - return; - } - cb(parseInt(out)); - }); -}; - -var insert = function (db, channelName, content, cb) { - var index; - var doIt = function () { - db.locked = true; - nThen(function (waitFor) { - getIndex(db, channelName, waitFor(function (i) { index = i+1; })); - }).nThen(function (waitFor) { - db.put(channelName+'=>'+index, content, waitFor(function (e) { if (e) { throw e; } })); - }).nThen(function (waitFor) { - db.put(channelName+'=>index', ''+index, waitFor(function (e) { if (e) { throw e; } })); - }).nThen(function (waitFor) { - db.locked = false; - if (!db.queue.length) { return; } - db.queue.shift()(); - }).nThen(cb); - }; - if (db.locked) { - db.queue.push(doIt); - } else { - doIt(); - } -}; - -var getMessages = function (db, channelName, msgHandler, cb) { - var index; - nThen(function (waitFor) { - getIndex(db, channelName, waitFor(function (i) { - index = i; - })); - }).nThen(function (waitFor) { - var again = function (i) { - db.get(channelName + '=>' + i, waitFor(function (e, out) { - if (e) { throw e; } - msgHandler(out); - if (i < index) { again(i+1); } - else if (cb) { cb(); } - })); - }; - if (index > -1) { again(0); } - else if (cb) { cb(); } - }); -}; - -module.exports.create = function (conf, cb) { - var db = Level(conf.levelPath || './test.level.db'); - db.locked = false; - db.queue = []; - cb({ - message: function (channelName, content, cb) { - insert(db, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler, cb) { - getMessages(db, channelName, msgHandler, cb); - } - }); -}; diff --git a/storage/mongo.js b/storage/mongo.js deleted file mode 100644 index 67f662a09..000000000 --- a/storage/mongo.js +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2014 XWiki SAS - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -var MongoClient = require('mongodb').MongoClient; - -var MONGO_URI = "mongodb://demo_user:demo_password@ds027769.mongolab.com:27769/demo_database"; -var COLLECTION_NAME = 'cryptpad'; - -var insert = function (coll, channelName, content, cb) { - var val = {chan: channelName, msg:content, time: (new Date()).getTime()}; - coll.insertOne(val, {}, function (err, r) { - console.log(r); - if (err || (r.insertedCount !== 1)) { - console.log('failed to insert ' + err); - return; - } - cb(); - }); -}; - -var getMessages = function (coll, channelName, cb) { - // find entries with a matching channelname - coll.find({chan:channelName}) - // sort by _id, ascending - .sort( { _id: 1 } ) - // iterate over entries - .forEach(function (doc) { - cb(doc.msg); - }, function (err) { - if (!err) { return; } - console.log('error ' + err); - }); -}; - -module.exports.create = function (conf, cb) { - MongoClient.connect(conf.mongoUri, function(err, db) { - var coll = db.collection(conf.mongoCollectionName); - if (err) { throw err; } - cb({ - message: function (channelName, content, cb) { - insert(coll, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler) { - getMessages(coll, channelName, msgHandler); - } - }); - }); -}; diff --git a/storage/sql.js b/storage/sql.js deleted file mode 100644 index 9bb07519d..000000000 --- a/storage/sql.js +++ /dev/null @@ -1,58 +0,0 @@ -var Knex = require("knex"); - -var getMessages = function (knex, channel, msgHandler, cb) { - return knex('messages') - .where({ - channel: channel, - }) - .select('*') - .then(function (rows) { - rows.forEach(function (row) { - msgHandler(row.content); - }); - cb(); - }) - .catch(function (e) { - console.error(e); - cb(); - }); -}; - -var insert = function (knex, channel, content, cb) { - knex.table('messages').insert({ - channel: channel, - content: content, - }) - .then(function () { - cb(); - }); -}; - -module.exports.create = function (conf, cb) { - var knex = Knex({ - dialect: 'sqlite3', - connection: conf.dbConnection, - useNullAsDefault: true, - }); - - knex.schema.hasTable('messages').then(function (exists) { - if (exists) { return; } - - return knex.schema.createTable('messages', function (table) { - table.increments('id'); - table.string('content'); - table.string('channel'); - table.timestamps(); - }); - }) - .then(function () { - cb({ - message: function (channelName, content, cb) { - insert(knex, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler, cb) { - getMessages(knex, channelName, msgHandler, cb); - }, - }); - }); -}; From 38d1a1a291b4dfdd460f8111c6bd86b4b3a16b1c Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 14 Sep 2016 15:06:35 +0200 Subject: [PATCH 09/13] faster message iteration --- storage/file.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/storage/file.js b/storage/file.js index 10adb7894..6663ae2d8 100644 --- a/storage/file.js +++ b/storage/file.js @@ -148,8 +148,10 @@ var getMessages = function (env, chanName, handler, cb) { } try { chan.messages - .filter(function (x) { return x; }) - .forEach(handler); + .forEach(function (message) { + if (!message) { return; } + handler(message); + }); } catch (err2) { console.error(err2); cb(err2); From 02969895153fde9724b0f120fb24562bccab3c3b Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 14 Sep 2016 15:53:35 +0200 Subject: [PATCH 10/13] rewrite default config file --- config.js.dist | 68 +++++++++++++++++--------------------------------- 1 file changed, 23 insertions(+), 45 deletions(-) diff --git a/config.js.dist b/config.js.dist index 4b5702574..e88b06b0a 100644 --- a/config.js.dist +++ b/config.js.dist @@ -17,52 +17,30 @@ module.exports = { */ logToStdout: false, - /* Cryptpad can be configured to remove channels some number of ms - after the last remaining client has disconnected. - - Default behaviour is to keep channels forever. - If you enable channel removal, the default removal time is one minute + /* + Cryptpad stores each document in an individual file on your hard drive. + Specify a directory where files should be stored. + It will be created automatically if it does not already exist. */ - removeChannels: false, - channelRemovalTimeout: 60000, - - // You now have a choice of storage engines - - /* amnesiadb only exists in memory. - * it will not persist across server restarts - * it will not scale well if your server stays alive for a long time. - * but it is completely dependency free - */ - //storage: './storage/amnesia', - - /* the 'lvl' storage module uses leveldb - * it persists, and will perform better than amnesiadb - * you will need to run 'npm install level' to use it - * - * you can provide a path to a database folder, which will be created - * if it does not already exist. If you use level and do not pass a path - * it will be created at cryptpad/test.level.db - * - * to delete all pads, run `rm -rf $YOUR_DB` - */ - storage: './storage/lvl', - levelPath: './test.level.db' - - /* mongo is the original storage engine for cryptpad - * it has been more thoroughly tested, but requires a little more setup - */ - // storage: './storage/mongo', - - /* this url is accessible over the internet, it is useful for testing - * but should not be used in production - */ - // mongoUri: "mongodb://demo_user:demo_password@ds027769.mongolab.com:27769/demo_database", - - /* mongoUri should really be used to refer to a local installation of mongodb - * to install the mongodb client, run `npm install mongodb` - */ - // mongoUri: "mongodb://localhost:27017/cryptpad", - // mongoCollectionName: 'cryptpad', + filePath: './datastore/', + + /* + You have the option of specifying an alternative storage adaptor. + These status of these alternatives are specified in their READMEs, + which are available at the following URLs: + + mongodb: a noSQL database + https://github.com/xwiki-labs/cryptpad-mongo-store + amnesiadb: in memory storage + https://github.com/xwiki-labs/cryptpad-amnesia-store + leveldb: a simple, fast, key-value store + https://github.com/xwiki-labs/cryptpad-level-store + sql: an adaptor for a variety of sql databases via knexjs + https://github.com/xwiki-labs/cryptpad-sql-store + + For the most up to date solution, use the default storage adaptor. + */ + storage: './storage/file', /* it is recommended that you serve cryptpad over https * the filepaths below are used to configure your certificates From fc7576fe205066e1755ce6a09a71e8a7a0194bc4 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 15 Sep 2016 10:07:31 +0200 Subject: [PATCH 11/13] implement channel removal --- storage/file.js | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/storage/file.js b/storage/file.js index 6663ae2d8..b25a5c99f 100644 --- a/storage/file.js +++ b/storage/file.js @@ -162,6 +162,11 @@ var getMessages = function (env, chanName, handler, cb) { }); }; +var removeChannel = function (env, channelName, cb) { + var filename = Path.join(env.root, channelName.slice(0, 2), channelName + '.ndjson'); + Fs.unlink(filename, cb); +}; + module.exports.create = function (conf, cb) { var env = { root: conf.filePath || './datastore', @@ -181,8 +186,9 @@ module.exports.create = function (conf, cb) { getMessages(env, channelName, msgHandler, cb); }, removeChannel: function (channelName, cb) { - console.log("[storage/file.removeChannel()] Not implemented"); - cb(); + removeChannel(env, channelName, function (err) { + cb(err); + }); }, }); }); From 4001e756cbe41384507115cdcae9bea0d2f13ae7 Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 15 Sep 2016 12:15:27 +0200 Subject: [PATCH 12/13] add import script so admins can migrate lvl to files --- import | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100755 import diff --git a/import b/import new file mode 100755 index 000000000..1bf1d5de9 --- /dev/null +++ b/import @@ -0,0 +1,65 @@ +#!/usr/bin/env node +/* globals process */ + +var Config = require("./config"); +var Fs = require("fs"); +var Storage = require(Config.storage); + +var args = process.argv.slice(2); + +if (!args.length) { + console.log("Insufficient arguments!"); + console.log("Pass a path to a database backup!"); + process.exit(); +} + +var dump = Fs.readFileSync(args[0], 'utf-8'); + +var ready = function (store) { + var lock = 0; + dump.split(/\n/) + .filter(function (line) { + return line; + }) + .forEach(function (line, i) { + lock++; + var parts; + + var channel; + var msg; + + line.replace(/^(.*?)\|(.*)$/, function (all, c, m) { + channel = c; + msg = m; + return ''; + }); + + if (!channel || !msg) { + console.log("BAD LINE on line %s", i); + return; + } + + try { + JSON.parse(msg); + } catch (err) { + console.log("BAD LINE on line %s", i); + console.log(msg); + console.log(); + } + + store.message(channel, msg, function () { + console.log(line); + lock--; + if (!lock) { + console.log("DONE"); + process.exit(0); + } + }); + }); +}; + +Storage.create(Config, function (store) { + console.log("READY"); + ready(store); +}); + From 1f4d0888adebb5933460ec9f07935592f7ea48dd Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 15 Sep 2016 14:57:20 +0200 Subject: [PATCH 13/13] fix false positive for error handler --- NetfluxWebsocketSrv.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/NetfluxWebsocketSrv.js b/NetfluxWebsocketSrv.js index e9fffd2ed..a5e75c6cf 100644 --- a/NetfluxWebsocketSrv.js +++ b/NetfluxWebsocketSrv.js @@ -34,7 +34,9 @@ const sendChannelMessage = function (ctx, channel, msgStruct) { }); if (USE_HISTORY_KEEPER && msgStruct[2] === 'MSG') { ctx.store.message(channel.id, JSON.stringify(msgStruct), function (err) { - if (err) { + if (err && typeof(err) !== 'function') { + // ignore functions because older datastores + // might pass waitFors into the callback console.log("Error writing message: " + err); } });