diff --git a/config/config.example.js b/config/config.example.js
index 273c196d2..3826a7291 100644
--- a/config/config.example.js
+++ b/config/config.example.js
@@ -89,6 +89,14 @@ module.exports = {
      */
     //httpSafePort: 3001,
 
+    /* CryptPad will launch a child process for every core available
+     * in order to perform CPU-intensive tasks in parallel.
+     * Some host environments may have a very large number of cores available
+     * or you may want to limit how much computing power CryptPad can take.
+     * If so, set 'maxWorkers' to a positive integer.
+     */
+    // maxWorkers: 4,
+
     /* =====================
      *        Admin
      * ===================== */
diff --git a/customize.dist/ckeditor-config.js b/customize.dist/ckeditor-config.js
index c55f5fe99..293c4b600 100644
--- a/customize.dist/ckeditor-config.js
+++ b/customize.dist/ckeditor-config.js
@@ -11,6 +11,8 @@ CKEDITOR.editorConfig = function( config ) {
     config.removePlugins= 'resize,elementspath';
     config.resize_enabled= false; //bottom-bar
     config.extraPlugins= 'autolink,colorbutton,colordialog,font,indentblock,justify,mediatag,print,blockbase64,mathjax,wordcount,comments';
+    // FIXME translation for default? updating to a newer CKEditor seems like it will add 'default' by default
+    config.fontSize_sizes = '(Default)/unset;8/8px;9/9px;10/10px;11/11px;12/12px;14/14px;16/16px;18/18px;20/20px;22/22px;24/24px;26/26px;28/28px;36/36px;48/48px;72/72px';
     config.toolbarGroups= [
     // {"name":"clipboard","groups":["clipboard","undo"]},
     //{"name":"editing","groups":["find","selection"]},
diff --git a/customize.dist/pages.js b/customize.dist/pages.js
index 63cbbf154..e271dd74b 100644
--- a/customize.dist/pages.js
+++ b/customize.dist/pages.js
@@ -107,7 +107,7 @@ define([
             ])*/
         ])
     ]),
-    h('div.cp-version-footer', "CryptPad v3.15.0 (PigFootedBandicoot)")
+    h('div.cp-version-footer', "CryptPad v3.16.0 (Quagga)")
     ]);
 };
 
diff --git a/customize.dist/src/less2/include/toolbar.less b/customize.dist/src/less2/include/toolbar.less
index 49654dc56..73e608c4a 100644
--- a/customize.dist/src/less2/include/toolbar.less
+++ b/customize.dist/src/less2/include/toolbar.less
@@ -1179,6 +1179,7 @@
             &.fa-download { order: 2; }
             &.fa-upload { order: 3; }
             &.fa-print { order: 4; }
+            &.fa-arrows-h { order: 5; }
             &.fa-cog { order: 5; }
             &.fa-info-circle { order: 6; }
             &.fa-help { order: 7; }
diff --git a/docs/example.nginx.conf b/docs/example.nginx.conf
index ea8224c14..f6e163910 100644
--- a/docs/example.nginx.conf
+++ b/docs/example.nginx.conf
@@ -54,6 +54,7 @@ server {
     add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
     add_header X-XSS-Protection "1; mode=block";
     add_header X-Content-Type-Options nosniff;
+    add_header Access-Control-Allow-Origin "*";
     # add_header X-Frame-Options "SAMEORIGIN";
 
     # Insert the path to your CryptPad repository root here
diff --git a/lib/commands/admin-rpc.js b/lib/commands/admin-rpc.js
index ed1d5b8a4..c60cdcfbc 100644
--- a/lib/commands/admin-rpc.js
+++ b/lib/commands/admin-rpc.js
@@ -1,4 +1,5 @@
 /*jshint esversion: 6 */
+/* globals process */
 const nThen = require("nthen");
 const getFolderSize = require("get-folder-size");
 const Util = require("../common-util");
@@ -50,6 +51,7 @@ var getCacheStats = function (env, server, cb) {
         metaSize: metaSize,
         channel: channelCount,
         channelSize: channelSize,
+        memoryUsage: process.memoryUsage(),
     });
 };
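A note on the admin-rpc change above: the new memoryUsage field simply forwards Node's process.memoryUsage(), so anything consuming these cache stats can expect byte counts roughly shaped like the sketch below (field availability varies by Node version; newer releases add arrayBuffers, for instance).

    // Illustrative only: the shape process.memoryUsage() reports, in bytes.
    const usage = process.memoryUsage();
    console.log({
        rss: usage.rss,             // resident set size of the whole process
        heapTotal: usage.heapTotal, // memory allocated for the V8 heap
        heapUsed: usage.heapUsed,   // V8 heap actually in use
        external: usage.external,   // C++ objects bound to JS, e.g. Buffers
    });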
diff --git a/lib/commands/channel.js b/lib/commands/channel.js
index d35e15241..fce5b048c 100644
--- a/lib/commands/channel.js
+++ b/lib/commands/channel.js
@@ -54,16 +54,8 @@ Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
     });
 };
 
-Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
-    if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) {
-        return cb('INVALID_ARGUMENTS');
-    }
+var archiveOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
     var unsafeKey = Util.unescapeKeyCharacters(safeKey);
-
-    if (Env.blobStore.isFileId(channelId)) {
-        return void Env.removeOwnedBlob(channelId, safeKey, cb);
-    }
-
     Metadata.getMetadata(Env, channelId, function (err, metadata) {
         if (err) { return void cb(err); }
         if (!Core.hasOwners(metadata)) { return void cb('E_NO_OWNERS'); }
@@ -124,6 +116,24 @@ Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
     });
 };
 
+Channel.removeOwnedChannel = function (Env, safeKey, channelId, __cb, Server) {
+    var _cb = Util.once(Util.mkAsync(__cb));
+
+    if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) {
+        return _cb('INVALID_ARGUMENTS');
+    }
+
+    // archiving large channels or files can be expensive, so do it one at a time
+    // for any given user to ensure that nobody can use too much of the server's resources
+    Env.queueDeletes(safeKey, function (next) {
+        var cb = Util.both(_cb, next);
+        if (Env.blobStore.isFileId(channelId)) {
+            return void Env.removeOwnedBlob(channelId, safeKey, cb);
+        }
+        archiveOwnedChannel(Env, safeKey, channelId, cb, Server);
+    });
+};
+
 Channel.trimHistory = function (Env, safeKey, data, cb) {
     if (!(data && typeof(data.channel) === 'string' && typeof(data.hash) === 'string' && data.hash.length === 64)) {
         return void cb('INVALID_ARGS');
diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js
index f3ade0489..a6f1642c6 100644
--- a/lib/commands/pin-rpc.js
+++ b/lib/commands/pin-rpc.js
@@ -49,16 +49,19 @@ var loadUserPins = function (Env, safeKey, cb) {
                 // only put this into the cache if it completes
                 session.channels = value;
             }
-            session.channels = value;
             done(value);
         });
     });
 };
 
 var truthyKeys = function (O) {
-    return Object.keys(O).filter(function (k) {
-        return O[k];
-    });
+    try {
+        return Object.keys(O).filter(function (k) {
+            return O[k];
+        });
+    } catch (err) {
+        return [];
+    }
 };
 
 var getChannelList = Pinning.getChannelList = function (Env, safeKey, _cb) {
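The Env.queueDeletes used above is a second WriteQueue instance (added in lib/historyKeeper.js below). Its implementation is not part of this diff, but removeOwnedChannel relies on a contract along these lines: jobs queued under the same key run strictly one at a time, and each job receives a next callback that releases the queue. A minimal sketch of that assumed behaviour, not CryptPad's actual lib/write-queue.js:

    // A per-key serial queue, illustrating the contract that
    // Env.queueDeletes(safeKey, job) is assumed to provide.
    var WriteQueue = function () {
        var queues = {}; // key -> array of pending jobs
        var run = function (key) {
            var job = queues[key][0];
            job(function next() {
                // the job calls next() when done: run the next job or clean up
                queues[key].shift();
                if (queues[key].length) { return run(key); }
                delete queues[key];
            });
        };
        return function (key, job) {
            if (queues[key]) { return void queues[key].push(job); } // busy: wait in line
            queues[key] = [job];
            run(key);
        };
    };

    // usage: deletions for one user run sequentially, different users in parallel
    var queueDeletes = WriteQueue();
    queueDeletes("user1", function (next) { /* archive channel A */ next(); });
    queueDeletes("user1", function (next) { /* archive channel B, after A */ next(); });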
diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js
index f70dba006..aa320f4b6 100644
--- a/lib/historyKeeper.js
+++ b/lib/historyKeeper.js
@@ -31,13 +31,13 @@ module.exports.create = function (config, cb) {
     // and more easily share state between historyKeeper and rpc
     const Env = {
         Log: Log,
-        // tasks
         // store
         id: Crypto.randomBytes(8).toString('hex'),
 
         metadata_cache: {},
         channel_cache: {},
         queueStorage: WriteQueue(),
+        queueDeletes: WriteQueue(),
 
         batchIndexReads: BatchRead("HK_GET_INDEX"),
         batchMetadata: BatchRead('GET_METADATA'),
@@ -98,7 +98,7 @@ module.exports.create = function (config, cb) {
         paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
         paths.blob = keyOrDefaultString('blobPath', './blob');
 
-        Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit > 0?
+        Env.defaultStorageLimit = typeof(config.defaultStorageLimit) === 'number' && config.defaultStorageLimit >= 0?
             config.defaultStorageLimit:
             Core.DEFAULT_LIMIT;
 
@@ -252,17 +252,14 @@ module.exports.create = function (config, cb) {
             channelExpirationMs: config.channelExpirationMs,
             verbose: config.verbose,
             openFileLimit: config.openFileLimit,
+
+            maxWorkers: config.maxWorkers,
         }, w(function (err) {
             if (err) {
                 throw new Error(err);
             }
         }));
-    }).nThen(function (w) {
-        // create a task store (for scheduling tasks)
-        require("./storage/tasks").create(config, w(function (e, tasks) {
-            if (e) { throw e; }
-            Env.tasks = tasks;
-        }));
+    }).nThen(function () {
         if (config.disableIntegratedTasks) { return; }
         config.intervals = config.intervals || {};
diff --git a/lib/hk-util.js b/lib/hk-util.js
index 0bb96a52b..3b8c07e09 100644
--- a/lib/hk-util.js
+++ b/lib/hk-util.js
@@ -529,7 +529,7 @@ const handleFirstMessage = function (Env, channelName, metadata) {
     if(metadata.expire && typeof(metadata.expire) === 'number') {
         // the fun part...
        // the user has said they want this pad to expire at some point
-        Env.tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) {
+        Env.writeTask(metadata.expire, "EXPIRE", [ channelName ], function (err) {
            if (err) {
                // if there is an error, we don't want to crash the whole server...
                // just log it, and if there's a problem you'll be able to fix it
@@ -621,7 +621,10 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
             Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(msg)], readMore);
         }, (err) => {
             if (err && err.code !== 'ENOENT') {
-                if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
+                if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", {
+                    err: err && err.message,
+                    stack: err && err.stack,
+                }); }
                 const parsedMsg = {error:err.message, channel: channelName, txid: txid};
                 Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
                 return;
@@ -662,30 +665,17 @@ const handleGetHistoryRange = function (Env, Server, seq, userId, parsed) {
     }
 
     Server.send(userId, [seq, 'ACK']);
-    Env.getOlderHistory(channelName, oldestKnownHash, function (err, messages) {
+    Env.getOlderHistory(channelName, oldestKnownHash, desiredMessages, desiredCheckpoint, function (err, toSend) {
         if (err && err.code !== 'ENOENT') {
             Env.Log.error("HK_GET_OLDER_HISTORY", err);
         }
 
-        if (!Array.isArray(messages)) { messages = []; }
-
-        var toSend = [];
-        if (typeof (desiredMessages) === "number") {
-            toSend = messages.slice(-desiredMessages);
-        } else {
-            let cpCount = 0;
-            for (var i = messages.length - 1; i >= 0; i--) {
-                if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
-                    cpCount++;
-                }
-                toSend.unshift(messages[i]);
-                if (cpCount >= desiredCheckpoint) { break; }
-            }
+        if (Array.isArray(toSend)) {
+            toSend.forEach(function (msg) {
+                Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
+                    JSON.stringify(['HISTORY_RANGE', txid, msg])]);
+            });
         }
 
-        toSend.forEach(function (msg) {
-            Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
-                JSON.stringify(['HISTORY_RANGE', txid, msg])]);
-        });
-
         Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
             JSON.stringify(['HISTORY_RANGE_END', txid, channelName])
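The message-selection logic removed from handleGetHistoryRange moves verbatim into the worker (see lib/workers/db-worker.js further down). Its two modes are worth spelling out: desiredMessages takes a plain tail slice, while desiredCheckpoint walks backwards until it has passed that many checkpoints, never counting the most recent message. A standalone restatement with toy data:

    // Restating the worker's selection modes with toy messages.
    // A message is an array whose fifth element is the payload;
    // checkpoint payloads start with "cp|".
    var select = function (messages, desiredMessages, desiredCheckpoint) {
        if (typeof(desiredMessages) === "number") {
            return messages.slice(-desiredMessages); // mode 1: plain tail
        }
        var toSend = [];
        var cpCount = 0;
        for (var i = messages.length - 1; i >= 0; i--) {
            // mode 2: walk back; the newest message never counts as a checkpoint
            if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
                cpCount++;
            }
            toSend.unshift(messages[i]);
            if (cpCount >= desiredCheckpoint) { break; }
        }
        return toSend;
    };

    var log = [[0,0,0,0,'a'], [0,0,0,0,'cp|1'], [0,0,0,0,'b'], [0,0,0,0,'cp|2'], [0,0,0,0,'c']];
    console.log(select(log, 2).length);            // 2: just the last two messages
    console.log(select(log, undefined, 1).length); // 2: everything from 'cp|2' onward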
diff --git a/lib/pins.js b/lib/pins.js
index 41e871446..d840991a3 100644
--- a/lib/pins.js
+++ b/lib/pins.js
@@ -7,6 +7,9 @@ const Path = require("path");
 const Util = require("./common-util");
 const Plan = require("./plan");
 
+const Semaphore = require('saferphore');
+const nThen = require('nthen');
+
 /*  Accepts a reference to an object, and...
     either a string describing which log is being processed (backwards compatibility),
     or a function which will log the error with all relevant data
@@ -194,3 +197,63 @@ Pins.list = function (_done, config) {
         }).start();
     });
 };
+
+Pins.load = function (cb, config) {
+    const sema = Semaphore.create(config.workers || 5);
+
+    let dirList;
+    const fileList = [];
+    const pinned = {};
+
+    var pinPath = config.pinPath || './pins';
+    var done = Util.once(cb);
+
+    nThen((waitFor) => {
+        // recurse over the configured pinPath, or the default
+        Fs.readdir(pinPath, waitFor((err, list) => {
+            if (err) {
+                if (err.code === 'ENOENT') {
+                    dirList = [];
+                    return; // this ends up calling back with an empty object
+                }
+                waitFor.abort();
+                return void done(err);
+            }
+            dirList = list;
+        }));
+    }).nThen((waitFor) => {
+        dirList.forEach((f) => {
+            sema.take((returnAfter) => {
+                // iterate over all the subdirectories in the pin store
+                Fs.readdir(Path.join(pinPath, f), waitFor(returnAfter((err, list2) => {
+                    if (err) {
+                        waitFor.abort();
+                        return void done(err);
+                    }
+                    list2.forEach((ff) => {
+                        if (config && config.exclude && config.exclude.indexOf(ff) > -1) { return; }
+                        fileList.push(Path.join(pinPath, f, ff));
+                    });
+                })));
+            });
+        });
+    }).nThen((waitFor) => {
+        fileList.forEach((f) => {
+            sema.take((returnAfter) => {
+                Fs.readFile(f, waitFor(returnAfter((err, content) => {
+                    if (err) {
+                        waitFor.abort();
+                        return void done(err);
+                    }
+                    const hashes = Pins.calculateFromLog(content.toString('utf8'), f);
+                    hashes.forEach((x) => {
+                        (pinned[x] = pinned[x] || {})[f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y)] = 1;
+                    });
+                })));
+            });
+        });
+    }).nThen(() => {
+        done(void 0, pinned);
+    });
+};
+
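Pins.load bounds its filesystem concurrency with saferphore. As used above, the assumed contract is: Semaphore.create(n) allows n concurrent holders, sema.take(f) invokes f(returnAfter) once a slot is free, and wrapping a callback in returnAfter(...) releases the slot when that callback fires. A hedged sketch of the idea, not the real module:

    // A minimal semaphore with saferphore's interface. Sketch only.
    var create = function (n) {
        var queue = [];
        var check = function () {
            if (n < 1 || !queue.length) { return; }
            n--;
            var f = queue.shift();
            f(function returnAfter(g) {
                // wrap the job's callback; when it fires, release
                // the slot and see if anyone else is waiting
                return function () {
                    var args = arguments;
                    n++;
                    check();
                    if (g) { g.apply(null, args); }
                };
            });
        };
        return {
            take: function (f) {
                queue.push(f);
                check();
            }
        };
    };

With config.workers defaulting to 5, at most five readdir or readFile calls are in flight at once, which keeps the pin scan from exhausting file descriptors on large instances.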
diff --git a/lib/storage/file.js b/lib/storage/file.js
index 2d27ce185..db7871ef0 100644
--- a/lib/storage/file.js
+++ b/lib/storage/file.js
@@ -14,6 +14,28 @@ const readFileBin = require("../stream-file").readFileBin;
 const BatchRead = require("../batch-read");
 const Schedule = require("../schedule");
 
+/*  Each time you write to a channel it will either use an open file descriptor
+    for that channel or open a new descriptor if one is not available. These are
+    automatically closed after this window to prevent a file descriptor leak, so
+    writes that take longer than this time may be dropped! */
+const CHANNEL_WRITE_WINDOW = 300000;
+
+/*  Each time you read a channel it will have this many milliseconds to complete,
+    otherwise it will be closed to prevent a file descriptor leak. The server will
+    lock up if it uses all available file descriptors, so it's important to close
+    them. The tradeoff with this timeout is that some functions, the stream,
+    and the timeout itself are stored in memory. A longer timeout uses more memory
+    and running out of memory will also kill the server. */
+const STREAM_CLOSE_TIMEOUT = 120000;
+
+/*  The above timeout closes the stream, but apparently that doesn't always work.
+    We set yet another timeout to allow the runtime to gracefully close the stream
+    (flushing all pending writes/reads and doing who knows what else). After this
+    timeout it will be MERCILESSLY DESTROYED. This isn't graceful, but again,
+    file descriptor leaks are bad. */
+const STREAM_DESTROY_TIMEOUT = 30000;
+
 const isValidChannelId = function (id) {
     return typeof(id) === 'string' &&
         id.length >= 32 && id.length < 50 &&
@@ -61,20 +83,36 @@ var channelExists = function (filepath, cb) {
 
 const destroyStream = function (stream) {
     if (!stream) { return; }
-    try { stream.close(); } catch (err) { console.error(err); }
+    try {
+        stream.close();
+        if (stream.closed && stream.fd === null) { return; }
+    } catch (err) {
+        console.error(err);
+    }
     setTimeout(function () {
         try { stream.destroy(); } catch (err) { console.error(err); }
-    }, 15000);
+    }, STREAM_DESTROY_TIMEOUT);
 };
 
+/*  accept a stream, an id (used as a label) and an optional number of milliseconds
+
+    return a function which ignores all arguments
+    and first tries to gracefully close a stream
+    then destroys it after a period if the close was not successful
+
+    if the function is not called within the specified number of milliseconds
+    then it will be called implicitly with an error to indicate
+    that it was run because it timed out
+*/
 const ensureStreamCloses = function (stream, id, ms) {
     return Util.bake(Util.mkTimeout(Util.once(function (err) {
-        destroyStream(stream, id);
+        destroyStream(stream);
         if (err) {
             // this can only be a timeout error...
             console.log("stream close error:", err, id);
         }
-    }), ms || 45000), []);
+    }), ms || STREAM_CLOSE_TIMEOUT), []);
 };
 
 // readMessagesBin asynchronously iterates over the messages in a channel log
@@ -84,7 +122,7 @@ const ensureStreamCloses = function (stream, id, ms) {
 // it also allows the handler to abort reading at any time
 const readMessagesBin = (env, id, start, msgHandler, cb) => {
     const stream = Fs.createReadStream(mkPath(env, id), { start: start });
-    const finish = ensureStreamCloses(stream, id);
+    const finish = ensureStreamCloses(stream, '[readMessagesBin:' + id + ']');
     return void readFileBin(stream, msgHandler, function (err) {
         cb(err);
         finish();
@@ -95,7 +133,7 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
 // returns undefined if the first message was not an object (not an array)
 var getMetadataAtPath = function (Env, path, _cb) {
     const stream = Fs.createReadStream(path, { start: 0 });
-    const finish = ensureStreamCloses(stream, path);
+    const finish = ensureStreamCloses(stream, '[getMetadataAtPath:' + path + ']');
     var cb = Util.once(Util.mkAsync(Util.both(_cb, finish)), function () {
         throw new Error("Multiple Callbacks");
     });
@@ -181,7 +219,7 @@ var clearChannel = function (env, channelId, _cb) {
 */
 var readMessages = function (path, msgHandler, _cb) {
     var stream = Fs.createReadStream(path, { start: 0});
-    const finish = ensureStreamCloses(stream, path);
+    const finish = ensureStreamCloses(stream, '[readMessages:' + path + ']');
     var cb = Util.once(Util.mkAsync(Util.both(finish, _cb)));
 
     return readFileBin(stream, function (msgObj, readMore) {
@@ -209,7 +247,7 @@ var getDedicatedMetadata = function (env, channelId, handler, _cb) {
     var metadataPath = mkMetadataPath(env, channelId);
     var stream = Fs.createReadStream(metadataPath, {start: 0});
-    const finish = ensureStreamCloses(stream, metadataPath);
+    const finish = ensureStreamCloses(stream, '[getDedicatedMetadata:' + metadataPath + ']');
     var cb = Util.both(finish, _cb);
 
     readFileBin(stream, function (msgObj, readMore) {
@@ -729,7 +767,7 @@ var getChannel = function (env, id, _callback) {
             delete env.channels[id];
             destroyStream(channel.writeStream, path);
             //console.log("closing writestream");
-        }, 120000);
+        }, CHANNEL_WRITE_WINDOW);
         channel.delayClose();
         env.channels[id] = channel;
         done(void 0, channel);
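ensureStreamCloses composes three helpers from lib/common-util.js that this diff doesn't show. Going by the comment above it, their behaviour is assumed to be roughly the following (hypothetical stand-ins, not CryptPad's actual implementations):

    // Assumed semantics of the Util helpers composed in ensureStreamCloses.
    var once = function (f) {
        // only allow the wrapped function to run a single time
        var called = false;
        return function () {
            if (called) { return; }
            called = true;
            f.apply(null, arguments);
        };
    };
    var mkTimeout = function (f, ms) {
        // call f("TIMEOUT") implicitly if nobody calls it within ms
        var to = setTimeout(function () { f("TIMEOUT"); }, ms);
        return function () {
            clearTimeout(to);
            f.apply(null, arguments);
        };
    };
    var bake = function (f, args) {
        // produce a function which ignores its own arguments
        // and always calls f with the pre-baked argument list
        return function () { f.apply(null, args); };
    };

Baking an empty argument list means finish() can be passed as any callback without leaking that callback's arguments into the close handler, and once() guarantees the stream is torn down a single time even if both the explicit call and the timeout fire.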
diff --git a/lib/workers/crypto-worker.js b/lib/workers/crypto-worker.js
deleted file mode 100644
index 5ed58ac7c..000000000
--- a/lib/workers/crypto-worker.js
+++ /dev/null
@@ -1,113 +0,0 @@
-/* jshint esversion: 6 */
-/* global process */
-const Nacl = require('tweetnacl/nacl-fast');
-
-const COMMANDS = {};
-
-COMMANDS.INLINE = function (data, cb) {
-    var signedMsg;
-    try {
-        signedMsg = Nacl.util.decodeBase64(data.msg);
-    } catch (e) {
-        return void cb('E_BAD_MESSAGE');
-    }
-
-    var validateKey;
-    try {
-        validateKey = Nacl.util.decodeBase64(data.key);
-    } catch (e) {
-        return void cb("E_BADKEY");
-    }
-    // validate the message
-    const validated = Nacl.sign.open(signedMsg, validateKey);
-    if (!validated) {
-        return void cb("FAILED");
-    }
-    cb();
-};
-
-const checkDetachedSignature = function (signedMsg, signature, publicKey) {
-    if (!(signedMsg && publicKey)) { return false; }
-
-    var signedBuffer;
-    var pubBuffer;
-    var signatureBuffer;
-
-    try {
-        signedBuffer = Nacl.util.decodeUTF8(signedMsg);
-    } catch (e) {
-        throw new Error("INVALID_SIGNED_BUFFER");
-    }
-
-    try {
-        pubBuffer = Nacl.util.decodeBase64(publicKey);
-    } catch (e) {
-        throw new Error("INVALID_PUBLIC_KEY");
-    }
-
-    try {
-        signatureBuffer = Nacl.util.decodeBase64(signature);
-    } catch (e) {
-        throw new Error("INVALID_SIGNATURE");
-    }
-
-    if (pubBuffer.length !== 32) {
-        throw new Error("INVALID_PUBLIC_KEY_LENGTH");
-    }
-
-    if (signatureBuffer.length !== 64) {
-        throw new Error("INVALID_SIGNATURE_LENGTH");
-    }
-
-    if (Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer) !== true) {
-        throw new Error("FAILED");
-    }
-};
-
-COMMANDS.DETACHED = function (data, cb) {
-    try {
-        checkDetachedSignature(data.msg, data.sig, data.key);
-    } catch (err) {
-        return void cb(err && err.message);
-    }
-    cb();
-};
-
-COMMANDS.HASH_CHANNEL_LIST = function (data, cb) {
-    var channels = data.channels;
-    if (!Array.isArray(channels)) { return void cb('INVALID_CHANNEL_LIST'); }
-    var uniques = [];
-
-    channels.forEach(function (a) {
-        if (uniques.indexOf(a) === -1) { uniques.push(a); }
-    });
-    uniques.sort();
-
-    var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl
-        .util.decodeUTF8(JSON.stringify(uniques))));
-
-    cb(void 0, hash);
-};
-
-process.on('message', function (data) {
-    if (!data || !data.txid) {
-        return void process.send({
-            error:'E_INVAL'
-        });
-    }
-
-    const cb = function (err, value) {
-        process.send({
-            txid: data.txid,
-            error: err,
-            value: value,
-        });
-    };
-
-    const command = COMMANDS[data.command];
-    if (typeof(command) !== 'function') {
-        return void cb("E_BAD_COMMAND");
-    }
-
-    command(data, cb);
-});
diff --git a/lib/workers/db-worker.js b/lib/workers/db-worker.js
index 0b9de4f53..d80c2dc21 100644
--- a/lib/workers/db-worker.js
+++ b/lib/workers/db-worker.js
@@ -12,6 +12,7 @@ const Core = require("../commands/core");
 const Saferphore = require("saferphore");
 const Logger = require("../log");
 const Tasks = require("../storage/tasks");
+const Nacl = require('tweetnacl/nacl-fast');
 
 const Env = {
     Log: {},
@@ -221,10 +222,10 @@ const computeMetadata = function (data, cb) {
 const getOlderHistory = function (data, cb) {
     const oldestKnownHash = data.hash;
     const channelName = data.channel;
+    const desiredMessages = data.desiredMessages;
+    const desiredCheckpoint = data.desiredCheckpoint;
 
-    //const store = Env.store;
-    //const Log = Env.Log;
-    var messageBuffer = [];
+    var messages = [];
     var found = false;
     store.getMessages(channelName, function (msgStr) {
         if (found) { return; }
@@ -245,9 +246,22 @@ const getOlderHistory = function (data, cb) {
         if (hash === oldestKnownHash) {
             found = true;
         }
-        messageBuffer.push(parsed);
+        messages.push(parsed);
     }, function (err) {
-        cb(err, messageBuffer);
+        var toSend = [];
+        if (typeof (desiredMessages) === "number") {
+            toSend = messages.slice(-desiredMessages);
+        } else {
+            let cpCount = 0;
+            for (var i = messages.length - 1; i >= 0; i--) {
+                if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
+                    cpCount++;
+                }
+                toSend.unshift(messages[i]);
+                if (cpCount >= desiredCheckpoint) { break; }
+            }
+        }
+        cb(err, toSend);
     });
 };
 
@@ -418,6 +432,10 @@ const runTasks = function (data, cb) {
     Env.tasks.runAll(cb);
 };
 
+const writeTask = function (data, cb) {
+    Env.tasks.write(data.time, data.task_command, data.args, cb);
+};
+
 const COMMANDS = {
     COMPUTE_INDEX: computeIndex,
     COMPUTE_METADATA: computeMetadata,
@@ -430,6 +448,92 @@ const COMMANDS = {
     GET_HASH_OFFSET: getHashOffset,
     REMOVE_OWNED_BLOB: removeOwnedBlob,
     RUN_TASKS: runTasks,
+    WRITE_TASK: writeTask,
+};
+
+COMMANDS.INLINE = function (data, cb) {
+    var signedMsg;
+    try {
+        signedMsg = Nacl.util.decodeBase64(data.msg);
+    } catch (e) {
+        return void cb('E_BAD_MESSAGE');
+    }
+
+    var validateKey;
+    try {
+        validateKey = Nacl.util.decodeBase64(data.key);
+    } catch (e) {
+        return void cb("E_BADKEY");
+    }
+    // validate the message
+    const validated = Nacl.sign.open(signedMsg, validateKey);
+    if (!validated) {
+        return void cb("FAILED");
+    }
+    cb();
+};
+
+const checkDetachedSignature = function (signedMsg, signature, publicKey) {
+    if (!(signedMsg && publicKey)) { return false; }
+
+    var signedBuffer;
+    var pubBuffer;
+    var signatureBuffer;
+
+    try {
+        signedBuffer = Nacl.util.decodeUTF8(signedMsg);
+    } catch (e) {
+        throw new Error("INVALID_SIGNED_BUFFER");
+    }
+
+    try {
+        pubBuffer = Nacl.util.decodeBase64(publicKey);
+    } catch (e) {
+        throw new Error("INVALID_PUBLIC_KEY");
+    }
+
+    try {
+        signatureBuffer = Nacl.util.decodeBase64(signature);
+    } catch (e) {
+        throw new Error("INVALID_SIGNATURE");
+    }
+
+    if (pubBuffer.length !== 32) {
+        throw new Error("INVALID_PUBLIC_KEY_LENGTH");
+    }
+
+    if (signatureBuffer.length !== 64) {
+        throw new Error("INVALID_SIGNATURE_LENGTH");
+    }
+
+    if (Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer) !== true) {
+        throw new Error("FAILED");
+    }
+};
+
+COMMANDS.DETACHED = function (data, cb) {
+    try {
+        checkDetachedSignature(data.msg, data.sig, data.key);
+    } catch (err) {
+        return void cb(err && err.message);
+    }
+    cb();
+};
+
+COMMANDS.HASH_CHANNEL_LIST = function (data, cb) {
+    var channels = data.channels;
+    if (!Array.isArray(channels)) { return void cb('INVALID_CHANNEL_LIST'); }
+    var uniques = [];
+
+    channels.forEach(function (a) {
+        if (uniques.indexOf(a) === -1) { uniques.push(a); }
+    });
+    uniques.sort();
+
+    var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl
+        .util.decodeUTF8(JSON.stringify(uniques))));
+
+    cb(void 0, hash);
 };
 
 process.on('message', function (data) {
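The worker above and the pool below communicate over Node's child-process IPC channel using a small txid-matched request/response protocol; Util.response, which pairs replies with pending callbacks, is not shown in this diff. A simplified sketch of the round trip (hypothetical helper code, minus the timeouts, queueing, and PID checks):

    // Minimal txid matching over child_process IPC, mirroring how
    // sendCommand / response.expect / response.handle cooperate. Sketch only.
    const { fork } = require('child_process');

    const worker = fork('lib/workers/db-worker');
    const pending = {}; // txid -> callback awaiting a response

    worker.on('message', function (res) {
        if (!res || !pending[res.txid]) { return; }
        var cb = pending[res.txid];
        delete pending[res.txid];
        cb(res.error, res.value);
    });

    var send = function (msg, cb) {
        // Util.uid() upstream; anything unique per request works
        var txid = 'tx' + Math.random().toString(16).slice(2);
        pending[txid] = cb;
        msg.txid = txid;
        worker.send(msg);
    };

    // e.g. hash a (here empty) channel list in the worker:
    send({ command: 'HASH_CHANNEL_LIST', channels: [] }, function (err, hash) {
        console.log(err || hash);
    });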
diff --git a/lib/workers/index.js b/lib/workers/index.js
index 3190741d3..6a163cadc 100644
--- a/lib/workers/index.js
+++ b/lib/workers/index.js
@@ -3,103 +3,14 @@ const Util = require("../common-util");
 const nThen = require('nthen');
 const OS = require("os");
 
-const numCPUs = OS.cpus().length;
 const { fork } = require('child_process');
 
 const Workers = module.exports;
 const PID = process.pid;
 
-const CRYPTO_PATH = 'lib/workers/crypto-worker';
 const DB_PATH = 'lib/workers/db-worker';
+const MAX_JOBS = 16;
 
-Workers.initializeValidationWorkers = function (Env) {
-    if (typeof(Env.validateMessage) !== 'undefined') {
-        return void console.error("validation workers are already initialized");
-    }
-
-    // Create our workers
-    const workers = [];
-    for (let i = 0; i < numCPUs; i++) {
-        workers.push(fork(CRYPTO_PATH));
-    }
-
-    const response = Util.response(function (errLabel, info) {
-        Env.Log.error('HK_VALIDATE_WORKER__' + errLabel, info);
-    });
-
-    var initWorker = function (worker) {
-        worker.on('message', function (res) {
-            if (!res || !res.txid) { return; }
-            response.handle(res.txid, [res.error, res.value]);
-        });
-
-        var substituteWorker = Util.once( function () {
-            Env.Log.info("SUBSTITUTE_VALIDATION_WORKER", '');
-            var idx = workers.indexOf(worker);
-            if (idx !== -1) {
-                workers.splice(idx, 1);
-            }
-            // Spawn a new one
-            var w = fork(CRYPTO_PATH);
-            workers.push(w);
-            initWorker(w);
-        });
-
-        // Spawn a new process in one ends
-        worker.on('exit', substituteWorker);
-        worker.on('close', substituteWorker);
-        worker.on('error', function (err) {
-            substituteWorker();
-            Env.Log.error('VALIDATION_WORKER_ERROR', {
-                error: err,
-            });
-        });
-    };
-    workers.forEach(initWorker);
-
-    var nextWorker = 0;
-    const send = function (msg, _cb) {
-        var cb = Util.once(Util.mkAsync(_cb));
-        // let's be paranoid about asynchrony and only calling back once..
-        nextWorker = (nextWorker + 1) % workers.length;
-        if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') {
-            return void cb("INVALID_WORKERS");
-        }
-
-        var txid = msg.txid = Util.uid();
-
-        // expect a response within 45s
-        response.expect(txid, cb, 60000);
-
-        // Send the request
-        workers[nextWorker].send(msg);
-    };
-
-    Env.validateMessage = function (signedMsg, key, cb) {
-        send({
-            msg: signedMsg,
-            key: key,
-            command: 'INLINE',
-        }, cb);
-    };
-
-    Env.checkSignature = function (signedMsg, signature, publicKey, cb) {
-        send({
-            command: 'DETACHED',
-            sig: signature,
-            msg: signedMsg,
-            key: publicKey,
-        }, cb);
-    };
-
-    Env.hashChannelList = function (channels, cb) {
-        send({
-            command: 'HASH_CHANNEL_LIST',
-            channels: channels,
-        }, cb);
-    };
-};
-
-Workers.initializeIndexWorkers = function (Env, config, _cb) {
+Workers.initialize = function (Env, config, _cb) {
     var cb = Util.once(Util.mkAsync(_cb));
 
     const workers = [];
@@ -124,16 +35,60 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         return response.expected(id)? guid(): id;
     };
 
-    var workerIndex = 0;
-    var sendCommand = function (msg, _cb) {
-        var cb = Util.once(Util.mkAsync(_cb));
+    var workerOffset = -1;
+    var queue = [];
+    var getAvailableWorkerIndex = function () {
+//  If there is already a backlog of tasks you can avoid some work
+//  by going to the end of the line
+        if (queue.length) { return -1; }
+
+        var L = workers.length;
+        if (L === 0) {
+            Log.error('NO_WORKERS_AVAILABLE', {
+                queue: queue.length,
+            });
+            return -1;
+        }
 
-        workerIndex = (workerIndex + 1) % workers.length;
-        if (!isWorker(workers[workerIndex])) {
-            return void cb("NO_WORKERS");
-        }
+        // cycle through the workers once
+        // start from a different offset each time
+        // return -1 if none are available
+        workerOffset = (workerOffset + 1) % L;
+
+        var temp;
+        for (let i = 0; i < L; i++) {
+            temp = (workerOffset + i) % L;
+/*  I'd like for this condition to be more efficient (`Object.keys` is
+    sub-optimal) but I found some bugs in my initial implementation stemming
+    from a task counter variable going out-of-sync with reality when a worker
+    crashed and its tasks were re-assigned to its substitute.
+    I'm sure it can be done correctly and efficiently, but this is a relatively
+    easy way to make sure it's always up to date. We'll see how it performs
+    in practice before optimizing.
+*/
+            if (workers[temp] && Object.keys(workers[temp].tasks).length < MAX_JOBS) {
+                return temp;
+            }
+        }
+        return -1;
+    };
 
-        var state = workers[workerIndex];
+    var sendCommand = function (msg, _cb) {
+        var index = getAvailableWorkerIndex();
+
+        var state = workers[index];
+        // if there is no worker available:
+        if (!isWorker(state)) {
+            // queue the message for when one becomes available
+            queue.push({
+                msg: msg,
+                cb: _cb,
+            });
+            return;
+        }
+
+        var cb = Util.once(Util.mkAsync(_cb));
 
         const txid = guid();
         msg.txid = txid;
@@ -141,14 +96,42 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
 
         // track which worker is doing which jobs
         state.tasks[txid] = msg;
-        response.expect(txid, function (err, value) {
-            // clean up when you get a response
-            delete state[txid];
-            cb(err, value);
-        }, 60000);
+
+        response.expect(txid, cb, 60000);
         state.worker.send(msg);
     };
 
+    var handleResponse = function (state, res) {
+        if (!res) { return; }
+        // handle log messages before checking if it was addressed to your PID
+        // it might still be useful to know what happened inside an orphaned worker
+        if (res.log) {
+            return void handleLog(res.log, res.label, res.info);
+        }
+        // but don't bother handling things addressed to other processes
+        // since it's basically guaranteed not to work
+        if (res.pid !== PID) {
+            return void Log.error("WRONG_PID", res);
+        }
+
+        if (!res.txid) { return; }
+        response.handle(res.txid, [res.error, res.value]);
+        delete state.tasks[res.txid];
+        if (!queue.length) { return; }
+
+        var nextMsg = queue.shift();
+/*  `nextMsg` was at the top of the queue.
+    We know that a job just finished and all of this code
+    is synchronous, so calling `sendCommand` should take the worker
+    which was just freed up. This is somewhat fragile though, so
+    be careful if you want to modify this block. The risk is that
+    we take something that was at the top of the queue and push it
+    to the back because the following msg took its place. OR, in an
+    even worse scenario, we cycle through the queue but don't run anything.
+*/
+        sendCommand(nextMsg.msg, nextMsg.cb);
+    };
+
     const initWorker = function (worker, cb) {
         const txid = guid();
 
@@ -170,19 +153,7 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         });
 
         worker.on('message', function (res) {
-            if (!res) { return; }
-            // handle log messages before checking if it was addressed to your PID
-            // it might still be useful to know what happened inside an orphaned worker
-            if (res.log) {
-                return void handleLog(res.log, res.label, res.info);
-            }
-            // but don't bother handling things addressed to other processes
-            // since it's basically guaranteed not to work
-            if (res.pid !== PID) {
-                return void Log.error("WRONG_PID", res);
-            }
-
-            response.handle(res.txid, [res.error, res.value]);
+            handleResponse(state, res);
         });
 
         var substituteWorker = Util.once(function () {
@@ -222,7 +193,34 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
     };
 
     nThen(function (w) {
-        OS.cpus().forEach(function () {
+        const max = config.maxWorkers;
+
+        var limit;
+        if (typeof(max) !== 'undefined') {
+            // the admin provided a limit on the number of workers
+            if (typeof(max) === 'number' && !isNaN(max)) {
+                if (max < 1) {
+                    Log.info("INSUFFICIENT_MAX_WORKERS", max);
+                    limit = 1;
+                } else {
+                    limit = max;
+                }
+            } else {
+                Log.error("INVALID_MAX_WORKERS", '[' + max + ']');
+            }
+        }
+
+        var logged;
+
+        OS.cpus().forEach(function (cpu, index) {
+            if (limit && index >= limit) {
+                if (!logged) {
+                    logged = true;
+                    Log.info('WORKER_LIMIT', "(Opting not to use available CPUs beyond " + index + ')');
+                }
+                return;
+            }
+
             initWorker(fork(DB_PATH), w(function (err) {
                 if (!err) { return; }
                 w.abort();
@@ -254,12 +250,14 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         });
     };
 
-    Env.getOlderHistory = function (channel, oldestKnownHash, cb) {
+    Env.getOlderHistory = function (channel, oldestKnownHash, desiredMessages, desiredCheckpoint, cb) {
         Env.store.getWeakLock(channel, function (next) {
             sendCommand({
                 channel: channel,
                 command: "GET_OLDER_HISTORY",
                 hash: oldestKnownHash,
+                desiredMessages: desiredMessages,
+                desiredCheckpoint: desiredCheckpoint,
             }, Util.both(next, cb));
         });
     };
@@ -327,11 +325,42 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         }, cb);
     };
 
+    Env.writeTask = function (time, command, args, cb) {
+        sendCommand({
+            command: 'WRITE_TASK',
+            time: time,
+            task_command: command,
+            args: args,
+        }, cb);
+    };
+
+    // Synchronous crypto functions
+    Env.validateMessage = function (signedMsg, key, cb) {
+        sendCommand({
+            msg: signedMsg,
+            key: key,
+            command: 'INLINE',
+        }, cb);
+    };
+
+    Env.checkSignature = function (signedMsg, signature, publicKey, cb) {
+        sendCommand({
+            command: 'DETACHED',
+            sig: signature,
+            msg: signedMsg,
+            key: publicKey,
+        }, cb);
+    };
+
+    Env.hashChannelList = function (channels, cb) {
+        sendCommand({
+            command: 'HASH_CHANNEL_LIST',
+            channels: channels,
+        }, cb);
+    };
+
     cb(void 0);
     });
 };
 
-Workers.initialize = function (Env, config, cb) {
-    Workers.initializeValidationWorkers(Env);
-    Workers.initializeIndexWorkers(Env, config, cb);
-};
+
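A sizing consequence of the scheduler above: total in-flight jobs are bounded by the worker count times MAX_JOBS, and everything beyond that waits in the queue. With hypothetical numbers:

    // Hypothetical host: 8 cores, config.maxWorkers = 4.
    var cores = 8;              // OS.cpus().length
    var maxWorkers = 4;         // from the admin's config
    var MAX_JOBS = 16;          // per-worker cap defined above

    var workers = Math.min(cores, maxWorkers); // 4 processes are forked
    var capacity = workers * MAX_JOBS;         // 64 jobs may run concurrently
    console.log(capacity);                     // job 65 waits in the queue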
"version": "3.15.0", + "version": "3.16.0", "license": "AGPL-3.0+", "repository": { "type": "git", diff --git a/scripts/compare-pin-methods.js b/scripts/compare-pin-methods.js new file mode 100644 index 000000000..de7ef114d --- /dev/null +++ b/scripts/compare-pin-methods.js @@ -0,0 +1,42 @@ +/* jshint esversion: 6, node: true */ +const nThen = require("nthen"); +const Pins = require("../lib/pins"); +const Assert = require("assert"); + +const config = require("../lib/load-config"); + +var compare = function () { + console.log(config); + var conf = { + pinPath: config.pinPath, + }; + + var list, load; + + nThen(function (w) { + Pins.list(w(function (err, p) { + if (err) { throw err; } + list = p; + console.log(p); + console.log(list); + console.log(); + }), conf); + }).nThen(function (w) { + Pins.load(w(function (err, p) { + if (err) { throw err; } + load = p; + console.log(load); + console.log(); + }), conf); + }).nThen(function () { + console.log({ + listLength: Object.keys(list).length, + loadLength: Object.keys(load).length, + }); + + Assert.deepEqual(list, load); + console.log("methods are equivalent"); + }); +}; + +compare(); diff --git a/scripts/evict-inactive.js b/scripts/evict-inactive.js index a3a595ca4..1d7b87e91 100644 --- a/scripts/evict-inactive.js +++ b/scripts/evict-inactive.js @@ -42,7 +42,7 @@ nThen(function (w) { store = _; })); // load the list of pinned files so you know which files // should not be archived or deleted - Pins.list(w(function (err, _) { + Pins.load(w(function (err, _) { if (err) { w.abort(); return void console.error(err); diff --git a/www/admin/index.html b/www/admin/index.html index 79a96c97b..c59dd8edc 100644 --- a/www/admin/index.html +++ b/www/admin/index.html @@ -6,7 +6,7 @@ - +