Merge branch 'soon' of github.com:xwiki-labs/cryptpad into soon
commit fba130745d
@@ -23,7 +23,7 @@ To update from 3.23.0 to 3.23.1:
 
 0. Read the 3.23.0 release notes carefully and apply all configuration changes if you haven't already done so.
 1. Stop your server
-2. Get the latest code with `git checkout 3.20.1`
+2. Get the latest code with `git checkout 3.23.1`
 3. Install the latest dependencies with `bower update` and `npm i`
 4. Restart your server
@@ -378,7 +378,10 @@ var makeWalker = function (n, handleChild, done) {
     nThen(function (w) {
         // check if the path is a directory...
         Fs.stat(path, w(function (err, stats) {
-            if (err) { return next(); }
+            if (err) {
+                w.abort();
+                return next();
+            }
             if (!stats.isDirectory()) {
                 w.abort();
                 return void handleChild(void 0, path, next);
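The extra `w.abort()` calls above rely on nThen's waitFor object: aborting it cancels the rest of the chain, presumably so the directory-listing step in the following `.nThen()` block no longer runs for a path that failed to stat or is not a directory, while `next()` still advances the walker. A minimal sketch of that control flow, assuming the usual `nthen` package API (`w()` wraps callbacks, `w.abort()` stops later blocks); `walkOne`, `handleChild`, `next`, and the readdir step are illustrative placeholders, not CryptPad's full walker:

// Sketch only: mirrors the pattern in the diff above.
const Fs = require('fs');
const nThen = require('nthen');

var walkOne = function (path, handleChild, next) {
    nThen(function (w) {
        // check if the path is a directory...
        Fs.stat(path, w(function (err, stats) {
            if (err) {
                w.abort();     // cancel the rest of the chain...
                return next(); // ...but keep the walker moving
            }
            if (!stats.isDirectory()) {
                w.abort();
                return void handleChild(void 0, path, next);
            }
        }));
    }).nThen(function () {
        // only reached when `path` exists and is a directory
        Fs.readdir(path, function (err, list) {
            if (err) { return next(); }
            console.log(path, 'contains', (list || []).length, 'entries');
            next();
        });
    });
};

// hypothetical usage
walkOne('./data', function (err, file, done) {
    console.log('leaf:', file);
    done();
}, function () { console.log('done'); });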
@@ -94,25 +94,31 @@ const destroyStream = function (stream) {
     }, STREAM_DESTROY_TIMEOUT);
 };
 
-/*
-    accept a stream, an id (used as a label) and an optional number of milliseconds
+/* createIdleStreamCollector
 
-    return a function which ignores all arguments
-    and first tries to gracefully close a stream
-    then destroys it after a period if the close was not successful
-
-    if the function is not called within the specified number of milliseconds
-    then it will be called implicitly with an error to indicate
-    that it was run because it timed out
+    Takes a stream and returns a function to asynchronously close that stream.
+    Successive calls to the function will be ignored.
+
+    If the function is not called for a period of STREAM_CLOSE_TIMEOUT it will
+    be called automatically unless its `keepAlive` method has been invoked
+    in the meantime. Used to prevent file descriptor leaks in the case of
+    abandoned streams while closing streams which are being read very very
+    slowly.
+
+    XXX inform the stream consumer when it has been closed prematurely
+    by calling back with a TIMEOUT error or something
+
 */
-const ensureStreamCloses = function (stream, id, ms) {
-    return Util.bake(Util.mkTimeout(Util.once(function (err) {
-        destroyStream(stream);
-        if (err) {
-            // this can only be a timeout error...
-            console.log("stream close error:", err, id);
-        }
-    }), ms || STREAM_CLOSE_TIMEOUT), []);
+const createIdleStreamCollector = function (stream) {
+    // create a function to close the stream which takes no arguments
+    // and will do nothing after being called the first time
+    var collector = Util.once(Util.mkAsync(Util.bake(destroyStream, [stream])));
+
+    // create a second function which will execute the first function after a delay
+    // calling this function will reset the delay and thus keep the stream 'alive'
+    collector.keepAlive = Util.throttle(collector, STREAM_CLOSE_TIMEOUT);
+    collector.keepAlive();
+    return collector;
 };
 
 // readMessagesBin asynchronously iterates over the messages in a channel log
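The new collector is assembled entirely from small helpers in CryptPad's `Util` module (`once`, `mkAsync`, `bake`, `throttle`), all visible in the diff above. The sketch below uses simplified stand-ins to make their assumed semantics explicit; the `throttle` stand-in behaves like a trailing debounce, matching the comment that each `keepAlive()` call "resets the delay". The timings, the simplified `destroyStream`, and the fake stream are illustrative only.

// Stand-ins approximating the Util helpers used above (assumed semantics).
var once = function (f) {               // run f at most once, ignore later calls
    var called = false;
    return function () {
        if (called) { return; }
        called = true;
        return f.apply(null, arguments);
    };
};
var mkAsync = function (f) {            // defer f to a later tick
    return function () {
        var args = Array.prototype.slice.call(arguments);
        setTimeout(function () { f.apply(null, args); });
    };
};
var bake = function (f, args) {         // pre-bind arguments, ignore the caller's
    return function () { return f.apply(null, args); };
};
var throttle = function (f, ms) {       // trailing debounce: every call resets the timer
    var timer;
    return function () {
        clearTimeout(timer);
        timer = setTimeout(f, ms);
    };
};

// Simplified destroyStream; the real one also force-destroys after a timeout.
var destroyStream = function (stream) {
    try { stream.close(); } catch (err) { console.error(err); }
};

var STREAM_CLOSE_TIMEOUT = 2000;        // illustrative value

var createIdleStreamCollector = function (stream) {
    var collector = once(mkAsync(bake(destroyStream, [stream])));
    collector.keepAlive = throttle(collector, STREAM_CLOSE_TIMEOUT);
    collector.keepAlive();
    return collector;
};

// Demo: keepAlive() pushes the automatic close back; silence lets it fire.
var fakeStream = { close: function () { console.log('stream closed'); } };
var collector = createIdleStreamCollector(fakeStream);
setTimeout(collector.keepAlive, 1000);  // still busy at 1s: timer resets
// with no further activity, 'stream closed' prints roughly 3 seconds in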
@@ -122,25 +128,22 @@ const ensureStreamCloses = function (stream, id, ms) {
 // it also allows the handler to abort reading at any time
 const readMessagesBin = (env, id, start, msgHandler, cb) => {
     const stream = Fs.createReadStream(mkPath(env, id), { start: start });
-    const finish = ensureStreamCloses(stream, '[readMessagesBin:' + id + ']');
-    return void readFileBin(stream, msgHandler, function (err) {
-        cb(err);
-        finish();
-    });
+    const collector = createIdleStreamCollector(stream);
+    const handleMessageAndKeepStreamAlive = Util.both(msgHandler, collector.keepAlive);
+    const done = Util.both(cb, collector);
+    return void readFileBin(stream, handleMessageAndKeepStreamAlive, done);
 };
 
 // reads classic metadata from a channel log and aborts
 // returns undefined if the first message was not an object (not an array)
 var getMetadataAtPath = function (Env, path, _cb) {
     const stream = Fs.createReadStream(path, { start: 0 });
-    const finish = ensureStreamCloses(stream, '[getMetadataAtPath:' + path + ']');
-    var cb = Util.once(Util.mkAsync(Util.both(_cb, finish)), function () {
-        throw new Error("Multiple Callbacks");
-    });
-
+    const collector = createIdleStreamCollector(stream);
+    var cb = Util.once(Util.mkAsync(Util.both(_cb, collector)));
     var i = 0;
 
     return readFileBin(stream, function (msgObj, readMore, abort) {
+        collector.keepAlive();
        const line = msgObj.buff.toString('utf8');
 
        if (!line) {
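Both rewritten readers follow the same contract: the message handler and `collector.keepAlive` are combined with `Util.both`, so every message read resets the idle timer, and the final callback doubles as the collector, scheduling the stream's close once the caller has been notified. A hedged sketch of a consumer of `readMessagesBin` under that contract; the `msgObj.buff`/`readMore` handler shape matches the surrounding code, while `countMessages`, the channel id, and `env` are illustrative:

// Hypothetical consumer of readMessagesBin from the storage module above.
var countMessages = function (env, channelId, cb) {
    var count = 0;
    readMessagesBin(env, channelId, 0 /* byte offset */, function (msgObj, readMore) {
        // each handled message also triggers collector.keepAlive via Util.both
        if (msgObj.buff.toString('utf8').trim()) { count++; }
        readMore();
    }, function (err) {
        // by now the collector has been invoked and the stream will be closed
        cb(err, count);
    });
};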
@@ -149,7 +152,7 @@ var getMetadataAtPath = function (Env, path, _cb) {
 
         // metadata should always be on the first line or not exist in the channel at all
         if (i++ > 0) {
-            console.log("aborting");
+            //console.log("aborting");
             abort();
             return void cb();
         }
@@ -219,10 +222,11 @@ var clearChannel = function (env, channelId, _cb) {
 */
 var readMessages = function (path, msgHandler, _cb) {
     var stream = Fs.createReadStream(path, { start: 0});
-    const finish = ensureStreamCloses(stream, '[readMessages:' + path + ']');
-    var cb = Util.once(Util.mkAsync(Util.both(finish, _cb)));
+    var collector = createIdleStreamCollector(stream);
+    var cb = Util.once(Util.mkAsync(Util.both(_cb, collector)));
 
     return readFileBin(stream, function (msgObj, readMore) {
+        collector.keepAlive();
         msgHandler(msgObj.buff.toString('utf8'));
         readMore();
     }, function (err) {
@@ -247,10 +251,11 @@ var getDedicatedMetadata = function (env, channelId, handler, _cb) {
     var metadataPath = mkMetadataPath(env, channelId);
     var stream = Fs.createReadStream(metadataPath, {start: 0});
 
-    const finish = ensureStreamCloses(stream, '[getDedicatedMetadata:' + metadataPath + ']');
-    var cb = Util.both(finish, _cb);
+    const collector = createIdleStreamCollector(stream);
+    var cb = Util.both(_cb, collector);
 
     readFileBin(stream, function (msgObj, readMore) {
+        collector.keepAlive();
         var line = msgObj.buff.toString('utf8');
         try {
             var parsed = JSON.parse(line);
@@ -758,11 +763,11 @@ var getChannel = function (env, id, _callback) {
             });
         });
     }).nThen(function () {
-        channel.delayClose = Util.throttle(function () {
+        channel.delayClose = Util.throttle(Util.once(function () {
             delete env.channels[id];
             destroyStream(channel.writeStream, path);
             //console.log("closing writestream");
-        }, CHANNEL_WRITE_WINDOW);
+        }), CHANNEL_WRITE_WINDOW);
         channel.delayClose();
         env.channels[id] = channel;
         done(void 0, channel);
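Wrapping the throttled body in `Util.once` means the channel teardown can run at most once, even if `delayClose` fires again after the channel has already been removed from `env.channels`. A self-contained sketch of the pattern with simplified `once`/`throttle` stand-ins; the debounce-style `throttle`, the timings, and the `channel`/`env` objects are assumptions for illustration:

var once = function (f) {
    var done = false;
    return function () { if (!done) { done = true; f.apply(null, arguments); } };
};
var throttle = function (f, ms) {   // trailing debounce: each call resets the timer
    var timer;
    return function () { clearTimeout(timer); timer = setTimeout(f, ms); };
};

var CHANNEL_WRITE_WINDOW = 1000;    // illustrative value
var env = { channels: {} };
var channel = { id: 'abc', writeStream: { destroy: function () {} } };
env.channels[channel.id] = channel;

// As in the diff: once() guarantees the body runs a single time, no matter
// how many times the throttled delayClose eventually fires.
channel.delayClose = throttle(once(function () {
    delete env.channels[channel.id];
    channel.writeStream.destroy();
    console.log('write stream closed exactly once');
}), CHANNEL_WRITE_WINDOW);

channel.delayClose();   // schedule the close
channel.delayClose();   // rescheduling is fine; the body still runs at most once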
@@ -82,6 +82,13 @@ Workers.initialize = function (Env, config, _cb) {
 
     var drained = true;
     var sendCommand = function (msg, _cb, opt) {
+        if (!_cb) {
+            return void Log.error('WORKER_COMMAND_MISSING_CB', {
+                msg: msg,
+                opt: opt,
+            });
+        }
+
         opt = opt || {};
         var index = getAvailableWorkerIndex();
 
@@ -95,7 +102,7 @@ Workers.initialize = function (Env, config, _cb) {
             });
             if (drained) {
                 drained = false;
-                Log.debug('WORKER_QUEUE_BACKLOG', {
+                Log.error('WORKER_QUEUE_BACKLOG', {
                     workers: workers.length,
                 });
             }
@@ -104,12 +111,6 @@ Workers.initialize = function (Env, config, _cb) {
         }
 
         const txid = guid();
-        msg.txid = txid;
-        msg.pid = PID;
-
-        // track which worker is doing which jobs
-        state.tasks[txid] = msg;
-
         var cb = Util.once(Util.mkAsync(Util.both(_cb, function (err /*, value */) {
             if (err !== 'TIMEOUT') { return; }
             // in the event of a timeout the user will receive an error
@@ -120,6 +121,16 @@ Workers.initialize = function (Env, config, _cb) {
             delete state.tasks[txid];
         })));
 
+        if (!msg) {
+            return void cb('ESERVERERR');
+        }
+
+        msg.txid = txid;
+        msg.pid = PID;
+
+        // track which worker is doing which jobs
+        state.tasks[txid] = msg;
+
         // default to timing out after 180s if no explicit timeout is passed
         var timeout = typeof(opt.timeout) !== 'undefined'? opt.timeout: 180000;
         response.expect(txid, cb, timeout);
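Taken together, the worker changes reorder `sendCommand` so that the callback wrapper exists before the message is inspected: a missing callback is rejected up front, a missing message can be reported back through `cb('ESERVERERR')`, and only then is the message stamped with `txid`/`pid`, tracked, and armed with a timeout. A condensed sketch of that ordering; `guid`, `response.expect`, `Log`, and the worker dispatch are stand-ins for the real module internals, and the `Util.once`/`Util.mkAsync` wrapping from the diff is abbreviated:

// Condensed sketch of the hardened sendCommand ordering (not the full module).
var guid = function () { return Math.random().toString(16).slice(2); };
var state = { tasks: {} };
var response = { expect: function (txid, cb, timeout) { /* arm a per-txid timeout */ } };
var Log = { error: function (tag, info) { console.error(tag, info); } };
var PID = process.pid;

var sendCommand = function (msg, _cb, opt) {
    if (!_cb) {
        // refuse to queue work that nobody is waiting for
        return void Log.error('WORKER_COMMAND_MISSING_CB', { msg: msg, opt: opt });
    }
    opt = opt || {};

    var txid = guid();
    var called = false;
    var cb = function (err, value) {    // abbreviated: the real cb is once/mkAsync-wrapped
        if (called) { return; }
        called = true;
        // the real wrapper only clears the task table on a TIMEOUT error
        if (err === 'TIMEOUT') { delete state.tasks[txid]; }
        _cb(err, value);
    };

    // the callback exists before the message is touched, so errors are reportable
    if (!msg) { return void cb('ESERVERERR'); }

    msg.txid = txid;
    msg.pid = PID;
    state.tasks[txid] = msg;            // track which worker is doing which jobs

    // default to timing out after 180s if no explicit timeout is passed
    var timeout = typeof(opt.timeout) !== 'undefined' ? opt.timeout : 180000;
    response.expect(txid, cb, timeout);
    // ...hand msg to an available worker, or queue it until one drains...
};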
@@ -153,6 +164,13 @@ Workers.initialize = function (Env, config, _cb) {
         }
 
         var nextMsg = queue.shift();
+
+        if (!nextMsg || !nextMsg.msg) {
+            return void Log.error('WORKER_QUEUE_EMPTY_MESSAGE', {
+                item: nextMsg,
+            });
+        }
+
         /* `nextMsg` was at the top of the queue.
            We know that a job just finished and all of this code
            is synchronous, so calling `sendCommand` should take the worker
@@ -200,7 +218,7 @@ Workers.initialize = function (Env, config, _cb) {
             const cb = response.expectation(txid);
             if (typeof(cb) !== 'function') { return; }
             const task = state.tasks[txid];
-            if (!task && task.msg) { return; }
+            if (!(task && task.msg)) { return; }
            response.clear(txid);
            Log.info('DB_WORKER_RESEND', task.msg);
            sendCommand(task.msg, cb);
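The guard rewritten above fixes a negation bug: in `!task && task.msg`, the `task.msg` check is only reached when `task` is falsy, so the old expression threw a TypeError when the task was missing and failed to return when a task existed without a `msg`. The corrected `!(task && task.msg)` returns whenever either the task or its message is absent. A tiny illustration (the task values are made up):

var shouldResend = function (task) {
    return !!(task && task.msg);            // the corrected guard, inverted
};
console.log(shouldResend(undefined));            // false (the old guard threw here)
console.log(shouldResend({}));                   // false (the old guard fell through here)
console.log(shouldResend({ msg: { cmd: 'x' } })); // true: safe to resend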
@@ -190,7 +190,6 @@ define([
         var intr;
         var check = function() {
             if (window.CKEDITOR) {
-                window.CKEDITOR.timestamp = 123456; // XXX cache-busting string for CkEditor
                 clearTimeout(intr);
                 cb(window.CKEDITOR);
             }