diff --git a/.gitignore b/.gitignore index 2d0cd09d9..354096b87 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +datastore www/bower_components/* node_modules /config.js diff --git a/NetfluxWebsocketSrv.js b/NetfluxWebsocketSrv.js index d92510749..a5e75c6cf 100644 --- a/NetfluxWebsocketSrv.js +++ b/NetfluxWebsocketSrv.js @@ -33,7 +33,13 @@ const sendChannelMessage = function (ctx, channel, msgStruct) { } }); if (USE_HISTORY_KEEPER && msgStruct[2] === 'MSG') { - ctx.store.message(channel.id, JSON.stringify(msgStruct), function () { }); + ctx.store.message(channel.id, JSON.stringify(msgStruct), function (err) { + if (err && typeof(err) !== 'function') { + // ignore functions because older datastores + // might pass waitFors into the callback + console.log("Error writing message: " + err); + } + }); } }; @@ -90,7 +96,11 @@ const getHistory = function (ctx, channelName, handler, cb) { var messageBuf = []; ctx.store.getMessages(channelName, function (msgStr) { messageBuf.push(JSON.parse(msgStr)); - }, function () { + }, function (err) { + if (err) { + console.log("Error getting messages " + err.stack); + // TODO: handle this better + } var startPoint; var cpCount = 0; var msgBuff2 = []; diff --git a/config.js.dist b/config.js.dist index 4b5702574..e88b06b0a 100644 --- a/config.js.dist +++ b/config.js.dist @@ -17,52 +17,30 @@ module.exports = { */ logToStdout: false, - /* Cryptpad can be configured to remove channels some number of ms - after the last remaining client has disconnected. - - Default behaviour is to keep channels forever. - If you enable channel removal, the default removal time is one minute + /* + Cryptpad stores each document in an individual file on your hard drive. + Specify a directory where files should be stored. + It will be created automatically if it does not already exist. */ - removeChannels: false, - channelRemovalTimeout: 60000, - - // You now have a choice of storage engines - - /* amnesiadb only exists in memory. - * it will not persist across server restarts - * it will not scale well if your server stays alive for a long time. - * but it is completely dependency free - */ - //storage: './storage/amnesia', - - /* the 'lvl' storage module uses leveldb - * it persists, and will perform better than amnesiadb - * you will need to run 'npm install level' to use it - * - * you can provide a path to a database folder, which will be created - * if it does not already exist. If you use level and do not pass a path - * it will be created at cryptpad/test.level.db - * - * to delete all pads, run `rm -rf $YOUR_DB` - */ - storage: './storage/lvl', - levelPath: './test.level.db' - - /* mongo is the original storage engine for cryptpad - * it has been more thoroughly tested, but requires a little more setup - */ - // storage: './storage/mongo', - - /* this url is accessible over the internet, it is useful for testing - * but should not be used in production - */ - // mongoUri: "mongodb://demo_user:demo_password@ds027769.mongolab.com:27769/demo_database", - - /* mongoUri should really be used to refer to a local installation of mongodb - * to install the mongodb client, run `npm install mongodb` - */ - // mongoUri: "mongodb://localhost:27017/cryptpad", - // mongoCollectionName: 'cryptpad', + filePath: './datastore/', + + /* + You have the option of specifying an alternative storage adaptor. + These status of these alternatives are specified in their READMEs, + which are available at the following URLs: + + mongodb: a noSQL database + https://github.com/xwiki-labs/cryptpad-mongo-store + amnesiadb: in memory storage + https://github.com/xwiki-labs/cryptpad-amnesia-store + leveldb: a simple, fast, key-value store + https://github.com/xwiki-labs/cryptpad-level-store + sql: an adaptor for a variety of sql databases via knexjs + https://github.com/xwiki-labs/cryptpad-sql-store + + For the most up to date solution, use the default storage adaptor. + */ + storage: './storage/file', /* it is recommended that you serve cryptpad over https * the filepaths below are used to configure your certificates diff --git a/import b/import new file mode 100755 index 000000000..1bf1d5de9 --- /dev/null +++ b/import @@ -0,0 +1,65 @@ +#!/usr/bin/env node +/* globals process */ + +var Config = require("./config"); +var Fs = require("fs"); +var Storage = require(Config.storage); + +var args = process.argv.slice(2); + +if (!args.length) { + console.log("Insufficient arguments!"); + console.log("Pass a path to a database backup!"); + process.exit(); +} + +var dump = Fs.readFileSync(args[0], 'utf-8'); + +var ready = function (store) { + var lock = 0; + dump.split(/\n/) + .filter(function (line) { + return line; + }) + .forEach(function (line, i) { + lock++; + var parts; + + var channel; + var msg; + + line.replace(/^(.*?)\|(.*)$/, function (all, c, m) { + channel = c; + msg = m; + return ''; + }); + + if (!channel || !msg) { + console.log("BAD LINE on line %s", i); + return; + } + + try { + JSON.parse(msg); + } catch (err) { + console.log("BAD LINE on line %s", i); + console.log(msg); + console.log(); + } + + store.message(channel, msg, function () { + console.log(line); + lock--; + if (!lock) { + console.log("DONE"); + process.exit(0); + } + }); + }); +}; + +Storage.create(Config, function (store) { + console.log("READY"); + ready(store); +}); + diff --git a/package.json b/package.json index 06d115bab..c61266ee7 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,6 @@ "dependencies": { "express": "~4.10.1", "ws": "^1.0.1", - "level": "~1.4.0", "nthen": "~0.1.0" }, "devDependencies": { diff --git a/storage/amnesia.js b/storage/amnesia.js deleted file mode 100644 index b7c1702e6..000000000 --- a/storage/amnesia.js +++ /dev/null @@ -1,56 +0,0 @@ -/* - As the log statement says, this module does nothing to persist your data - across sessions. If your process crashes for any reason, all pads will die. - - This might be useful if you want to debug other parts of the codebase, if - you want to test out cryptpad without installing mongodb locally, or if - you don't want to rely on a remote db like the one at mongolab.com. - - Maybe you just like the idea of a forgetful pad? To use this module, edit - config.js to include a directive `storage: './storage/amnesia' - - Enjoy! -*/ - -module.exports.create = function(conf, cb){ - console.log("Loading amnesiadb. This is a horrible idea in production,"+ - " as data *will not* persist\n"); - - var db=[], - index=0; - - if (conf.removeChannels) { - console.log("Server is set to remove channels %sms after the last remaining client leaves.", conf.channelRemovalTimeout); - } - - cb({ - message: function(channelName, content, cb){ - var val = { - id:index++, - chan: channelName, - msg: content, - time: new Date().getTime(), - }; - db.push(val); - if (cb) { cb(); } - }, - getMessages: function(channelName, handler, cb){ - db.sort(function(a,b){ - return a.id - b.id; - }); - db.filter(function(val){ - return val.chan === channelName; - }).forEach(function(doc){ - handler(doc.msg); - }); - if (cb) { cb(); } - }, - removeChannel: function (channelName, cb) { - var err = false; - db = db.filter(function (msg) { - return msg.chan !== channelName; - }); - cb(err); - }, - }); -}; diff --git a/storage/file.js b/storage/file.js index 0c6b6734c..b25a5c99f 100644 --- a/storage/file.js +++ b/storage/file.js @@ -1,22 +1,195 @@ var Fs = require("fs"); +var Path = require("path"); +var nThen = require("nthen"); -var insert = function (env, channel, content, cb) { +var mkPath = function (env, channelId) { + return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson'; +}; + +var readMessages = function (path, msgHandler, cb) { + var remainder = ''; + var stream = Fs.createReadStream(path, 'utf8'); + var complete = function (err) { + var _cb = cb; + cb = undefined; + if (_cb) { _cb(err); } + }; + stream.on('data', function (chunk) { + var lines = chunk.split('\n'); + lines[0] = remainder + lines[0]; + remainder = lines.pop(); + lines.forEach(msgHandler); + }); + stream.on('end', function () { + msgHandler(remainder); + complete(); + }); + stream.on('error', function (e) { complete(e); }); +}; + +var checkPath = function (path, callback) { + Fs.stat(path, function (err, stats) { + if (!err) { + callback(undefined, true); + return; + } + if (err.code !== 'ENOENT') { + callback(err); + return; + } + var dirPath = path.replace(/\/[^\/]*$/, '/'); + Fs.mkdir(dirPath, function (err) { + if (err && err.code !== 'EEXIST') { + callback(err); + return; + } + callback(undefined, false); + }); + }); +}; +var getChannel = function (env, id, callback) { + if (env.channels[id]) { + var chan = env.channels[id]; + if (chan.whenLoaded) { + chan.whenLoaded.push(callback); + } else { + callback(undefined, chan); + } + return; + } + var channel = env.channels[id] = { + atime: +new Date(), + messages: [], + writeStream: undefined, + whenLoaded: [ callback ], + onError: [ ] + }; + var complete = function (err) { + var whenLoaded = channel.whenLoaded; + // no guarantee stream.on('error') will not cause this to be called multiple times + if (!whenLoaded) { return; } + channel.whenLoaded = undefined; + if (err) { + delete env.channels[id]; + } + whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); + }; + var path = mkPath(env, id); + var fileExists; + var errorState; + nThen(function (waitFor) { + checkPath(path, waitFor(function (err, exists) { + if (err) { + errorState = true; + complete(err); + return; + } + fileExists = exists; + })); + }).nThen(function (waitFor) { + if (errorState) { return; } + if (!fileExists) { return; } + readMessages(path, function (msg) { + channel.messages.push(msg); + }, waitFor(function (err) { + if (err) { + errorState = true; + complete(err); + } + })); + }).nThen(function (waitFor) { + if (errorState) { return; } + var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); + stream.on('open', waitFor()); + stream.on('error', function (err) { + // this might be called after this nThen block closes. + if (channel.whenLoaded) { + complete(err); + } else { + channel.onError.forEach(function (handler) { + handler(err); + }); + } + }); + }).nThen(function (waitFor) { + if (errorState) { return; } + complete(); + }); }; -var getMessages = function (env, channelName, msgHandler, cb) { +var message = function (env, chanName, msg, cb) { + getChannel(env, chanName, function (err, chan) { + if (err) { + cb(err); + return; + } + var complete = function (err) { + var _cb = cb; + cb = undefined; + if (_cb) { _cb(err); } + }; + chan.onError.push(complete); + chan.writeStream.write(msg + '\n', function () { + chan.onError.splice(chan.onError.indexOf(complete) - 1, 1); + if (!cb) { return; } + chan.messages.push(msg); + chan.atime = +new Date(); + complete(); + }); + }); +}; + +var getMessages = function (env, chanName, handler, cb) { + getChannel(env, chanName, function (err, chan) { + if (err) { + cb(err); + return; + } + try { + chan.messages + .forEach(function (message) { + if (!message) { return; } + handler(message); + }); + } catch (err2) { + console.error(err2); + cb(err2); + return; + } + chan.atime = +new Date(); + cb(); + }); +}; +var removeChannel = function (env, channelName, cb) { + var filename = Path.join(env.root, channelName.slice(0, 2), channelName + '.ndjson'); + Fs.unlink(filename, cb); }; module.exports.create = function (conf, cb) { - var env = {}; - - cb({ - message: function (channelName, content, cb) { - insert(env, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler, cb) { - getMessages(env, channelName, msgHandler, cb); - }, + var env = { + root: conf.filePath || './datastore', + channels: { }, + }; + console.log('storing data in ' + env.root); + Fs.mkdir(env.root, function (err) { + if (err && err.code !== 'EEXIST') { + // TODO: somehow return a nice error + throw err; + } + cb({ + message: function (channelName, content, cb) { + message(env, channelName, content, cb); + }, + getMessages: function (channelName, msgHandler, cb) { + getMessages(env, channelName, msgHandler, cb); + }, + removeChannel: function (channelName, cb) { + removeChannel(env, channelName, function (err) { + cb(err); + }); + }, + }); }); }; diff --git a/storage/kad.js b/storage/kad.js deleted file mode 100644 index 84287c70e..000000000 --- a/storage/kad.js +++ /dev/null @@ -1,51 +0,0 @@ -var kad=require("kad"); -var levelup=require("levelup"); - -/* - THiS FILE IS NOT PRODUCTION READY - DON'T USE IT! -*/ - -module.exports.create=function(conf,cb){ - var dht= kad({ - address:conf.kadAddress, - port:conf.kadPort, - storage:levelup(conf.kadStore), - seeds:conf.kadSeeds, - transport: kad.transports.UDP, - }); - - var getIndex=function(cName,f){ - dht.get(cName+'=>index',function(e,out){ - e && console.error(e) || f(Number(out)); - }); - }; - - cb({ - message:function(cName, content, cb){ - getIndex(cName, function(index){ - index+=1; - dht.put(cName+'=>index', ''+index,function(e){ - e && console.error("ERROR updating index (%s): %s",index,e) || - console.log("PUT SUCCESS: %s", cName+'=>index') - }); - dht.put(cName+'=>'+index, content, function(e){ - e && console.error("ERROR updating value at %s: %s",cName+'=>'+index,e)|| - console.log("PUT SUCCESS: %s", cName+'=>index') - cb(); - }); - }); - }, - getMessages: function(cName, cb){ - getIndex(cName, function(index){ - for(var i=index;i>=0;i--){ - dht.get(cName+'=>'+i,function(e,out){ - if(e) return console.error("DHT GET ERROR: %s",e); - console.log("GET SUCCESS: %s", cName+'=>index') - cb(out); - }); - } - }); - }, - }); -}; diff --git a/storage/lvl.js b/storage/lvl.js deleted file mode 100644 index ff6713a0c..000000000 --- a/storage/lvl.js +++ /dev/null @@ -1,73 +0,0 @@ -var Level = require("level"); -var nThen = require('nthen'); - -var getIndex = function(db, cName, cb) { - db.get(cName+'=>index', function(e, out){ - if (e) { - if (e.notFound) { - cb(-1); - } else { - throw e; - } - return; - } - cb(parseInt(out)); - }); -}; - -var insert = function (db, channelName, content, cb) { - var index; - var doIt = function () { - db.locked = true; - nThen(function (waitFor) { - getIndex(db, channelName, waitFor(function (i) { index = i+1; })); - }).nThen(function (waitFor) { - db.put(channelName+'=>'+index, content, waitFor(function (e) { if (e) { throw e; } })); - }).nThen(function (waitFor) { - db.put(channelName+'=>index', ''+index, waitFor(function (e) { if (e) { throw e; } })); - }).nThen(function (waitFor) { - db.locked = false; - if (!db.queue.length) { return; } - db.queue.shift()(); - }).nThen(cb); - }; - if (db.locked) { - db.queue.push(doIt); - } else { - doIt(); - } -}; - -var getMessages = function (db, channelName, msgHandler, cb) { - var index; - nThen(function (waitFor) { - getIndex(db, channelName, waitFor(function (i) { - index = i; - })); - }).nThen(function (waitFor) { - var again = function (i) { - db.get(channelName + '=>' + i, waitFor(function (e, out) { - if (e) { throw e; } - msgHandler(out); - if (i < index) { again(i+1); } - else if (cb) { cb(); } - })); - }; - if (index > -1) { again(0); } - else if (cb) { cb(); } - }); -}; - -module.exports.create = function (conf, cb) { - var db = Level(conf.levelPath || './test.level.db'); - db.locked = false; - db.queue = []; - cb({ - message: function (channelName, content, cb) { - insert(db, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler, cb) { - getMessages(db, channelName, msgHandler, cb); - } - }); -}; diff --git a/storage/mongo.js b/storage/mongo.js deleted file mode 100644 index 67f662a09..000000000 --- a/storage/mongo.js +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2014 XWiki SAS - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -var MongoClient = require('mongodb').MongoClient; - -var MONGO_URI = "mongodb://demo_user:demo_password@ds027769.mongolab.com:27769/demo_database"; -var COLLECTION_NAME = 'cryptpad'; - -var insert = function (coll, channelName, content, cb) { - var val = {chan: channelName, msg:content, time: (new Date()).getTime()}; - coll.insertOne(val, {}, function (err, r) { - console.log(r); - if (err || (r.insertedCount !== 1)) { - console.log('failed to insert ' + err); - return; - } - cb(); - }); -}; - -var getMessages = function (coll, channelName, cb) { - // find entries with a matching channelname - coll.find({chan:channelName}) - // sort by _id, ascending - .sort( { _id: 1 } ) - // iterate over entries - .forEach(function (doc) { - cb(doc.msg); - }, function (err) { - if (!err) { return; } - console.log('error ' + err); - }); -}; - -module.exports.create = function (conf, cb) { - MongoClient.connect(conf.mongoUri, function(err, db) { - var coll = db.collection(conf.mongoCollectionName); - if (err) { throw err; } - cb({ - message: function (channelName, content, cb) { - insert(coll, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler) { - getMessages(coll, channelName, msgHandler); - } - }); - }); -}; diff --git a/storage/sql.js b/storage/sql.js deleted file mode 100644 index 9bb07519d..000000000 --- a/storage/sql.js +++ /dev/null @@ -1,58 +0,0 @@ -var Knex = require("knex"); - -var getMessages = function (knex, channel, msgHandler, cb) { - return knex('messages') - .where({ - channel: channel, - }) - .select('*') - .then(function (rows) { - rows.forEach(function (row) { - msgHandler(row.content); - }); - cb(); - }) - .catch(function (e) { - console.error(e); - cb(); - }); -}; - -var insert = function (knex, channel, content, cb) { - knex.table('messages').insert({ - channel: channel, - content: content, - }) - .then(function () { - cb(); - }); -}; - -module.exports.create = function (conf, cb) { - var knex = Knex({ - dialect: 'sqlite3', - connection: conf.dbConnection, - useNullAsDefault: true, - }); - - knex.schema.hasTable('messages').then(function (exists) { - if (exists) { return; } - - return knex.schema.createTable('messages', function (table) { - table.increments('id'); - table.string('content'); - table.string('channel'); - table.timestamps(); - }); - }) - .then(function () { - cb({ - message: function (channelName, content, cb) { - insert(knex, channelName, content, cb); - }, - getMessages: function (channelName, msgHandler, cb) { - getMessages(knex, channelName, msgHandler, cb); - }, - }); - }); -};