From ed5d97f899e7b3b4bb94e36442b64e2a70c6f41b Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 1 Apr 2020 11:48:16 -0400 Subject: [PATCH] WIP history-keeper fixes --- lib/hk-util.js | 17 +++++--- lib/workers/db-worker.js | 6 ++- scripts/tests/test-mailbox.js | 81 +++++++++++++++++++++++++++-------- 3 files changed, 79 insertions(+), 25 deletions(-) diff --git a/lib/hk-util.js b/lib/hk-util.js index c0da609ee..8a0d5fc86 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -395,7 +395,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { * -1 if you didn't find it */ -const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { +const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX const cb = Util.once(Util.mkAsync(_cb)); // lastKnownhash === -1 means we want the complete history @@ -417,8 +417,9 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // without specifying the hash, and just trust the server to give them the relevant data. // QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory? if (lastKnownHash && typeof(lkh) !== "number") { - waitFor.abort(); - return void cb(new Error('EINVAL')); + return; // XXX fall through to find the offset since it isn't cached + //waitFor.abort(); + //return void cb(new Error('EINVAL')); } // Since last 2 checkpoints @@ -474,7 +475,7 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c let offset = -1; nThen((waitFor) => { - getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => { + getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => { // XXX if (err) { waitFor.abort(); return void cb(err); @@ -482,7 +483,11 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c offset = os; })); }).nThen((waitFor) => { - if (offset === -1) { return void cb(new Error("could not find offset")); } + if (offset === -1) { + return void cb(new Error('EUNKNOWN')); + console.log(lastKnownHash); + return void cb(new Error("could not find offset")); // XXX EUNKNOWN + } const start = (beforeHash) ? 0 : offset; store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => { if (beforeHash && msgObj.offset >= offset) { return void abort(); } @@ -630,7 +635,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) { }, (err) => { if (err && err.code !== 'ENOENT') { if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } - const parsedMsg = {error:err.message, channel: channelName, txid: txid}; + const parsedMsg = {error:err.message, channel: channelName, txid: txid}; // XXX Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]); return; } diff --git a/lib/workers/db-worker.js b/lib/workers/db-worker.js index 65a381479..0b9de4f53 100644 --- a/lib/workers/db-worker.js +++ b/lib/workers/db-worker.js @@ -153,7 +153,8 @@ const computeIndex = function (data, cb) { messageBuf = []; } } else if (messageBuf.length > 100 && cpIndex.length === 0) { - messageBuf = messageBuf.slice(0, 50); + // take the last 50 messages + messageBuf = messageBuf.slice(-50); } // if it's not metadata or a checkpoint then it should be a regular message // store it in the buffer @@ -352,7 +353,8 @@ const getMultipleFileSize = function (data, cb) { const getHashOffset = function (data, cb) { const channelName = data.channel; - const lastKnownHash = data.lastKnownHash; + const lastKnownHash = data.hash; + if (typeof(lastKnownHash) !== 'string') { return void cb("INVALID_HASH"); } var offset = -1; store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { diff --git a/scripts/tests/test-mailbox.js b/scripts/tests/test-mailbox.js index b0418c5db..d152cf0f2 100644 --- a/scripts/tests/test-mailbox.js +++ b/scripts/tests/test-mailbox.js @@ -11,8 +11,14 @@ var Hash = require("../../www/common/common-hash"); var CpNetflux = require("../../www/bower_components/chainpad-netflux"); var Util = require("../../lib/common-util"); -var createMailbox = function (config, cb) { +// you need more than 100 messages in the history, and you need a lastKnownHash between "50" and "length - 50" + +var createMailbox = function (config, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + var webchannel; + var user = config.user; + user.messages = []; CpNetflux.start({ network: config.network, @@ -21,11 +27,16 @@ var createMailbox = function (config, cb) { owners: [ config.edPublic ], noChainPad: true, + + lastKnownHash: config.lastKnownHash, + onChannelError: function (err) { + cb(err); + }, onConnect: function (wc /*, sendMessage */) { webchannel = wc; }, - onMessage: function (/* msg, user, vKey, isCp, hash, author */) { - + onMessage: function (msg /*, user, vKey, isCp, hash, author */) { + user.messages.push(msg); }, onReady: function () { cb(void 0, webchannel); @@ -37,6 +48,8 @@ process.on('unhandledRejection', function (err) { console.error(err); }); +var state = {}; + var makeCurveKeys = function () { var pair = Nacl.box.keyPair(); return { @@ -53,6 +66,10 @@ var makeEdKeys = function () { }; }; +var edKeys = makeEdKeys(); +var curveKeys = makeCurveKeys(); +var mailboxChannel = Hash.createChannelId(); + var createUser = function (config, cb) { // config should contain keys for a team rpc (ed) // teamEdKeys @@ -75,11 +92,11 @@ var createUser = function (config, cb) { // make all the parameters you'll need var network = user.network = user.config.network; - user.edKeys = makeEdKeys(); + user.edKeys = edKeys; + user.curveKeys = curveKeys; - user.curveKeys = makeCurveKeys(); user.mailbox = Mailbox.createEncryptor(user.curveKeys); - user.mailboxChannel = Hash.createChannelId(); + user.mailboxChannel = mailboxChannel; // create an anon rpc for alice Rpc.createAnonymous(network, w(function (err, rpc) { @@ -109,6 +126,11 @@ var createUser = function (config, cb) { }).nThen(function (w) { // create and subscribe to your mailbox createMailbox({ + user: user, + + + lastKnownHash: config.lastKnownHash, + network: user.network, channel: user.mailboxChannel, crypto: user.mailbox, @@ -116,8 +138,9 @@ var createUser = function (config, cb) { }, w(function (err /*, wc*/) { if (err) { w.abort(); - console.error("Mailbox creation error"); - process.exit(1); + //console.error("Mailbox creation error"); + cb(err); + //process.exit(1); } //wc.leave(); })); @@ -135,14 +158,10 @@ var createUser = function (config, cb) { var alice; -var sharedConfig = { - teamEdKeys: makeEdKeys(), - teamCurveKeys: makeCurveKeys(), - rosterSeed: Crypto.Team.createSeed(), -}; - nThen(function (w) { - createUser(sharedConfig, w(function (err, _alice) { + createUser({ + //sharedConfig + }, w(function (err, _alice) { if (err) { w.abort(); return void console.log(err); @@ -163,13 +182,18 @@ nThen(function (w) { var i = 0; var next = w(); + state.hashes = []; + var send = function () { - if (i++ >= 300) { return next(); } + if (i++ >= 160) { return next(); } var msg = alice.mailbox.encrypt(JSON.stringify({ pewpew: 'bangbang', }), alice.curveKeys.curvePublic); + var hash = msg.slice(0, 64); + state.hashes.push(hash); + alice.anonRpc.send('WRITE_PRIVATE_MESSAGE', [ alice.mailboxChannel, msg @@ -177,10 +201,33 @@ nThen(function (w) { ], w(function (err) { if (err) { throw new Error(err); } console.log('message %s written successfully', i); - setTimeout(send, 250); + setTimeout(send, 15); })); }; send(); +}).nThen(function (w) { + console.log("Connecting with second user"); + createUser({ + lastKnownHash: state.hashes[55], + }, w(function (err, _alice) { + if (err) { + w.abort(); + console.log("lastKnownHash: ", state.hashes[55]); + console.log(err); + process.exit(1); + //return void console.log(err); + } + var user = state.alice2 = _alice; + + if (user.messages.length === 105) { + process.exit(0); + } + //console.log(user.messages, user.messages.length); + process.exit(1); + })); +}).nThen(function () { + + }).nThen(function () { alice.cleanup(); //bob.cleanup();