Merge branch 'callback' into staging

pull/1/head
ansuz 4 years ago
commit baab65fa36

@ -18,11 +18,11 @@ module.exports.create = function (Env, cb) {
id: Env.id,
channelMessage: function (Server, channel, msgStruct) {
channelMessage: function (Server, channel, msgStruct, cb) {
// netflux-server emits 'channelMessage' events whenever someone broadcasts to a channel
// historyKeeper stores these messages if the channel id indicates that they are
// a channel type with permanent history
HK.onChannelMessage(Env, Server, channel, msgStruct);
HK.onChannelMessage(Env, Server, channel, msgStruct, cb);
},
channelClose: function (channelName) {
// netflux-server emits 'channelClose' events whenever everyone leaves a channel

@ -300,9 +300,10 @@ var trimMapByOffset = function (map, offset) {
* the fix is to use callbacks and implement queueing for writes
* to guarantee that offset computation is always atomic with writes
*/
const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb) {
const id = channel.id;
const Log = Env.Log;
if (typeof(cb) !== "function") { cb = function () {}; }
Env.queueStorage(id, function (next) {
const msgBin = Buffer.from(msg + '\n', 'utf8');
@ -321,6 +322,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
// TODO make it possible to respond to clients with errors so they know
// their message wasn't stored
cb(err);
return void next();
}
}));
@ -332,6 +334,8 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
if (err) {
Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
// non-critical, we'll be able to get the channel index later
// cb with no error so that the message is broadcast
cb();
return void next();
}
if (typeof (index.line) === "number") { index.line++; }
@ -357,13 +361,17 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
if (offsetCount < 0) {
Log.warn('OFFSET_TRIM_OOO', {
channel: id,
map: index.OffsetByHash
map: index.offsetByHash
});
} else if (offsetCount > 0) {
trimOffsetByOrder(index.offsetByHash, index.offsets);
index.offsets = checkOffsetMap(index.offsetByHash);
}
}
// Message stored, call back
cb();
index.size += msgBin.length;
// handle the next element in the queue
@ -445,6 +453,14 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => {
return void cb(new Error('EUNKNOWN'));
}
// If we asked for a lastKnownHash but didn't find it AND if
// this channel has checkpoints, send EUNKNOWN so that the
// client can ask for normal history (without lastKnownHash)
if (lastKnownHash && !lkh && index.cpIndex.length) {
waitFor.abort();
return void cb(new Error('EUNKNOWN'));
}
// Otherwise use our lastKnownHash
cb(null, lkh);
}));
@ -496,7 +512,7 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
const start = (beforeHash) ? 0 : offset;
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
if (beforeHash && msgObj.offset >= offset) { return void abort(); }
var parsed = tryParse(Env, msgObj.buff.toString('utf8'));
const parsed = tryParse(Env, msgObj.buff.toString('utf8'));
if (!parsed) { return void readMore(); }
handler(parsed, readMore);
}, waitFor(function (err) {
@ -846,7 +862,9 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) {
* adds timestamps to incoming messages
* writes messages to the store
*/
HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
HK.onChannelMessage = function (Env, Server, channel, msgStruct, cb) {
if (typeof(cb) !== "function") { cb = function () {}; }
//console.log(+new Date(), "onChannelMessage");
const Log = Env.Log;
@ -856,7 +874,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
// we should probably just change this to expect a channel id directly
// don't store messages if the channel id indicates that it's an ephemeral message
if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; }
if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return void cb(); }
const isCp = /^cp\|/.test(msgStruct[4]);
let id;
@ -868,7 +886,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
// more straightforward and reliable.
if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) {
// Reject duplicate checkpoints
return;
return void cb('DUPLICATE');
}
}
@ -881,7 +899,10 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
metadata = _metadata;
// don't write messages to expired channels
if (checkExpired(Env, Server, channel)) { return void w.abort(); }
if (checkExpired(Env, Server, channel)) {
cb('EEXPIRED');
return void w.abort();
}
}));
}).nThen(function (w) {
// if there's no validateKey present skip to the next block
@ -910,6 +931,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
}
// always abort if there was an error...
cb('FAILED_VALIDATION');
return void w.abort();
});
});
@ -942,7 +964,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
// storeMessage
//console.log(+new Date(), "Storing message");
storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log));
storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log), cb);
//console.log(+new Date(), "Message stored");
});
};

@ -348,7 +348,10 @@ const getOlderHistory = function (data, cb) {
if (hash === oldestKnownHash) {
found = true;
}
messages.push(parsed);
messages.push({
msg: parsed,
hash: hash,
});
}, function (err) {
var toSend = [];
if (typeof (desiredMessages) === "number") {
@ -356,14 +359,14 @@ const getOlderHistory = function (data, cb) {
} else if (untilHash) {
for (var j = messages.length - 1; j >= 0; j--) {
toSend.unshift(messages[j]);
if (Array.isArray(messages[j]) && HK.getHash(messages[j][4]) === untilHash) {
if (messages[j] && messages[j].hash === untilHash) {
break;
}
}
} else {
let cpCount = 0;
for (var i = messages.length - 1; i >= 0; i--) {
if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
if (/^cp\|/.test(messages[i].msg[4]) && i !== (messages.length - 1)) {
cpCount++;
}
toSend.unshift(messages[i]);

@ -2070,6 +2070,7 @@ define([
//var decryptedMsg = crypto.decrypt(msg, true);
if (data.debug) {
msgs.push({
serverHash: msg.slice(0,64),
msg: msg,
author: parsed[1][1],
time: parsed[1][5]

@ -1239,6 +1239,7 @@ define([
if (typeof(_msg) === "object") {
decryptedMsgs.push({
author: _msg.author,
serverHash: _msg.serverHash,
time: _msg.time,
msg: crypto.decrypt(_msg.msg, true, true)
});

@ -35,6 +35,9 @@
}
.cp-app-debug-progress, .cp-app-debug-init {
text-align: center;
input {
width: auto !important;
}
}
#cp-app-debug-loading {
text-align: center;

Loading…
Cancel
Save