diff --git a/historyKeeper.js b/historyKeeper.js index a5a371936..07a555e5f 100644 --- a/historyKeeper.js +++ b/historyKeeper.js @@ -1,5 +1,5 @@ /* jshint esversion: 6 */ -/* global Buffer, process */ +/* global Buffer */ ;(function () { 'use strict'; const nThen = require('nthen'); @@ -7,9 +7,11 @@ const Nacl = require('tweetnacl'); const Crypto = require('crypto'); const Once = require("./lib/once"); const Meta = require("./lib/metadata"); +const WriteQueue = require("./lib/write-queue"); let Log; const now = function () { return (new Date()).getTime(); }; +const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds /* getHash * this function slices off the leading portion of a message which is @@ -80,6 +82,7 @@ module.exports.create = function (cfg) { const rpc = cfg.rpc; const tasks = cfg.tasks; const store = cfg.store; + const retainData = cfg.retainData; Log = cfg.log; Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE'); @@ -302,88 +305,135 @@ module.exports.create = function (cfg) { * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes */ - const storageQueues = {}; + const queueStorage = WriteQueue(); - const storeQueuedMessage = function (ctx, queue, id) { - if (queue.length === 0) { - delete storageQueues[id]; - return; - } - - const first = queue.shift(); - - const msgBin = first.msg; - const optionalMessageHash = first.hash; - const isCp = first.isCp; - - // Store the message first, and update the index only once it's stored. 
- // store.messageBin can be async so updating the index first may - // result in a wrong cpIndex - nThen((waitFor) => { - store.messageBin(id, msgBin, waitFor(function (err) { - if (err) { - waitFor.abort(); - Log.error("HK_STORE_MESSAGE_ERROR", err.message); - - // this error is critical, but there's not much we can do at the moment - // proceed with more messages, but they'll probably fail too - // at least you won't have a memory leak - - // TODO make it possible to respond to clients with errors so they know - // their message wasn't stored - storeQueuedMessage(ctx, queue, id); - return; - } - })); - }).nThen((waitFor) => { - getIndex(ctx, id, waitFor((err, index) => { - if (err) { - Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); - // non-critical, we'll be able to get the channel index later + const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { + const id = channel.id; + const msgBin = new Buffer(msg + '\n', 'utf8'); - // proceed to the next message in the queue - storeQueuedMessage(ctx, queue, id); - return; - } - if (typeof (index.line) === "number") { index.line++; } - if (isCp) { - index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); - for (let k in index.offsetByHash) { - if (index.offsetByHash[k] < index.cpIndex[0]) { - delete index.offsetByHash[k]; + queueStorage(id, function (next) { + // Store the message first, and update the index only once it's stored. 
+ // store.messageBin can be async so updating the index first may + // result in a wrong cpIndex + nThen((waitFor) => { + store.messageBin(id, msgBin, waitFor(function (err) { + if (err) { + waitFor.abort(); + Log.error("HK_STORE_MESSAGE_ERROR", err.message); + + // this error is critical, but there's not much we can do at the moment + // proceed with more messages, but they'll probably fail too + // at least you won't have a memory leak + + // TODO make it possible to respond to clients with errors so they know + // their message wasn't stored + return void next(); + } + })); + }).nThen((waitFor) => { + getIndex(ctx, id, waitFor((err, index) => { + if (err) { + Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); + // non-critical, we'll be able to get the channel index later + return void next(); + } + if (typeof (index.line) === "number") { index.line++; } + if (isCp) { + index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); + for (let k in index.offsetByHash) { + if (index.offsetByHash[k] < index.cpIndex[0]) { + delete index.offsetByHash[k]; + } } + index.cpIndex.push(({ + offset: index.size, + line: ((index.line || 0) + 1) + } /*:cp_index_item*/)); } - index.cpIndex.push(({ - offset: index.size, - line: ((index.line || 0) + 1) - } /*:cp_index_item*/)); - } - if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } - index.size += msgBin.length; + if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } + index.size += msgBin.length; - // handle the next element in the queue - storeQueuedMessage(ctx, queue, id); - })); + // handle the next element in the queue + next(); + })); + }); }); }; - const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { - const id = channel.id; + /* historyKeeperBroadcast + * uses API from the netflux server to send messages to every member of a channel + * sendMsg runs in a try-catch and drops users if sending a message fails + */ + const 
historyKeeperBroadcast = function (ctx, channel, msg) { + let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/); + chan.forEach(function (user) { + sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]); + }); + }; - const msgBin = new Buffer(msg + '\n', 'utf8'); - if (Array.isArray(storageQueues[id])) { - return void storageQueues[id].push({ - msg: msgBin, - hash: optionalMessageHash, - isCp: isCp, + /* expireChannel is here to clean up channels that should have been removed + but for some reason are still present + */ + const expireChannel = function (ctx, channel) { + if (retainData) { + return void store.archiveChannel(channel, function (err) { + Log.info("ARCHIVAL_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", { + channelId: channel, + status: err? String(err): "SUCCESS", + }); }); } - const queue = storageQueues[id] = (storageQueues[id] || [{ - msg: msgBin, - hash: optionalMessageHash, - }]); - storeQueuedMessage(ctx, queue, id); + store.removeChannel(channel, function (err) { + Log.info("DELETION_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", { + channelId: channel, + status: err? 
String(err): "SUCCESS", + }); + }); + }; + + /* checkExpired + * synchronously returns true or undefined to indicate whether the channel is expired + * according to its metadata + * has some side effects: + * closes the channel via the store.closeChannel API + * and then broadcasts to all channel members that the channel has expired + * removes the channel from the netflux-server's in-memory cache + * removes the channel metadata from history keeper's in-memory cache + + FIXME the boolean nature of this API should be separated from its side effects + */ + const checkExpired = function (ctx, channel) { + if (!(channel && channel.length === STANDARD_CHANNEL_LENGTH)) { return false; } + let metadata = metadata_cache[channel]; + if (!(metadata && typeof(metadata.expire) === 'number')) { return false; } + + // the number of milliseconds ago the channel should have expired + let pastDue = (+new Date()) - metadata.expire; + + // less than zero means that it hasn't expired yet + if (pastDue < 0) { return false; } + + // if it should have expired more than a day ago... 
+ // there may have been a problem with scheduling tasks + // or the scheduled tasks may not be running + // so trigger a removal from here + if (pastDue >= ONE_DAY) { expireChannel(ctx, channel); } + + // close the channel + store.closeChannel(channel, function () { + historyKeeperBroadcast(ctx, channel, { + error: 'EEXPIRED', + channel: channel + }); + // remove it from any caches after you've told anyone in the channel + // that it has expired + delete ctx.channels[channel]; + delete metadata_cache[channel]; + }); + + // return true to indicate that it has expired + return true; }; var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; @@ -436,12 +486,8 @@ module.exports.create = function (cfg) { metadata = index.metadata; - if (metadata.expire && metadata.expire < +new Date()) { - // don't store message sent to expired channels - w.abort(); - return; - // TODO if a channel expired a long time ago but it's still here, remove it - } + // don't write messages to expired channels + if (checkExpired(ctx, channel)) { return void w.abort(); } // if there's no validateKey present skip to the next block if (!metadata.validateKey) { return; } @@ -674,26 +720,6 @@ module.exports.create = function (cfg) { }); }; - /*:: - type Chan_t = { - indexOf: (any)=>number, - id: string, - lastSavedCp: string, - forEach: ((any)=>void)=>void, - push: (any)=>void, - }; - */ - - /* historyKeeperBroadcast - * uses API from the netflux server to send messages to every member of a channel - * sendMsg runs in a try-catch and drops users if sending a message fails - */ - const historyKeeperBroadcast = function (ctx, channel, msg) { - let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/); - chan.forEach(function (user) { - sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]); - }); - }; /* onChannelCleared * broadcasts to all clients in a channel if that channel is deleted @@ -729,33 +755,6 @@ module.exports.create = function (cfg) { } }; - /* 
checkExpired - * synchronously returns true or undefined to indicate whether the channel is expired - * according to its metadata - * has some side effects: - * closes the channel via the store.closeChannel API - * and then broadcasts to all channel members that the channel has expired - * removes the channel from the netflux-server's in-memory cache - * removes the channel metadata from history keeper's in-memory cache - - FIXME the boolean nature of this API should be separated from its side effects - */ - const checkExpired = function (ctx, channel) { - if (channel && channel.length === STANDARD_CHANNEL_LENGTH && metadata_cache[channel] && - metadata_cache[channel].expire && metadata_cache[channel].expire < +new Date()) { - store.closeChannel(channel, function () { - historyKeeperBroadcast(ctx, channel, { - error: 'EEXPIRED', - channel: channel - }); - }); - delete ctx.channels[channel]; - delete metadata_cache[channel]; - return true; - } - return; - }; - /* onDirectMessage * exported for use by the netflux-server * parses and handles all direct messages directed to the history keeper @@ -772,7 +771,6 @@ module.exports.create = function (cfg) { const onDirectMessage = function (ctx, seq, user, json) { let parsed; let channelName; - let obj = HISTORY_KEEPER_ID; Log.silly('HK_MESSAGE', json); @@ -809,6 +807,7 @@ module.exports.create = function (cfg) { } } metadata.channel = channelName; + metadata.created = +new Date(); // if the user sends us an invalid key, we won't be able to validate their messages // so they'll never get written to the log anyway. 
Let's just drop their message @@ -913,7 +912,7 @@ module.exports.create = function (cfg) { channelName = parsed[1]; var map = parsed[2]; if (!(map && typeof(map) === 'object')) { - return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', obj]); + return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]); } var oldestKnownHash = map.from; @@ -921,11 +920,11 @@ module.exports.create = function (cfg) { var desiredCheckpoint = map.cpCount; var txid = map.txid; if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') { - return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', obj]); + return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]); } if (!txid) { - return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', obj]); + return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]); } sendMsg(ctx, user, [seq, 'ACK']); @@ -1024,33 +1023,6 @@ module.exports.create = function (cfg) { } }; - var cciLock = false; - const checkChannelIntegrity = function (ctx) { - if (process.env['CRYPTPAD_DEBUG'] && !cciLock) { - let nt = nThen; - cciLock = true; - Object.keys(ctx.channels).forEach(function (channelName) { - const chan = ctx.channels[channelName]; - if (!chan.index) { return; } - nt = nt((waitFor) => { - store.getChannelSize(channelName, waitFor((err, size) => { - if (err) { - return void Log.debug("HK_CHECK_CHANNEL_INTEGRITY", - "Couldn't get size of channel " + channelName); - } - if (size !== chan.index.size) { - return void Log.debug("HK_CHECK_CHANNEL_SIZE", - "channel size mismatch for " + channelName + - " --- cached: " + chan.index.size + - " --- fileSize: " + size); - } - })); - }).nThen; - }); - nt(() => { cciLock = false; }); - } - }; - return { id: HISTORY_KEEPER_ID, setConfig: setConfig, @@ -1058,7 +1030,6 @@ module.exports.create = function (cfg) { dropChannel: dropChannel, checkExpired: checkExpired, onDirectMessage: onDirectMessage, - 
checkChannelIntegrity: checkChannelIntegrity }; }; diff --git a/lib/write-queue.js b/lib/write-queue.js new file mode 100644 index 000000000..c1b64ebaf --- /dev/null +++ b/lib/write-queue.js @@ -0,0 +1,40 @@ +/* +var q = Queue(); +q(id, function (next) { + // whatever you need to do.... + + // when you're done + next(); +}); +*/ + +var fix1 = function (f, x) { + return function () { f(x); }; +}; + +module.exports = function () { + var map = {}; + + var next = function (id) { + if (map[id] && map[id].length === 0) { return void delete map[id]; } + var task = map[id].shift(); + task(fix1(next, id)); + }; + + return function (id, task) { + // support initialization with just a function + if (typeof(id) === 'function' && typeof(task) === 'undefined') { + task = id; + id = ''; + } + // ...but you really need to pass a function + if (typeof(task) !== 'function') { throw new Error("Expected function"); } + + // if the intended queue already has tasks in progress, add this one to the end of the queue + if (map[id]) { return void map[id].push(task); } + + // otherwise create a queue containing the given task + map[id] = [task]; + next(id); + }; +}; diff --git a/rpc.js b/rpc.js index fd0c11f35..97b317754 100644 --- a/rpc.js +++ b/rpc.js @@ -18,7 +18,7 @@ const nThen = require("nthen"); const getFolderSize = require("get-folder-size"); const Pins = require("./lib/pins"); const Meta = require("./lib/metadata"); - +const WriteQueue = require("./lib/write-queue"); var RPC = module.exports; @@ -340,8 +340,7 @@ var getMetadata = function (Env, channel, cb) { value: value } */ -// XXX global saferphore may cause issues here, a queue "per channel" is probably better -var metadataSem = Saferphore.create(1); +var queueMetadata = WriteQueue(); var setMetadata = function (Env, data, unsafeKey, cb) { var channel = data.channel; var command = data.command; @@ -349,16 +348,15 @@ var setMetadata = function (Env, data, unsafeKey, cb) { if (!command || typeof (command) !== 'string') { return 
void cb ('INVALID_COMMAND'); } if (Meta.commands.indexOf(command) === -1) { return void('UNSUPPORTED_COMMAND'); } - metadataSem.take(function (give) { - var g = give(); + queueMetadata(channel, function (next) { getMetadata(Env, channel, function (err, metadata) { if (err) { - g(); - return void cb(err); + cb(err); + return void next(); } if (!(metadata && Array.isArray(metadata.owners))) { - g(); - return void cb('E_NO_OWNERS'); + cb('E_NO_OWNERS'); + return void next(); } // Confirm that the channel is owned by the user in question @@ -372,13 +370,13 @@ var setMetadata = function (Env, data, unsafeKey, cb) { || !Array.isArray(data.value) || data.value.length !== 1 || data.value[0] !== unsafeKey) { - g(); - return void cb('INSUFFICIENT_PERMISSIONS'); + cb('INSUFFICIENT_PERMISSIONS'); + return void next(); } } else if (metadata.owners.indexOf(unsafeKey) === -1) { - g(); - return void cb('INSUFFICIENT_PERMISSIONS'); + cb('INSUFFICIENT_PERMISSIONS'); + return void next(); } // Add the new metadata line @@ -387,22 +385,23 @@ var setMetadata = function (Env, data, unsafeKey, cb) { try { changed = Meta.handleCommand(metadata, line); } catch (e) { - g(); - return void cb(e); + cb(e); + return void next(); } // if your command is valid but it didn't result in any change to the metadata, // call back now and don't write any "useless" line to the log if (!changed) { - g(); - return void cb(void 0, metadata); + cb(void 0, metadata); + return void next(); } Env.msgStore.writeMetadata(channel, JSON.stringify(line), function (e) { - g(); if (e) { - return void cb(e); + cb(e); + return void next(); } cb(void 0, metadata); + next(); }); }); }); @@ -1884,8 +1883,6 @@ RPC.create = function ( }; var rpc0 = function (ctx, data, respond) { - if (!Env.msgStore) { Env.msgStore = ctx.store; } - if (!Array.isArray(data)) { Log.debug('INVALID_ARG_FORMET', data); return void respond('INVALID_ARG_FORMAT'); diff --git a/server.js b/server.js index c0fd4c194..93d4f7af8 100644 --- a/server.js 
+++ b/server.js @@ -321,7 +321,8 @@ var nt = nThen(function (w) { tasks: config.tasks, rpc: rpc, store: config.store, - log: log + log: log, + retainData: Boolean(config.retainData), }; historyKeeper = HK.create(hkConfig); }).nThen(function () { diff --git a/www/common/outer/mailbox.js b/www/common/outer/mailbox.js index 898324691..ac8c1aa95 100644 --- a/www/common/outer/mailbox.js +++ b/www/common/outer/mailbox.js @@ -89,7 +89,6 @@ proxy.mailboxes = { if (!anonRpc) { return void cb({error: "anonymous rpc session not ready"}); } var crypto = Crypto.Mailbox.createEncryptor(keys); - var network = ctx.store.network; var text = JSON.stringify({ type: type, @@ -100,7 +99,7 @@ proxy.mailboxes = { anonRpc.send("WRITE_PRIVATE_MESSAGE", [ user.channel, ciphertext - ], function (err, response) { + ], function (err /*, response */) { if (err) { return void cb({ error: err,