Merge pull request #32 from xwiki-labs/files

filesystem storage
pull/1/head
ansuz 2016-09-15 15:10:49 +02:00 committed by GitHub
commit 721cb8fed1
11 changed files with 283 additions and 356 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
datastore
www/bower_components/*
node_modules
/config.js

View File

@ -33,7 +33,13 @@ const sendChannelMessage = function (ctx, channel, msgStruct) {
}
});
if (USE_HISTORY_KEEPER && msgStruct[2] === 'MSG') {
ctx.store.message(channel.id, JSON.stringify(msgStruct), function () { });
ctx.store.message(channel.id, JSON.stringify(msgStruct), function (err) {
if (err && typeof(err) !== 'function') {
// ignore functions because older datastores
// might pass waitFors into the callback
console.log("Error writing message: " + err);
}
});
}
};
@ -90,7 +96,11 @@ const getHistory = function (ctx, channelName, handler, cb) {
var messageBuf = [];
ctx.store.getMessages(channelName, function (msgStr) {
messageBuf.push(JSON.parse(msgStr));
}, function () {
}, function (err) {
if (err) {
console.log("Error getting messages " + err.stack);
// TODO: handle this better
}
var startPoint;
var cpCount = 0;
var msgBuff2 = [];

View File

@ -17,52 +17,30 @@ module.exports = {
*/
logToStdout: false,
/* Cryptpad can be configured to remove channels some number of ms
after the last remaining client has disconnected.
Default behaviour is to keep channels forever.
If you enable channel removal, the default removal time is one minute
/*
Cryptpad stores each document in an individual file on your hard drive.
Specify a directory where files should be stored.
It will be created automatically if it does not already exist.
*/
removeChannels: false,
channelRemovalTimeout: 60000,
filePath: './datastore/',
// You now have a choice of storage engines
/*
You have the option of specifying an alternative storage adaptor.
These status of these alternatives are specified in their READMEs,
which are available at the following URLs:
/* amnesiadb only exists in memory.
* it will not persist across server restarts
* it will not scale well if your server stays alive for a long time.
* but it is completely dependency free
*/
//storage: './storage/amnesia',
mongodb: a noSQL database
https://github.com/xwiki-labs/cryptpad-mongo-store
amnesiadb: in memory storage
https://github.com/xwiki-labs/cryptpad-amnesia-store
leveldb: a simple, fast, key-value store
https://github.com/xwiki-labs/cryptpad-level-store
sql: an adaptor for a variety of sql databases via knexjs
https://github.com/xwiki-labs/cryptpad-sql-store
/* the 'lvl' storage module uses leveldb
* it persists, and will perform better than amnesiadb
* you will need to run 'npm install level' to use it
*
* you can provide a path to a database folder, which will be created
* if it does not already exist. If you use level and do not pass a path
* it will be created at cryptpad/test.level.db
*
* to delete all pads, run `rm -rf $YOUR_DB`
*/
storage: './storage/lvl',
levelPath: './test.level.db'
/* mongo is the original storage engine for cryptpad
* it has been more thoroughly tested, but requires a little more setup
*/
// storage: './storage/mongo',
/* this url is accessible over the internet, it is useful for testing
* but should not be used in production
*/
// mongoUri: "mongodb://demo_user:demo_password@ds027769.mongolab.com:27769/demo_database",
/* mongoUri should really be used to refer to a local installation of mongodb
* to install the mongodb client, run `npm install mongodb`
*/
// mongoUri: "mongodb://localhost:27017/cryptpad",
// mongoCollectionName: 'cryptpad',
For the most up to date solution, use the default storage adaptor.
*/
storage: './storage/file',
/* it is recommended that you serve cryptpad over https
* the filepaths below are used to configure your certificates

65
import Executable file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env node
/* globals process */
var Config = require("./config");
var Fs = require("fs");
var Storage = require(Config.storage);
var args = process.argv.slice(2);
if (!args.length) {
console.log("Insufficient arguments!");
console.log("Pass a path to a database backup!");
process.exit();
}
var dump = Fs.readFileSync(args[0], 'utf-8');
var ready = function (store) {
var lock = 0;
dump.split(/\n/)
.filter(function (line) {
return line;
})
.forEach(function (line, i) {
lock++;
var parts;
var channel;
var msg;
line.replace(/^(.*?)\|(.*)$/, function (all, c, m) {
channel = c;
msg = m;
return '';
});
if (!channel || !msg) {
console.log("BAD LINE on line %s", i);
return;
}
try {
JSON.parse(msg);
} catch (err) {
console.log("BAD LINE on line %s", i);
console.log(msg);
console.log();
}
store.message(channel, msg, function () {
console.log(line);
lock--;
if (!lock) {
console.log("DONE");
process.exit(0);
}
});
});
};
Storage.create(Config, function (store) {
console.log("READY");
ready(store);
});

View File

@ -5,7 +5,6 @@
"dependencies": {
"express": "~4.10.1",
"ws": "^1.0.1",
"level": "~1.4.0",
"nthen": "~0.1.0"
},
"devDependencies": {

View File

@ -1,56 +0,0 @@
/*
As the log statement says, this module does nothing to persist your data
across sessions. If your process crashes for any reason, all pads will die.
This might be useful if you want to debug other parts of the codebase, if
you want to test out cryptpad without installing mongodb locally, or if
you don't want to rely on a remote db like the one at mongolab.com.
Maybe you just like the idea of a forgetful pad? To use this module, edit
config.js to include a directive `storage: './storage/amnesia'
Enjoy!
*/
module.exports.create = function(conf, cb){
console.log("Loading amnesiadb. This is a horrible idea in production,"+
" as data *will not* persist\n");
var db=[],
index=0;
if (conf.removeChannels) {
console.log("Server is set to remove channels %sms after the last remaining client leaves.", conf.channelRemovalTimeout);
}
cb({
message: function(channelName, content, cb){
var val = {
id:index++,
chan: channelName,
msg: content,
time: new Date().getTime(),
};
db.push(val);
if (cb) { cb(); }
},
getMessages: function(channelName, handler, cb){
db.sort(function(a,b){
return a.id - b.id;
});
db.filter(function(val){
return val.chan === channelName;
}).forEach(function(doc){
handler(doc.msg);
});
if (cb) { cb(); }
},
removeChannel: function (channelName, cb) {
var err = false;
db = db.filter(function (msg) {
return msg.chan !== channelName;
});
cb(err);
},
});
};

View File

@ -1,22 +1,195 @@
var Fs = require("fs");
var Path = require("path");
var nThen = require("nthen");
var insert = function (env, channel, content, cb) {
var mkPath = function (env, channelId) {
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
};
var getMessages = function (env, channelName, msgHandler, cb) {
var readMessages = function (path, msgHandler, cb) {
var remainder = '';
var stream = Fs.createReadStream(path, 'utf8');
var complete = function (err) {
var _cb = cb;
cb = undefined;
if (_cb) { _cb(err); }
};
stream.on('data', function (chunk) {
var lines = chunk.split('\n');
lines[0] = remainder + lines[0];
remainder = lines.pop();
lines.forEach(msgHandler);
});
stream.on('end', function () {
msgHandler(remainder);
complete();
});
stream.on('error', function (e) { complete(e); });
};
var checkPath = function (path, callback) {
Fs.stat(path, function (err, stats) {
if (!err) {
callback(undefined, true);
return;
}
if (err.code !== 'ENOENT') {
callback(err);
return;
}
var dirPath = path.replace(/\/[^\/]*$/, '/');
Fs.mkdir(dirPath, function (err) {
if (err && err.code !== 'EEXIST') {
callback(err);
return;
}
callback(undefined, false);
});
});
};
var getChannel = function (env, id, callback) {
if (env.channels[id]) {
var chan = env.channels[id];
if (chan.whenLoaded) {
chan.whenLoaded.push(callback);
} else {
callback(undefined, chan);
}
return;
}
var channel = env.channels[id] = {
atime: +new Date(),
messages: [],
writeStream: undefined,
whenLoaded: [ callback ],
onError: [ ]
};
var complete = function (err) {
var whenLoaded = channel.whenLoaded;
// no guarantee stream.on('error') will not cause this to be called multiple times
if (!whenLoaded) { return; }
channel.whenLoaded = undefined;
if (err) {
delete env.channels[id];
}
whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); });
};
var path = mkPath(env, id);
var fileExists;
var errorState;
nThen(function (waitFor) {
checkPath(path, waitFor(function (err, exists) {
if (err) {
errorState = true;
complete(err);
return;
}
fileExists = exists;
}));
}).nThen(function (waitFor) {
if (errorState) { return; }
if (!fileExists) { return; }
readMessages(path, function (msg) {
channel.messages.push(msg);
}, waitFor(function (err) {
if (err) {
errorState = true;
complete(err);
}
}));
}).nThen(function (waitFor) {
if (errorState) { return; }
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
stream.on('open', waitFor());
stream.on('error', function (err) {
// this might be called after this nThen block closes.
if (channel.whenLoaded) {
complete(err);
} else {
channel.onError.forEach(function (handler) {
handler(err);
});
}
});
}).nThen(function (waitFor) {
if (errorState) { return; }
complete();
});
};
var message = function (env, chanName, msg, cb) {
getChannel(env, chanName, function (err, chan) {
if (err) {
cb(err);
return;
}
var complete = function (err) {
var _cb = cb;
cb = undefined;
if (_cb) { _cb(err); }
};
chan.onError.push(complete);
chan.writeStream.write(msg + '\n', function () {
chan.onError.splice(chan.onError.indexOf(complete) - 1, 1);
if (!cb) { return; }
chan.messages.push(msg);
chan.atime = +new Date();
complete();
});
});
};
var getMessages = function (env, chanName, handler, cb) {
getChannel(env, chanName, function (err, chan) {
if (err) {
cb(err);
return;
}
try {
chan.messages
.forEach(function (message) {
if (!message) { return; }
handler(message);
});
} catch (err2) {
console.error(err2);
cb(err2);
return;
}
chan.atime = +new Date();
cb();
});
};
var removeChannel = function (env, channelName, cb) {
var filename = Path.join(env.root, channelName.slice(0, 2), channelName + '.ndjson');
Fs.unlink(filename, cb);
};
module.exports.create = function (conf, cb) {
var env = {};
cb({
message: function (channelName, content, cb) {
insert(env, channelName, content, cb);
},
getMessages: function (channelName, msgHandler, cb) {
getMessages(env, channelName, msgHandler, cb);
},
var env = {
root: conf.filePath || './datastore',
channels: { },
};
console.log('storing data in ' + env.root);
Fs.mkdir(env.root, function (err) {
if (err && err.code !== 'EEXIST') {
// TODO: somehow return a nice error
throw err;
}
cb({
message: function (channelName, content, cb) {
message(env, channelName, content, cb);
},
getMessages: function (channelName, msgHandler, cb) {
getMessages(env, channelName, msgHandler, cb);
},
removeChannel: function (channelName, cb) {
removeChannel(env, channelName, function (err) {
cb(err);
});
},
});
});
};

View File

@ -1,51 +0,0 @@
var kad=require("kad");
var levelup=require("levelup");
/*
THiS FILE IS NOT PRODUCTION READY
DON'T USE IT!
*/
module.exports.create=function(conf,cb){
var dht= kad({
address:conf.kadAddress,
port:conf.kadPort,
storage:levelup(conf.kadStore),
seeds:conf.kadSeeds,
transport: kad.transports.UDP,
});
var getIndex=function(cName,f){
dht.get(cName+'=>index',function(e,out){
e && console.error(e) || f(Number(out));
});
};
cb({
message:function(cName, content, cb){
getIndex(cName, function(index){
index+=1;
dht.put(cName+'=>index', ''+index,function(e){
e && console.error("ERROR updating index (%s): %s",index,e) ||
console.log("PUT SUCCESS: %s", cName+'=>index')
});
dht.put(cName+'=>'+index, content, function(e){
e && console.error("ERROR updating value at %s: %s",cName+'=>'+index,e)||
console.log("PUT SUCCESS: %s", cName+'=>index')
cb();
});
});
},
getMessages: function(cName, cb){
getIndex(cName, function(index){
for(var i=index;i>=0;i--){
dht.get(cName+'=>'+i,function(e,out){
if(e) return console.error("DHT GET ERROR: %s",e);
console.log("GET SUCCESS: %s", cName+'=>index')
cb(out);
});
}
});
},
});
};

View File

@ -1,73 +0,0 @@
var Level = require("level");
var nThen = require('nthen');
var getIndex = function(db, cName, cb) {
db.get(cName+'=>index', function(e, out){
if (e) {
if (e.notFound) {
cb(-1);
} else {
throw e;
}
return;
}
cb(parseInt(out));
});
};
var insert = function (db, channelName, content, cb) {
var index;
var doIt = function () {
db.locked = true;
nThen(function (waitFor) {
getIndex(db, channelName, waitFor(function (i) { index = i+1; }));
}).nThen(function (waitFor) {
db.put(channelName+'=>'+index, content, waitFor(function (e) { if (e) { throw e; } }));
}).nThen(function (waitFor) {
db.put(channelName+'=>index', ''+index, waitFor(function (e) { if (e) { throw e; } }));
}).nThen(function (waitFor) {
db.locked = false;
if (!db.queue.length) { return; }
db.queue.shift()();
}).nThen(cb);
};
if (db.locked) {
db.queue.push(doIt);
} else {
doIt();
}
};
var getMessages = function (db, channelName, msgHandler, cb) {
var index;
nThen(function (waitFor) {
getIndex(db, channelName, waitFor(function (i) {
index = i;
}));
}).nThen(function (waitFor) {
var again = function (i) {
db.get(channelName + '=>' + i, waitFor(function (e, out) {
if (e) { throw e; }
msgHandler(out);
if (i < index) { again(i+1); }
else if (cb) { cb(); }
}));
};
if (index > -1) { again(0); }
else if (cb) { cb(); }
});
};
module.exports.create = function (conf, cb) {
var db = Level(conf.levelPath || './test.level.db');
db.locked = false;
db.queue = [];
cb({
message: function (channelName, content, cb) {
insert(db, channelName, content, cb);
},
getMessages: function (channelName, msgHandler, cb) {
getMessages(db, channelName, msgHandler, cb);
}
});
};

View File

@ -1,61 +0,0 @@
/*
* Copyright 2014 XWiki SAS
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
var MongoClient = require('mongodb').MongoClient;
var MONGO_URI = "mongodb://demo_user:demo_password@ds027769.mongolab.com:27769/demo_database";
var COLLECTION_NAME = 'cryptpad';
var insert = function (coll, channelName, content, cb) {
var val = {chan: channelName, msg:content, time: (new Date()).getTime()};
coll.insertOne(val, {}, function (err, r) {
console.log(r);
if (err || (r.insertedCount !== 1)) {
console.log('failed to insert ' + err);
return;
}
cb();
});
};
var getMessages = function (coll, channelName, cb) {
// find entries with a matching channelname
coll.find({chan:channelName})
// sort by _id, ascending
.sort( { _id: 1 } )
// iterate over entries
.forEach(function (doc) {
cb(doc.msg);
}, function (err) {
if (!err) { return; }
console.log('error ' + err);
});
};
module.exports.create = function (conf, cb) {
MongoClient.connect(conf.mongoUri, function(err, db) {
var coll = db.collection(conf.mongoCollectionName);
if (err) { throw err; }
cb({
message: function (channelName, content, cb) {
insert(coll, channelName, content, cb);
},
getMessages: function (channelName, msgHandler) {
getMessages(coll, channelName, msgHandler);
}
});
});
};

View File

@ -1,58 +0,0 @@
var Knex = require("knex");
var getMessages = function (knex, channel, msgHandler, cb) {
return knex('messages')
.where({
channel: channel,
})
.select('*')
.then(function (rows) {
rows.forEach(function (row) {
msgHandler(row.content);
});
cb();
})
.catch(function (e) {
console.error(e);
cb();
});
};
var insert = function (knex, channel, content, cb) {
knex.table('messages').insert({
channel: channel,
content: content,
})
.then(function () {
cb();
});
};
module.exports.create = function (conf, cb) {
var knex = Knex({
dialect: 'sqlite3',
connection: conf.dbConnection,
useNullAsDefault: true,
});
knex.schema.hasTable('messages').then(function (exists) {
if (exists) { return; }
return knex.schema.createTable('messages', function (table) {
table.increments('id');
table.string('content');
table.string('channel');
table.timestamps();
});
})
.then(function () {
cb({
message: function (channelName, content, cb) {
insert(knex, channelName, content, cb);
},
getMessages: function (channelName, msgHandler, cb) {
getMessages(knex, channelName, msgHandler, cb);
},
});
});
};