WIP history-keeper fixes

pull/1/head
ansuz 5 years ago
parent 1a825ad664
commit ed5d97f899

@ -395,7 +395,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
* -1 if you didn't find it * -1 if you didn't find it
*/ */
const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX
const cb = Util.once(Util.mkAsync(_cb)); const cb = Util.once(Util.mkAsync(_cb));
// lastKnownhash === -1 means we want the complete history // lastKnownhash === -1 means we want the complete history
@ -417,8 +417,9 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => {
// without specifying the hash, and just trust the server to give them the relevant data. // without specifying the hash, and just trust the server to give them the relevant data.
// QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory? // QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory?
if (lastKnownHash && typeof(lkh) !== "number") { if (lastKnownHash && typeof(lkh) !== "number") {
waitFor.abort(); return; // XXX fall through to find the offset since it isn't cached
return void cb(new Error('EINVAL')); //waitFor.abort();
//return void cb(new Error('EINVAL'));
} }
// Since last 2 checkpoints // Since last 2 checkpoints
@ -474,7 +475,7 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
let offset = -1; let offset = -1;
nThen((waitFor) => { nThen((waitFor) => {
getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => { getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => { // XXX
if (err) { if (err) {
waitFor.abort(); waitFor.abort();
return void cb(err); return void cb(err);
@ -482,7 +483,11 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
offset = os; offset = os;
})); }));
}).nThen((waitFor) => { }).nThen((waitFor) => {
if (offset === -1) { return void cb(new Error("could not find offset")); } if (offset === -1) {
return void cb(new Error('EUNKNOWN'));
console.log(lastKnownHash);
return void cb(new Error("could not find offset")); // XXX EUNKNOWN
}
const start = (beforeHash) ? 0 : offset; const start = (beforeHash) ? 0 : offset;
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => { store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
if (beforeHash && msgObj.offset >= offset) { return void abort(); } if (beforeHash && msgObj.offset >= offset) { return void abort(); }
@ -630,7 +635,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
}, (err) => { }, (err) => {
if (err && err.code !== 'ENOENT') { if (err && err.code !== 'ENOENT') {
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
const parsedMsg = {error:err.message, channel: channelName, txid: txid}; const parsedMsg = {error:err.message, channel: channelName, txid: txid}; // XXX
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]); Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
return; return;
} }

@ -153,7 +153,8 @@ const computeIndex = function (data, cb) {
messageBuf = []; messageBuf = [];
} }
} else if (messageBuf.length > 100 && cpIndex.length === 0) { } else if (messageBuf.length > 100 && cpIndex.length === 0) {
messageBuf = messageBuf.slice(0, 50); // take the last 50 messages
messageBuf = messageBuf.slice(-50);
} }
// if it's not metadata or a checkpoint then it should be a regular message // if it's not metadata or a checkpoint then it should be a regular message
// store it in the buffer // store it in the buffer
@ -352,7 +353,8 @@ const getMultipleFileSize = function (data, cb) {
const getHashOffset = function (data, cb) { const getHashOffset = function (data, cb) {
const channelName = data.channel; const channelName = data.channel;
const lastKnownHash = data.lastKnownHash; const lastKnownHash = data.hash;
if (typeof(lastKnownHash) !== 'string') { return void cb("INVALID_HASH"); }
var offset = -1; var offset = -1;
store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => {

@ -11,8 +11,14 @@ var Hash = require("../../www/common/common-hash");
var CpNetflux = require("../../www/bower_components/chainpad-netflux"); var CpNetflux = require("../../www/bower_components/chainpad-netflux");
var Util = require("../../lib/common-util"); var Util = require("../../lib/common-util");
var createMailbox = function (config, cb) { // you need more than 100 messages in the history, and you need a lastKnownHash between "50" and "length - 50"
var createMailbox = function (config, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
var webchannel; var webchannel;
var user = config.user;
user.messages = [];
CpNetflux.start({ CpNetflux.start({
network: config.network, network: config.network,
@ -21,11 +27,16 @@ var createMailbox = function (config, cb) {
owners: [ config.edPublic ], owners: [ config.edPublic ],
noChainPad: true, noChainPad: true,
lastKnownHash: config.lastKnownHash,
onChannelError: function (err) {
cb(err);
},
onConnect: function (wc /*, sendMessage */) { onConnect: function (wc /*, sendMessage */) {
webchannel = wc; webchannel = wc;
}, },
onMessage: function (/* msg, user, vKey, isCp, hash, author */) { onMessage: function (msg /*, user, vKey, isCp, hash, author */) {
user.messages.push(msg);
}, },
onReady: function () { onReady: function () {
cb(void 0, webchannel); cb(void 0, webchannel);
@ -37,6 +48,8 @@ process.on('unhandledRejection', function (err) {
console.error(err); console.error(err);
}); });
var state = {};
var makeCurveKeys = function () { var makeCurveKeys = function () {
var pair = Nacl.box.keyPair(); var pair = Nacl.box.keyPair();
return { return {
@ -53,6 +66,10 @@ var makeEdKeys = function () {
}; };
}; };
var edKeys = makeEdKeys();
var curveKeys = makeCurveKeys();
var mailboxChannel = Hash.createChannelId();
var createUser = function (config, cb) { var createUser = function (config, cb) {
// config should contain keys for a team rpc (ed) // config should contain keys for a team rpc (ed)
// teamEdKeys // teamEdKeys
@ -75,11 +92,11 @@ var createUser = function (config, cb) {
// make all the parameters you'll need // make all the parameters you'll need
var network = user.network = user.config.network; var network = user.network = user.config.network;
user.edKeys = makeEdKeys(); user.edKeys = edKeys;
user.curveKeys = curveKeys;
user.curveKeys = makeCurveKeys();
user.mailbox = Mailbox.createEncryptor(user.curveKeys); user.mailbox = Mailbox.createEncryptor(user.curveKeys);
user.mailboxChannel = Hash.createChannelId(); user.mailboxChannel = mailboxChannel;
// create an anon rpc for alice // create an anon rpc for alice
Rpc.createAnonymous(network, w(function (err, rpc) { Rpc.createAnonymous(network, w(function (err, rpc) {
@ -109,6 +126,11 @@ var createUser = function (config, cb) {
}).nThen(function (w) { }).nThen(function (w) {
// create and subscribe to your mailbox // create and subscribe to your mailbox
createMailbox({ createMailbox({
user: user,
lastKnownHash: config.lastKnownHash,
network: user.network, network: user.network,
channel: user.mailboxChannel, channel: user.mailboxChannel,
crypto: user.mailbox, crypto: user.mailbox,
@ -116,8 +138,9 @@ var createUser = function (config, cb) {
}, w(function (err /*, wc*/) { }, w(function (err /*, wc*/) {
if (err) { if (err) {
w.abort(); w.abort();
console.error("Mailbox creation error"); //console.error("Mailbox creation error");
process.exit(1); cb(err);
//process.exit(1);
} }
//wc.leave(); //wc.leave();
})); }));
@ -135,14 +158,10 @@ var createUser = function (config, cb) {
var alice; var alice;
var sharedConfig = {
teamEdKeys: makeEdKeys(),
teamCurveKeys: makeCurveKeys(),
rosterSeed: Crypto.Team.createSeed(),
};
nThen(function (w) { nThen(function (w) {
createUser(sharedConfig, w(function (err, _alice) { createUser({
//sharedConfig
}, w(function (err, _alice) {
if (err) { if (err) {
w.abort(); w.abort();
return void console.log(err); return void console.log(err);
@ -163,13 +182,18 @@ nThen(function (w) {
var i = 0; var i = 0;
var next = w(); var next = w();
state.hashes = [];
var send = function () { var send = function () {
if (i++ >= 300) { return next(); } if (i++ >= 160) { return next(); }
var msg = alice.mailbox.encrypt(JSON.stringify({ var msg = alice.mailbox.encrypt(JSON.stringify({
pewpew: 'bangbang', pewpew: 'bangbang',
}), alice.curveKeys.curvePublic); }), alice.curveKeys.curvePublic);
var hash = msg.slice(0, 64);
state.hashes.push(hash);
alice.anonRpc.send('WRITE_PRIVATE_MESSAGE', [ alice.anonRpc.send('WRITE_PRIVATE_MESSAGE', [
alice.mailboxChannel, alice.mailboxChannel,
msg msg
@ -177,10 +201,33 @@ nThen(function (w) {
], w(function (err) { ], w(function (err) {
if (err) { throw new Error(err); } if (err) { throw new Error(err); }
console.log('message %s written successfully', i); console.log('message %s written successfully', i);
setTimeout(send, 250); setTimeout(send, 15);
})); }));
}; };
send(); send();
}).nThen(function (w) {
console.log("Connecting with second user");
createUser({
lastKnownHash: state.hashes[55],
}, w(function (err, _alice) {
if (err) {
w.abort();
console.log("lastKnownHash: ", state.hashes[55]);
console.log(err);
process.exit(1);
//return void console.log(err);
}
var user = state.alice2 = _alice;
if (user.messages.length === 105) {
process.exit(0);
}
//console.log(user.messages, user.messages.length);
process.exit(1);
}));
}).nThen(function () {
}).nThen(function () { }).nThen(function () {
alice.cleanup(); alice.cleanup();
//bob.cleanup(); //bob.cleanup();

Loading…
Cancel
Save