diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index cfdb14717..30b311eb7 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -18,11 +18,11 @@ module.exports.create = function (Env, cb) { id: Env.id, - channelMessage: function (Server, channel, msgStruct) { + channelMessage: function (Server, channel, msgStruct, cb) { // netflux-server emits 'channelMessage' events whenever someone broadcasts to a channel // historyKeeper stores these messages if the channel id indicates that they are // a channel type with permanent history - HK.onChannelMessage(Env, Server, channel, msgStruct); + HK.onChannelMessage(Env, Server, channel, msgStruct, cb); }, channelClose: function (channelName) { // netflux-server emits 'channelClose' events whenever everyone leaves a channel diff --git a/lib/hk-util.js b/lib/hk-util.js index cea5e5636..f141bf35a 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -300,9 +300,10 @@ var trimMapByOffset = function (map, offset) { * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes */ -const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { +const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb) { const id = channel.id; const Log = Env.Log; + if (typeof(cb) !== "function") { cb = function () {}; } Env.queueStorage(id, function (next) { const msgBin = Buffer.from(msg + '\n', 'utf8'); @@ -321,6 +322,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { // TODO make it possible to respond to clients with errors so they know // their message wasn't stored + cb(err); return void next(); } })); @@ -332,6 +334,8 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { if (err) { Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); // non-critical, we'll be able to get the channel index later + // cb with no error so that the message is broadcast + cb(); return void next(); } if (typeof (index.line) === "number") { index.line++; } @@ -357,13 +361,17 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { if (offsetCount < 0) { Log.warn('OFFSET_TRIM_OOO', { channel: id, - map: index.OffsetByHash + map: index.offsetByHash }); } else if (offsetCount > 0) { trimOffsetByOrder(index.offsetByHash, index.offsets); index.offsets = checkOffsetMap(index.offsetByHash); } } + + // Message stored, call back + cb(); + index.size += msgBin.length; // handle the next element in the queue @@ -445,6 +453,14 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { return void cb(new Error('EUNKNOWN')); } + // If we asked for a lastKnownHash but didn't find it AND if + // this channel has checkpoints, send EUNKNOWN so that the + // client can ask for normal history (without lastKnownHash) + if (lastKnownHash && !lkh && index.cpIndex.length) { + waitFor.abort(); + return void cb(new Error('EUNKNOWN')); + } + // Otherwise use our lastKnownHash cb(null, lkh); })); @@ -496,7 +512,7 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c const start = (beforeHash) ? 0 : offset; store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => { if (beforeHash && msgObj.offset >= offset) { return void abort(); } - var parsed = tryParse(Env, msgObj.buff.toString('utf8')); + const parsed = tryParse(Env, msgObj.buff.toString('utf8')); if (!parsed) { return void readMore(); } handler(parsed, readMore); }, waitFor(function (err) { @@ -846,7 +862,9 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { * adds timestamps to incoming messages * writes messages to the store */ -HK.onChannelMessage = function (Env, Server, channel, msgStruct) { +HK.onChannelMessage = function (Env, Server, channel, msgStruct, cb) { + if (typeof(cb) !== "function") { cb = function () {}; } + //console.log(+new Date(), "onChannelMessage"); const Log = Env.Log; @@ -856,7 +874,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { // we should probably just change this to expect a channel id directly // don't store messages if the channel id indicates that it's an ephemeral message - if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; } + if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return void cb(); } const isCp = /^cp\|/.test(msgStruct[4]); let id; @@ -868,7 +886,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { // more straightforward and reliable. if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) { // Reject duplicate checkpoints - return; + return void cb('DUPLICATE'); } } @@ -881,7 +899,10 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { metadata = _metadata; // don't write messages to expired channels - if (checkExpired(Env, Server, channel)) { return void w.abort(); } + if (checkExpired(Env, Server, channel)) { + cb('EEXPIRED'); + return void w.abort(); + } })); }).nThen(function (w) { // if there's no validateKey present skip to the next block @@ -910,6 +931,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id); } // always abort if there was an error... + cb('FAILED_VALIDATION'); return void w.abort(); }); }); @@ -942,7 +964,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { // storeMessage //console.log(+new Date(), "Storing message"); - storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log)); + storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log), cb); //console.log(+new Date(), "Message stored"); }); }; diff --git a/lib/workers/db-worker.js b/lib/workers/db-worker.js index 5750ff7ac..a8aa8f154 100644 --- a/lib/workers/db-worker.js +++ b/lib/workers/db-worker.js @@ -348,7 +348,10 @@ const getOlderHistory = function (data, cb) { if (hash === oldestKnownHash) { found = true; } - messages.push(parsed); + messages.push({ + msg: parsed, + hash: hash, + }); }, function (err) { var toSend = []; if (typeof (desiredMessages) === "number") { @@ -356,14 +359,14 @@ const getOlderHistory = function (data, cb) { } else if (untilHash) { for (var j = messages.length - 1; j >= 0; j--) { toSend.unshift(messages[j]); - if (Array.isArray(messages[j]) && HK.getHash(messages[j][4]) === untilHash) { + if (messages[j] && messages[j].hash === untilHash) { break; } } } else { let cpCount = 0; for (var i = messages.length - 1; i >= 0; i--) { - if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) { + if (/^cp\|/.test(messages[i].msg[4]) && i !== (messages.length - 1)) { cpCount++; } toSend.unshift(messages[i]); diff --git a/www/common/outer/async-store.js b/www/common/outer/async-store.js index 32cf421d5..77fded8fa 100644 --- a/www/common/outer/async-store.js +++ b/www/common/outer/async-store.js @@ -2070,6 +2070,7 @@ define([ //var decryptedMsg = crypto.decrypt(msg, true); if (data.debug) { msgs.push({ + serverHash: msg.slice(0,64), msg: msg, author: parsed[1][1], time: parsed[1][5] diff --git a/www/common/sframe-common-outer.js b/www/common/sframe-common-outer.js index 6a9df3f6d..fcc3d4274 100644 --- a/www/common/sframe-common-outer.js +++ b/www/common/sframe-common-outer.js @@ -1239,6 +1239,7 @@ define([ if (typeof(_msg) === "object") { decryptedMsgs.push({ author: _msg.author, + serverHash: _msg.serverHash, time: _msg.time, msg: crypto.decrypt(_msg.msg, true, true) }); diff --git a/www/debug/app-debug.less b/www/debug/app-debug.less index ee7d43152..9a85832f9 100644 --- a/www/debug/app-debug.less +++ b/www/debug/app-debug.less @@ -35,6 +35,9 @@ } .cp-app-debug-progress, .cp-app-debug-init { text-align: center; + input { + width: auto !important; + } } #cp-app-debug-loading { text-align: center;