Add mailbox history

pull/1/head
yflory 5 years ago
parent 93b4dac8bb
commit 75e0b68f51

@ -451,9 +451,7 @@ define([
var txid = parsed[1]; var txid = parsed[1];
var req = getRangeRequest(txid); var req = getRangeRequest(txid);
var type = parsed[0]; var type = parsed[0];
if (!req) { if (!req) { return; }
return void console.error("received response to unknown request");
}
if (!req.cb) { if (!req.cb) {
// This is the initial history for a pad chat // This is the initial history for a pad chat

@ -206,6 +206,7 @@ proxy.mailboxes = {
var keys = m.keys || getMyKeys(ctx); var keys = m.keys || getMyKeys(ctx);
if (!keys) { return void console.error("missing asymmetric encryption keys"); } if (!keys) { return void console.error("missing asymmetric encryption keys"); }
var crypto = Crypto.Mailbox.createEncryptor(keys); var crypto = Crypto.Mailbox.createEncryptor(keys);
box.encryptor = crypto;
var cfg = { var cfg = {
network: ctx.store.network, network: ctx.store.network,
channel: m.channel, channel: m.channel,
@ -323,6 +324,60 @@ proxy.mailboxes = {
CpNetflux.start(cfg); CpNetflux.start(cfg);
}; };
var initializeHistory = function (ctx) {
var network = ctx.store.network;
network.on('message', function (msg, sender) {
if (sender !== network.historyKeeper) { return; }
var parsed = JSON.parse(msg);
if (!/HISTORY_RANGE/.test(parsed[0])) { return; }
var txid = parsed[1];
var req = ctx.req[txid];
var type = parsed[0];
var _msg = parsed[2];
var box = req.box;
if (!req) { return; }
if (type === 'HISTORY_RANGE') {
var message;
try {
var decrypted = box.encryptor.decrypt(_msg[4]);
message = JSON.parse(decrypted.content);
} catch (e) {
console.log(e);
}
ctx.emit('HISTORY', {
txid: txid,
message: message,
hash: _msg[4].slice(0,64)
}, [req.cId]);
} else if (type === 'HISTORY_RANGE_END') {
ctx.emit('HISTORY', {
txid: txid,
complete: true
}, [req.cId]);
}
});
};
var loadHistory = function (ctx, clientId, data, cb) {
var box = ctx.boxes[data.type];
if (!box) { return void cb({error: 'ENOENT'}); }
var msg = [ 'GET_HISTORY_RANGE', box.channel, {
from: data.lastKnownHash,
count: data.count,
txid: data.txid
}
];
ctx.req[data.txid] = {
cId: clientId,
box: box
};
var network = ctx.store.network;
network.sendto(network.historyKeeper, JSON.stringify(msg)).then(function () {
}, function (err) {
throw new Error(err);
});
};
var subscribe = function (ctx, data, cId, cb) { var subscribe = function (ctx, data, cId, cb) {
// Get existing notifications // Get existing notifications
@ -357,17 +412,18 @@ proxy.mailboxes = {
updateMetadata: cfg.updateMetadata, updateMetadata: cfg.updateMetadata,
emit: emit, emit: emit,
clients: [], clients: [],
boxes: {} boxes: {},
req: {}
}; };
var mailboxes = store.proxy.mailboxes = store.proxy.mailboxes || {}; var mailboxes = store.proxy.mailboxes = store.proxy.mailboxes || {};
initializeMailboxes(ctx, mailboxes); initializeMailboxes(ctx, mailboxes);
initializeHistory(ctx);
Object.keys(mailboxes).forEach(function (key) { Object.keys(mailboxes).forEach(function (key) {
if (TYPES.indexOf(key) === -1) { return; } if (TYPES.indexOf(key) === -1) { return; }
var m = mailboxes[key]; var m = mailboxes[key];
console.log(key, m);
if (BLOCKING_TYPES.indexOf(key) === -1) { if (BLOCKING_TYPES.indexOf(key) === -1) {
openChannel(ctx, key, m, function () { openChannel(ctx, key, m, function () {
@ -418,6 +474,9 @@ console.log(key, m);
if (cmd === 'SENDTO') { if (cmd === 'SENDTO') {
return void sendTo(ctx, data.type, data.msg, data.user, cb); return void sendTo(ctx, data.type, data.msg, data.user, cb);
} }
if (cmd === 'LOAD_HISTORY') {
return void loadHistory(ctx, clientId, data, cb);
}
}; };
return mailbox; return mailbox;

@ -172,6 +172,38 @@ define([
}); });
}; };
var historyState = false;
var onHistory = function () {};
mailbox.getMoreHistory = function (type, count, lastKnownHash, cb) {
if (historyState) { return void cb("ALREADY_CALLED"); }
historyState = true;
var txid = Util.uid();
execCommand('LOAD_HISTORY', {
type: type,
count: count,
txid: txid,
lastKnownHash: lastKnownHash
}, function (err, obj) {
if (obj && obj.error) { console.error(obj.error); }
});
var messages = [];
onHistory = function (data) {
if (data.txid !== txid) { return; }
if (data.complete) {
onHistory = function () {};
cb(null, messages);
historyState = false;
return;
}
messages.push({
type: type,
content: {
msg: data.message,
hash: data.hash
}
});
};
};
// CHANNEL WITH WORKER // CHANNEL WITH WORKER
@ -179,6 +211,9 @@ define([
// obj = { ev: 'type', data: obj } // obj = { ev: 'type', data: obj }
var ev = obj.ev; var ev = obj.ev;
var data = obj.data; var data = obj.data;
if (ev === 'HISTORY') {
return void onHistory(data);
}
if (ev === 'MESSAGE') { if (ev === 'MESSAGE') {
return void onMessage(data); return void onMessage(data);
} }

Loading…
Cancel
Save