diff --git a/historyKeeper.js b/historyKeeper.js
new file mode 100644
index 000000000..351bf8cf1
--- /dev/null
+++ b/historyKeeper.js
@@ -0,0 +1,618 @@
+/* jshint esversion: 6 */
+;(function () { 'use strict';
+
+const nThen = require('nthen');
+const Nacl = require('tweetnacl');
+const Crypto = require('crypto');
+
+
+const now = function () { return (new Date()).getTime(); };
+
+const getHash = function (msg) {
+    if (typeof(msg) !== 'string') {
+        console.log('getHash() called on', typeof(msg), msg);
+        return '';
+    }
+    return msg.slice(0,64);
+};
+
+const tryParse = function (str) {
+    try {
+        return JSON.parse(str);
+    } catch (err) {
+        console.error(err);
+    }
+};
+
+const sliceCpIndex = function (cpIndex, line) {
+    // Remove "old" checkpoints (cp sent before 100 messages ago)
+    const minLine = Math.max(0, (line - 100));
+    let start = cpIndex.slice(0, -2);
+    const end = cpIndex.slice(-2);
+    start = start.filter(function (obj) {
+        return obj.line > minLine;
+    });
+    return start.concat(end);
+};
+
+module.exports.create = function (cfg) {
+    const rpc = cfg.rpc;
+    const tasks = cfg.tasks;
+    const store = cfg.store;
+
+    var Env = {};
+    const historyKeeperKeys = {};
+    const HISTORY_KEEPER_ID = Crypto.randomBytes(8).toString('hex');
+
+    let sendMsg = function () {};
+    let STANDARD_CHANNEL_LENGTH, EPHEMERAL_CHANNEL_LENGTH;
+    const setConfig = function (config) {
+        STANDARD_CHANNEL_LENGTH = config.STANDARD_CHANNEL_LENGTH;
+        EPHEMERAL_CHANNEL_LENGTH = config.EPHEMERAL_CHANNEL_LENGTH;
+        sendMsg = config.sendMsg;
+    };
+
+    const computeIndex = function (channelName, cb) {
+        const cpIndex = [];
+        let messageBuf = [];
+        let validateKey;
+        let metadata;
+        let i = 0;
+        store.readMessagesBin(channelName, 0, (msgObj, rmcb) => {
+            let msg;
+            i++;
+            if (!validateKey && msgObj.buff.indexOf('validateKey') > -1) {
+                metadata = msg = tryParse(msgObj.buff.toString('utf8'));
+                if (typeof msg === "undefined") { return rmcb(); }
+                if (msg.validateKey) {
+                    validateKey = historyKeeperKeys[channelName] = msg;
+                    return rmcb();
+                }
+            }
+            if (msgObj.buff.indexOf('cp|') > -1) {
+                msg = msg || tryParse(msgObj.buff.toString('utf8'));
+                if (typeof msg === "undefined") { return rmcb(); }
+                if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
+                    cpIndex.push({
+                        offset: msgObj.offset,
+                        line: i
+                    });
+                    messageBuf = [];
+                }
+            }
+            messageBuf.push(msgObj);
+            return rmcb();
+        }, (err) => {
+            if (err && err.code !== 'ENOENT') { return void cb(err); }
+            const offsetByHash = {};
+            let size = 0;
+            messageBuf.forEach((msgObj) => {
+                const msg = tryParse(msgObj.buff.toString('utf8'));
+                if (typeof msg === "undefined") { return; }
+                if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
+                    offsetByHash[getHash(msg[4])] = msgObj.offset;
+                }
+                // There is a trailing \n at the end of the file
+                size = msgObj.offset + msgObj.buff.length + 1;
+            });
+            cb(null, {
+                // Only keep the checkpoints included in the last 100 messages
+                cpIndex: sliceCpIndex(cpIndex, i),
+                offsetByHash: offsetByHash,
+                size: size,
+                metadata: metadata,
+                line: i
+            });
+        });
+    };
+
+    const getIndex = (ctx, channelName, cb) => {
+        const chan = ctx.channels[channelName];
+        if (chan && chan.index) { return void cb(undefined, chan.index); }
+        computeIndex(channelName, (err, ret) => {
+            if (err) { return void cb(err); }
+            if (chan) { chan.index = ret; }
+            cb(undefined, ret);
+        });
+    };
+
+    /*::
+    type cp_index_item = {
+        offset: number,
+        line: number
+    }
+    */
+
+
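+    /* The index computed above has the following shape (see computeIndex):
+     *   cpIndex:      array of { offset, line } entries for recent checkpoints
+     *   offsetByHash: map from a message's hash to its byte offset in the file
+     *   size:         size of the channel file in bytes
+     *   metadata:     the channel's metadata message, if one was found
+     *   line:         number of messages in the channel
+     */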
+    const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) {
+        const msgBin = new Buffer(msg + '\n', 'utf8');
+        // Store the message first, and update the index only once it's stored.
+        // store.messageBin can be async so updating the index first may
+        // result in a wrong cpIndex
+        nThen((waitFor) => {
+            store.messageBin(channel.id, msgBin, waitFor(function (err) {
+                if (err) {
+                    waitFor.abort();
+                    return void console.log("Error writing message: " + err.message);
+                }
+            }));
+        }).nThen((waitFor) => {
+            getIndex(ctx, channel.id, waitFor((err, index) => {
+                if (err) {
+                    console.log("getIndex()");
+                    console.log(err.stack);
+                    // non-critical, we'll be able to get the channel index later
+                    return;
+                }
+                if (typeof (index.line) === "number") { index.line++; }
+                if (isCp) {
+                    index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
+                    for (let k in index.offsetByHash) {
+                        if (index.offsetByHash[k] < index.cpIndex[0]) {
+                            delete index.offsetByHash[k];
+                        }
+                    }
+                    index.cpIndex.push(({
+                        offset: index.size,
+                        line: ((index.line || 0) + 1)
+                    } /*:cp_index_item*/));
+                }
+                if (maybeMsgHash) { index.offsetByHash[maybeMsgHash] = index.size; }
+                index.size += msgBin.length;
+            }));
+        });
+    };
+
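+    /* Checkpoints are identified by their content: msgStruct[4] starts with
+     * "cp|", optionally followed by a base64 id and another "|", i.e.
+     * "cp|<base64 id>|<signed content>" (see the regex below). The id, when
+     * present, is used to reject a checkpoint that was already saved. */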
+    // Determine what we should store when a message is broadcast to a channel
+    const onChannelMessage = function (ctx, channel, msgStruct) {
+        // don't store messages if the channel id indicates that it's an ephemeral message
+        if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; }
+
+        const isCp = /^cp\|/.test(msgStruct[4]);
+        if (historyKeeperKeys[channel.id] && historyKeeperKeys[channel.id].expire &&
+                historyKeeperKeys[channel.id].expire < +new Date()) {
+            return; // Don't store messages on expired channel
+        }
+        let id;
+        if (isCp) {
+            /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
+            id = /cp\|(([A-Za-z0-9+\/=]+)\|)?/.exec(msgStruct[4]);
+            if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) {
+                // Reject duplicate checkpoints
+                return;
+            }
+        }
+        if (historyKeeperKeys[channel.id] && historyKeeperKeys[channel.id].validateKey) {
+            /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
+            let signedMsg = (isCp) ? msgStruct[4].replace(/^cp\|(([A-Za-z0-9+\/=]+)\|)?/, '') : msgStruct[4];
+            signedMsg = Nacl.util.decodeBase64(signedMsg);
+            const validateKey = Nacl.util.decodeBase64(historyKeeperKeys[channel.id].validateKey);
+            const validated = Nacl.sign.open(signedMsg, validateKey);
+            if (!validated) {
+                console.log("Signed message rejected"); // TODO logging
+                return;
+            }
+        }
+        if (isCp) {
+            // WARNING: the fact that we only check the most recent checkpoints
+            // is a potential source of bugs if one editor has high latency and
+            // pushes a duplicate of an earlier checkpoint than the latest which
+            // has been pushed by editors with low latency
+            if (Array.isArray(id) && id[2]) {
+                // Store new checkpoint hash
+                channel.lastSavedCp = id[2];
+            }
+        }
+        msgStruct.push(now());
+        storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
+    };
+
+    const dropChannel = function (chanName) {
+        delete historyKeeperKeys[chanName];
+    };
+
+    const getHistoryOffset = (ctx, channelName, lastKnownHash, cb /*:(e:?Error, os:?number)=>void*/) => {
+        // lastKnownHash === -1 means we want the complete history
+        if (lastKnownHash === -1) { return void cb(null, 0); }
+        let offset = -1;
+        nThen((waitFor) => {
+            getIndex(ctx, channelName, waitFor((err, index) => {
+                if (err) { waitFor.abort(); return void cb(err); }
+
+                // Check last known hash
+                const lkh = index.offsetByHash[lastKnownHash];
+                if (lastKnownHash && typeof(lkh) !== "number") {
+                    waitFor.abort();
+                    return void cb(new Error('EINVAL'));
+                }
+
+                // Since last 2 checkpoints
+                if (!lastKnownHash) {
+                    waitFor.abort();
+                    // Less than 2 checkpoints in the history: return everything
+                    if (index.cpIndex.length < 2) { return void cb(null, 0); }
+                    // Otherwise return the second last checkpoint's index
+                    return void cb(null, index.cpIndex[0].offset);
+                    /* LATER...
+                        in practice, two checkpoints can be very close together
+                        we have measures to avoid duplicate checkpoints, but editors
+                        can produce nearby checkpoints which are slightly different,
+                        and slip past these protections. To be really careful, we can
+                        seek past nearby checkpoints by some number of patches so as
+                        to ensure that all editors have sufficient knowledge of history
+                        to reconcile their differences. */
+                }
+
+                offset = lkh;
+            }));
+        }).nThen((waitFor) => {
+            if (offset !== -1) { return; }
+            store.readMessagesBin(channelName, 0, (msgObj, rmcb, abort) => {
+                const msg = tryParse(msgObj.buff.toString('utf8'));
+                if (typeof msg === "undefined") { return rmcb(); }
+                if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4])) {
+                    return void rmcb();
+                }
+                offset = msgObj.offset;
+                abort();
+            }, waitFor(function (err) {
+                if (err) { waitFor.abort(); return void cb(err); }
+            }));
+        }).nThen((waitFor) => {
+            cb(null, offset);
+        });
+    };
+
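+    /* getHistoryAsync streams a channel's history: it resolves a starting
+     * offset with getHistoryOffset (lastKnownHash === -1 means "everything"),
+     * then calls handler(message, next) for each parsed message and cb(err)
+     * once the stream ends. When beforeHash is truthy it reads from the start
+     * of the file and stops at the offset of lastKnownHash instead. */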
+    const getHistoryAsync = (ctx, channelName, lastKnownHash, beforeHash, handler, cb) => {
+        let offset = -1;
+        nThen((waitFor) => {
+            getHistoryOffset(ctx, channelName, lastKnownHash, waitFor((err, os) => {
+                if (err) {
+                    waitFor.abort();
+                    return void cb(err);
+                }
+                offset = os;
+            }));
+        }).nThen((waitFor) => {
+            if (offset === -1) { return void cb(new Error("could not find offset")); }
+            const start = (beforeHash) ? 0 : offset;
+            store.readMessagesBin(channelName, start, (msgObj, rmcb, abort) => {
+                if (beforeHash && msgObj.offset >= offset) { return void abort(); }
+                handler(tryParse(msgObj.buff.toString('utf8')), rmcb);
+            }, waitFor(function (err) {
+                return void cb(err);
+            }));
+        });
+    };
+
+    const getOlderHistory = function (channelName, oldestKnownHash, cb) {
+        var messageBuffer = [];
+        var found = false;
+        store.getMessages(channelName, function (msgStr) {
+            if (found) { return; }
+
+            let parsed = tryParse(msgStr);
+            if (typeof parsed === "undefined") { return; }
+
+            if (parsed.validateKey) {
+                historyKeeperKeys[channelName] = parsed;
+                return;
+            }
+
+            var content = parsed[4];
+            if (typeof(content) !== 'string') { return; }
+
+            var hash = getHash(content);
+            if (hash === oldestKnownHash) {
+                found = true;
+            }
+            messageBuffer.push(parsed);
+        }, function (err) {
+            if (err) {
+                console.error("getOlderHistory", err);
+            }
+            cb(messageBuffer);
+        });
+    };
+
+    /*::
+    type Chan_t = {
+        indexOf: (any)=>number,
+        id: string,
+        lastSavedCp: string,
+        forEach: ((any)=>void)=>void,
+        push: (any)=>void,
+    };
+    */
+
+
+    const historyKeeperBroadcast = function (ctx, channel, msg) {
+        let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
+        chan.forEach(function (user) {
+            sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
+        });
+    };
+    const onChannelCleared = function (ctx, channel) {
+        historyKeeperBroadcast(ctx, channel, {
+            error: 'ECLEARED',
+            channel: channel
+        });
+    };
+    // When a channel is removed from datastore, broadcast a message to all its connected users
+    const onChannelDeleted = function (ctx, channel) {
+        store.closeChannel(channel, function () {
+            historyKeeperBroadcast(ctx, channel, {
+                error: 'EDELETED',
+                channel: channel
+            });
+        });
+        delete ctx.channels[channel];
+        delete historyKeeperKeys[channel];
+    };
+    // Check if the selected channel is expired
+    // If it is, remove it from memory and broadcast a message to its members
+    const checkExpired = function (ctx, channel) {
+        if (channel && channel.length === STANDARD_CHANNEL_LENGTH && historyKeeperKeys[channel] &&
+                historyKeeperKeys[channel].expire && historyKeeperKeys[channel].expire < +new Date()) {
+            store.closeChannel(channel, function () {
+                historyKeeperBroadcast(ctx, channel, {
+                    error: 'EEXPIRED',
+                    channel: channel
+                });
+            });
+            delete ctx.channels[channel];
+            delete historyKeeperKeys[channel];
+            return true;
+        }
+        return;
+    };
+
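+    /* onDirectMessage handles messages addressed to the history keeper
+     * pseudo-user. json[2] is a JSON array whose first element is the command:
+     * GET_HISTORY, GET_HISTORY_RANGE or GET_FULL_HISTORY; any other command is
+     * forwarded to rpc.js when RPC is enabled. Replies are sent back as
+     * [0, HISTORY_KEEPER_ID, 'MSG', user.id, <JSON payload>]. */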
+    const onDirectMessage = function (ctx, seq, user, json) {
+        let parsed;
+        let channelName;
+        try {
+            parsed = JSON.parse(json[2]);
+        } catch (err) {
+            console.error("handleMessage(JSON.parse)", err); // TODO logging
+            return;
+        }
+
+        // If the requested history is for an expired channel, abort
+        // Note that if we don't have the keys for that channel in historyKeeperKeys, we'll
+        // have to abort later (once we know the expiration time)
+        if (checkExpired(ctx, parsed[1])) { return; }
+
+        if (parsed[0] === 'GET_HISTORY') {
+            // parsed[1] is the channel id
+            // parsed[2] is a validation key or an object containing metadata (optional)
+            // parsed[3] is the last known hash (optional)
+            sendMsg(ctx, user, [seq, 'ACK']);
+            channelName = parsed[1];
+            var validateKey = parsed[2];
+            var lastKnownHash = parsed[3];
+            var owners;
+            var expire;
+            if (parsed[2] && typeof parsed[2] === "object") {
+                validateKey = parsed[2].validateKey;
+                lastKnownHash = parsed[2].lastKnownHash;
+                owners = parsed[2].owners;
+                if (parsed[2].expire) {
+                    expire = +parsed[2].expire * 1000 + (+new Date());
+                }
+            }
+
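+            // The chain below: (1) write an expiration task if the pad has an
+            // expire time, (2) make sure the channel metadata is cached and
+            // sent to the user, (3) stream the history itself, and finally
+            // send an end-of-history marker ({ state: 1, channel }).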
+            nThen(function (waitFor) {
+                if (!tasks) { return; } // tasks are not supported
+                if (typeof(expire) !== 'number' || !expire) { return; }
+
+                // the fun part...
+                // the user has said they want this pad to expire at some point
+                tasks.write(expire, "EXPIRE", [ channelName ], waitFor(function (err) {
+                    if (err) {
+                        // if there is an error, we don't want to crash the whole server...
+                        // just log it, and if there's a problem you'll be able to fix it
+                        // at a later date with the provided information
+                        console.error('Failed to write expiration to disk:', err); // TODO logging
+                        console.error([expire, 'EXPIRE', channelName]); // TODO logging
+                    }
+                }));
+            }).nThen(function (waitFor) {
+                var w = waitFor();
+
+                /* unless this is a young channel, we will serve all messages from an offset
+                    this will not include the channel metadata, so we need to explicitly fetch that.
+                    unfortunately, we can't just serve it blindly, since then young channels will
+                    send the metadata twice, so let's do a quick check of what we're going to serve...
+                */
+                getIndex(ctx, channelName, waitFor((err, index) => {
+                    /* if there's an error here, it should be encountered
+                        and handled by the next nThen block.
+                        so, let's just fall through...
+                    */
+                    if (err) { return w(); }
+                    if (!index || !index.metadata) { return void w(); }
+                    // Store the metadata if we don't have it in memory
+                    if (!historyKeeperKeys[channelName]) {
+                        historyKeeperKeys[channelName] = index.metadata;
+                    }
+                    // And then check if the channel is expired. If it is, send the error and abort
+                    if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
+                    // Send the metadata to the user
+                    if (!lastKnownHash && index.cpIndex.length > 1) {
+                        sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
+                        return;
+                    }
+                    w();
+                }));
+            }).nThen(() => {
+                let msgCount = 0;
+                let expired = false;
+                getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, cb) => {
+                    if (!msg) { return; }
+                    if (msg.validateKey) {
+                        // If it is a young channel, this is the part where we get the metadata
+                        // Check if the channel is expired and abort if it is.
+                        if (!historyKeeperKeys[channelName]) { historyKeeperKeys[channelName] = msg; }
+                        expired = checkExpired(ctx, channelName);
+                    }
+                    if (expired) { return void cb(); }
+                    msgCount++;
+
+                    sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], cb);
+                }, (err) => {
+                    // If the pad is expired, stop here, we've already sent the error message
+                    if (expired) { return; }
+
+                    if (err && err.code !== 'ENOENT') {
+                        if (err.message !== 'EINVAL') { console.error("GET_HISTORY", err); }
+                        const parsedMsg = {error:err.message, channel: channelName};
+                        sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
+                        return;
+                    }
+
+                    // If this is a new channel, we need to store the metadata as
+                    // the first message in the file
+                    const chan = ctx.channels[channelName];
+                    if (msgCount === 0 && !historyKeeperKeys[channelName] && chan && chan.indexOf(user) > -1) {
+                        var key = {};
+                        key.channel = channelName;
+                        if (validateKey) {
+                            key.validateKey = validateKey;
+                        }
+                        if (owners) {
+                            key.owners = owners;
+                        }
+                        if (expire) {
+                            key.expire = expire;
+                        }
+                        historyKeeperKeys[channelName] = key;
+                        storeMessage(ctx, chan, JSON.stringify(key), false, undefined);
+                        sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(key)]);
+                    }
+
+                    // End of history message:
+                    let parsedMsg = {state: 1, channel: channelName};
+                    sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
+                });
+            });
+        } else if (parsed[0] === 'GET_HISTORY_RANGE') {
+            channelName = parsed[1];
+            var map = parsed[2];
+            if (!(map && typeof(map) === 'object')) {
+                return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', channelName]);
+            }
+
+            var oldestKnownHash = map.from;
+            var desiredMessages = map.count;
+            var desiredCheckpoint = map.cpCount;
+            var txid = map.txid;
+            if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
+                return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', channelName]);
+            }
+
+            if (!txid) {
+                return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', channelName]);
+            }
+
+            sendMsg(ctx, user, [seq, 'ACK']);
+            return void getOlderHistory(channelName, oldestKnownHash, function (messages) {
+                var toSend = [];
+                if (typeof (desiredMessages) === "number") {
+                    toSend = messages.slice(-desiredMessages);
+                } else {
+                    let cpCount = 0;
+                    for (var i = messages.length - 1; i >= 0; i--) {
+                        if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
+                            cpCount++;
+                        }
+                        toSend.unshift(messages[i]);
+                        if (cpCount >= desiredCheckpoint) { break; }
+                    }
+                }
+                toSend.forEach(function (msg) {
+                    sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
+                        JSON.stringify(['HISTORY_RANGE', txid, msg])]);
+                });
+
+                sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
+                    JSON.stringify(['HISTORY_RANGE_END', txid, channelName])
+                ]);
+            });
+        } else if (parsed[0] === 'GET_FULL_HISTORY') {
+            // parsed[1] is the channel id
+            // parsed[2] is a validation key (optional)
+            // parsed[3] is the last known hash (optional)
+            sendMsg(ctx, user, [seq, 'ACK']);
+            getHistoryAsync(ctx, parsed[1], -1, false, (msg, cb) => {
+                if (!msg) { return; }
+                sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], cb);
+            }, (err) => {
+                let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
+                if (err) {
+                    console.error(err.stack);
+                    parsedMsg = ['ERROR', parsed[1], err.message];
+                }
+                sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
+            });
+        } else if (rpc) {
+            /* RPC Calls... */
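+            // parsed[0] identifies the request and is echoed back in every
+            // response below; judging from the checks on msg[3] and msg[4], the
+            // signed payload in rpc_call[0] carries the command name at index 3
+            // (e.g. 'REMOVE_OWNED_CHANNEL') and the target channel at index 4.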
+            var rpc_call = parsed.slice(1);
+
+            sendMsg(ctx, user, [seq, 'ACK']);
+            try {
+                // slice off the sequence number and pass in the rest of the message
+                rpc(ctx, rpc_call, function (err, output) {
+                    if (err) {
+                        sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', err])]);
+                        return;
+                    }
+                    var msg = rpc_call[0].slice();
+                    if (msg[3] === 'REMOVE_OWNED_CHANNEL') {
+                        onChannelDeleted(ctx, msg[4]);
+                    }
+                    if (msg[3] === 'CLEAR_OWNED_CHANNEL') {
+                        onChannelCleared(ctx, msg[4]);
+                    }
+                    sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]);
+                });
+            } catch (e) {
+                sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]);
+            }
+        }
+    };
+
+    // TODO logging
+    var cciLock = false;
+    const checkChannelIntegrity = function (ctx) {
+        if (process.env['CRYPTPAD_DEBUG'] && !cciLock) {
+            let nt = nThen;
+            cciLock = true;
+            Object.keys(ctx.channels).forEach(function (channelName) {
+                const chan = ctx.channels[channelName];
+                if (!chan.index) { return; }
+                nt = nt((waitFor) => {
+                    store.getChannelSize(channelName, waitFor((err, size) => {
+                        if (err) { return void console.log("Couldn't get size of channel", channelName); }
+                        if (size !== chan.index.size) {
+                            console.log("channel size mismatch for", channelName,
+                                "cached:", chan.index.size, "fileSize:", size);
+                        }
+                    }));
+                }).nThen;
+            });
+            nt((waitFor) => { cciLock = false; });
+        }
+    };
+
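+    // The object returned below is the history keeper's public interface.
+    // The websocket server (NetfluxSrv, see server.js) is expected to call
+    // setConfig() with the channel length constants and its sendMsg function
+    // before relaying channel or direct messages to these handlers.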
+    return {
+        id: HISTORY_KEEPER_ID,
+        setConfig: setConfig,
+        onChannelMessage: onChannelMessage,
+        dropChannel: dropChannel,
+        checkExpired: checkExpired,
+        onDirectMessage: onDirectMessage,
+        checkChannelIntegrity: checkChannelIntegrity
+    };
+};
+
+}());
diff --git a/rpc.js b/rpc.js
index 23e5b040c..81227b97c 100644
--- a/rpc.js
+++ b/rpc.js
@@ -1669,7 +1669,7 @@ RPC.create = function (
         maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024),
         Sessions: {},
         paths: {},
-        msgStore: (undefined /*:any*/),
+        msgStore: config.store,
         pinStore: (undefined /*:any*/),
         pinnedPads: {},
         evPinnedPadsReady: mkEvent(true),
diff --git a/server.js b/server.js
index 0158c1b68..5375f1148 100644
--- a/server.js
+++ b/server.js
@@ -245,7 +245,9 @@ if (config.httpSafePort) {
 var wsConfig = { server: httpServer };
 
 var rpc;
+var historyKeeper;
 
+// Initialize tasks, then the store, then rpc, then the history keeper, and then start the server
 var nt = nThen(function (w) {
     if (!config.enableTaskScheduling) { return; }
     var Tasks = require("./storage/tasks");
@@ -253,6 +255,11 @@ var nt = nThen(function (w) {
     Tasks.create(config, w(function (e, tasks) {
         config.tasks = tasks;
     }));
+}).nThen(function (w) {
+    if (config.useExternalWebsocket) { return; }
+    Storage.create(config, w(function (_store) {
+        config.store = _store;
+    }));
 }).nThen(function (w) {
     config.rpc = typeof(config.rpc) === 'undefined'? './rpc.js' : config.rpc;
     if (typeof(config.rpc) !== 'string') { return; }
@@ -266,15 +273,22 @@ var nt = nThen(function (w) {
         rpc = _rpc;
     }));
 }).nThen(function () {
-    if(config.useExternalWebsocket) { return; }
+    if (config.useExternalWebsocket) { return; }
+    var HK = require('./historyKeeper.js');
+    var hkConfig = {
+        tasks: config.tasks,
+        rpc: rpc,
+        store: config.store
+    };
+    historyKeeper = HK.create(hkConfig);
+}).nThen(function () {
+    if (config.useExternalWebsocket) { return; }
     if (websocketPort !== config.httpPort) {
         console.log("setting up a new websocket server");
         wsConfig = { port: websocketPort};
     }
     var wsSrv = new WebSocketServer(wsConfig);
-    Storage.create(config, function (store) {
-        NetfluxSrv.run(store, wsSrv, config, rpc);
-    });
+    NetfluxSrv.run(wsSrv, config, historyKeeper);
 });
 
 if (config.debugReplName) {