Merge branch 'staging' of github.com:xwiki-labs/cryptpad into staging

pull/1/head
yflory 7 years ago
commit a34e227d37

@ -3,10 +3,12 @@
"description": "realtime collaborative visual editor with zero knowlege server", "description": "realtime collaborative visual editor with zero knowlege server",
"version": "1.25.0", "version": "1.25.0",
"dependencies": { "dependencies": {
"chainpad-server": "^1.0.1", "chainpad-server": "^2.0.0",
"express": "~4.10.1", "express": "~4.10.1",
"nthen": "~0.1.0", "nthen": "~0.1.0",
"pull-stream": "^3.6.1",
"saferphore": "0.0.1", "saferphore": "0.0.1",
"stream-to-pull-stream": "^1.7.2",
"tweetnacl": "~0.12.2", "tweetnacl": "~0.12.2",
"ws": "^1.0.1" "ws": "^1.0.1"
}, },

@ -1,6 +1,11 @@
/*@flow*/
/* jshint esversion: 6 */
/* global Buffer */
var Fs = require("fs"); var Fs = require("fs");
var Path = require("path"); var Path = require("path");
var nThen = require("nthen"); var nThen = require("nthen");
const ToPull = require('stream-to-pull-stream');
const Pull = require('pull-stream');
var mkPath = function (env, channelId) { var mkPath = function (env, channelId) {
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson'; return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
@ -8,7 +13,7 @@ var mkPath = function (env, channelId) {
var getMetadataAtPath = function (Env, path, cb) { var getMetadataAtPath = function (Env, path, cb) {
var remainder = ''; var remainder = '';
var stream = Fs.createReadStream(path, 'utf8'); var stream = Fs.createReadStream(path, { encoding: 'utf8' });
var complete = function (err, data) { var complete = function (err, data) {
var _cb = cb; var _cb = cb;
cb = undefined; cb = undefined;
@ -25,16 +30,16 @@ var getMetadataAtPath = function (Env, path, cb) {
var parsed = null; var parsed = null;
try { try {
parsed = JSON.parse(metadata); parsed = JSON.parse(metadata);
complete(void 0, parsed); complete(undefined, parsed);
} }
catch (e) { catch (e) {
console.log(); console.log("getMetadataAtPath");
console.error(e); console.error(e);
complete('INVALID_METADATA'); complete('INVALID_METADATA');
} }
}); });
stream.on('end', function () { stream.on('end', function () {
complete(null); complete();
}); });
stream.on('error', function (e) { complete(e); }); stream.on('error', function (e) { complete(e); });
}; };
@ -59,7 +64,7 @@ var closeChannel = function (env, channelName, cb) {
var clearChannel = function (env, channelId, cb) { var clearChannel = function (env, channelId, cb) {
var path = mkPath(env, channelId); var path = mkPath(env, channelId);
getMetadataAtPath(env, path, function (e, metadata) { getMetadataAtPath(env, path, function (e, metadata) {
if (e) { return cb(e); } if (e) { return cb(new Error(e)); }
if (!metadata) { if (!metadata) {
return void Fs.truncate(path, 0, function (err) { return void Fs.truncate(path, 0, function (err) {
if (err) { if (err) {
@ -87,7 +92,7 @@ var clearChannel = function (env, channelId, cb) {
var readMessages = function (path, msgHandler, cb) { var readMessages = function (path, msgHandler, cb) {
var remainder = ''; var remainder = '';
var stream = Fs.createReadStream(path, 'utf8'); var stream = Fs.createReadStream(path, { encoding: 'utf8' });
var complete = function (err) { var complete = function (err) {
var _cb = cb; var _cb = cb;
cb = undefined; cb = undefined;
@ -106,6 +111,60 @@ var readMessages = function (path, msgHandler, cb) {
stream.on('error', function (e) { complete(e); }); stream.on('error', function (e) { complete(e); });
}; };
const NEWLINE_CHR = ('\n').charCodeAt(0);
const mkBufferSplit = () => {
let remainder = null;
return Pull((read) => {
return (abort, cb) => {
read(abort, function (end, data) {
if (end) {
cb(end, remainder ? [remainder, data] : [data]);
remainder = null;
return;
}
const queue = [];
for (;;) {
const offset = data.indexOf(NEWLINE_CHR);
if (offset < 0) {
remainder = remainder ? Buffer.concat([remainder, data]) : data;
break;
}
let subArray = data.slice(0, offset);
if (remainder) {
subArray = Buffer.concat([remainder, subArray]);
remainder = null;
}
queue.push(subArray);
data = data.slice(offset + 1);
}
cb(end, queue);
});
};
}, Pull.flatten());
};
const mkOffsetCounter = () => {
let offset = 0;
return Pull.map((buff) => {
const out = { offset: offset, buff: buff };
// +1 for the eaten newline
offset += buff.length + 1;
return out;
});
};
const readMessagesBin = (env, id, start, msgHandler, cb) => {
const stream = Fs.createReadStream(mkPath(env, id), { start: start });
let keepReading = true;
Pull(
ToPull.read(stream),
mkBufferSplit(),
mkOffsetCounter(),
Pull.asyncMap((data, moreCb) => { msgHandler(data, moreCb, ()=>{ keepReading = false; moreCb(); }); }),
Pull.drain(()=>(keepReading), cb)
);
};
var checkPath = function (path, callback) { var checkPath = function (path, callback) {
// TODO check if we actually need to use stat at all // TODO check if we actually need to use stat at all
Fs.stat(path, function (err) { Fs.stat(path, function (err) {
@ -117,7 +176,8 @@ var checkPath = function (path, callback) {
callback(err); callback(err);
return; return;
} }
Fs.mkdir(Path.dirname(path), function (err) { // 511 -> octal 777
Fs.mkdir(Path.dirname(path), 511, function (err) {
if (err && err.code !== 'EEXIST') { if (err && err.code !== 'EEXIST') {
callback(err); callback(err);
return; return;
@ -154,7 +214,28 @@ var flushUnusedChannels = function (env, cb, frame) {
cb(); cb();
}; };
var getChannel = function (env, id, callback) { var channelBytes = function (env, chanName, cb) {
var path = mkPath(env, chanName);
Fs.stat(path, function (err, stats) {
if (err) { return void cb(err); }
cb(undefined, stats.size);
});
};
/*::
export type ChainPadServer_ChannelInternal_t = {
atime: number,
writeStream: typeof(process.stdout),
whenLoaded: ?Array<(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void>,
onError: Array<(?Error)=>void>,
path: string
};
*/
var getChannel = function (
env,
id,
callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/
) {
if (env.channels[id]) { if (env.channels[id]) {
var chan = env.channels[id]; var chan = env.channels[id];
chan.atime = +new Date(); chan.atime = +new Date();
@ -178,9 +259,9 @@ var getChannel = function (env, id, callback) {
}); });
} }
var path = mkPath(env, id); var path = mkPath(env, id);
var channel = env.channels[id] = { var channel /*:ChainPadServer_ChannelInternal_t*/ = env.channels[id] = {
atime: +new Date(), atime: +new Date(),
writeStream: undefined, writeStream: (undefined /*:any*/),
whenLoaded: [ callback ], whenLoaded: [ callback ],
onError: [ ], onError: [ ],
path: path path: path
@ -193,6 +274,9 @@ var getChannel = function (env, id, callback) {
if (err) { if (err) {
delete env.channels[id]; delete env.channels[id];
} }
if (!channel.writeStream) {
throw new Error("getChannel() complete called without channel writeStream");
}
whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); }); whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); });
}; };
var fileExists; var fileExists;
@ -211,7 +295,7 @@ var getChannel = function (env, id, callback) {
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
env.openFiles++; env.openFiles++;
stream.on('open', waitFor()); stream.on('open', waitFor());
stream.on('error', function (err) { stream.on('error', function (err /*:?Error*/) {
env.openFiles--; env.openFiles--;
// this might be called after this nThen block closes. // this might be called after this nThen block closes.
if (channel.whenLoaded) { if (channel.whenLoaded) {
@ -228,20 +312,22 @@ var getChannel = function (env, id, callback) {
}); });
}; };
var message = function (env, chanName, msg, cb) { const messageBin = (env, chanName, msgBin, cb) => {
getChannel(env, chanName, function (err, chan) { getChannel(env, chanName, function (err, chan) {
if (err) { if (!chan) {
cb(err); cb(err);
return; return;
} }
let called = false;
var complete = function (err) { var complete = function (err) {
var _cb = cb; if (called) { return; }
cb = undefined; called = true;
if (_cb) { _cb(err); } cb(err);
}; };
chan.onError.push(complete); chan.onError.push(complete);
chan.writeStream.write(msg + '\n', function () { chan.writeStream.write(msgBin, function () {
chan.onError.splice(chan.onError.indexOf(complete) - 1, 1); /*::if (!chan) { throw new Error("Flow unreachable"); }*/
chan.onError.splice(chan.onError.indexOf(complete), 1);
if (!cb) { return; } if (!cb) { return; }
//chan.messages.push(msg); //chan.messages.push(msg);
chan.atime = +new Date(); chan.atime = +new Date();
@ -250,9 +336,13 @@ var message = function (env, chanName, msg, cb) {
}); });
}; };
var message = function (env, chanName, msg, cb) {
messageBin(env, chanName, new Buffer(msg + '\n', 'utf8'), cb);
};
var getMessages = function (env, chanName, handler, cb) { var getMessages = function (env, chanName, handler, cb) {
getChannel(env, chanName, function (err, chan) { getChannel(env, chanName, function (err, chan) {
if (err) { if (!chan) {
cb(err); cb(err);
return; return;
} }
@ -271,21 +361,39 @@ var getMessages = function (env, chanName, handler, cb) {
errorState = true; errorState = true;
return void cb(err); return void cb(err);
} }
if (!chan) { throw new Error("impossible, flow checking"); }
chan.atime = +new Date(); chan.atime = +new Date();
cb(); cb();
}); });
}); });
}; };
var channelBytes = function (env, chanName, cb) { /*::
var path = mkPath(env, chanName); export type ChainPadServer_MessageObj_t = { buff: Buffer, offset: number };
Fs.stat(path, function (err, stats) { export type ChainPadServer_Storage_t = {
if (err) { return void cb(err); } readMessagesBin: (
cb(void 0, stats.size); channelName:string,
}); start:number,
asyncMsgHandler:(msg:ChainPadServer_MessageObj_t, moreCb:()=>void, abortCb:()=>void)=>void,
cb:(err:?Error)=>void
)=>void,
message: (channelName:string, content:string, cb:(err:?Error)=>void)=>void,
messageBin: (channelName:string, content:Buffer, cb:(err:?Error)=>void)=>void,
getMessages: (channelName:string, msgHandler:(msg:string)=>void, cb:(err:?Error)=>void)=>void,
removeChannel: (channelName:string, cb:(err:?Error)=>void)=>void,
closeChannel: (channelName:string, cb:(err:?Error)=>void)=>void,
flushUnusedChannels: (cb:()=>void)=>void,
getChannelSize: (channelName:string, cb:(err:?Error, size:?number)=>void)=>void,
getChannelMetadata: (channelName:string, cb:(err:?Error|string, data:?any)=>void)=>void,
clearChannel: (channelName:string, (err:?Error)=>void)=>void
}; };
const flow_Config = require('../config.example.js');
module.exports.create = function (conf, cb) { type Config_t = typeof(flow_Config);
*/
module.exports.create = function (
conf /*:Config_t*/,
cb /*:(store:ChainPadServer_Storage_t)=>void*/
) {
var env = { var env = {
root: conf.filePath || './datastore', root: conf.filePath || './datastore',
channels: { }, channels: { },
@ -294,15 +402,22 @@ module.exports.create = function (conf, cb) {
openFiles: 0, openFiles: 0,
openFileLimit: conf.openFileLimit || 2048, openFileLimit: conf.openFileLimit || 2048,
}; };
Fs.mkdir(env.root, function (err) { // 0x1ff -> 777
Fs.mkdir(env.root, 0x1ff, function (err) {
if (err && err.code !== 'EEXIST') { if (err && err.code !== 'EEXIST') {
// TODO: somehow return a nice error // TODO: somehow return a nice error
throw err; throw err;
} }
cb({ cb({
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
},
message: function (channelName, content, cb) { message: function (channelName, content, cb) {
message(env, channelName, content, cb); message(env, channelName, content, cb);
}, },
messageBin: (channelName, content, cb) => {
messageBin(env, channelName, content, cb);
},
getMessages: function (channelName, msgHandler, cb) { getMessages: function (channelName, msgHandler, cb) {
getMessages(env, channelName, msgHandler, cb); getMessages(env, channelName, msgHandler, cb);
}, },

Loading…
Cancel
Save