Use new server format for history keeper time

pull/1/head
yflory 4 years ago
parent f01c670b88
commit 0fc8bfde4a

@ -508,9 +508,22 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
return void cb(new Error('EUNKNOWN'));
}
const start = (beforeHash) ? 0 : offset;
let msgOffset = start;
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
if (beforeHash && msgObj.offset >= offset) { return void abort(); }
var parsed = tryParse(Env, msgObj.buff.toString('utf8'));
const str = msgObj.buff.toString('utf8');
var parsed = tryParse(Env, str);
// Extract the time and put it in the new opts object
let time;
if (Array.isArray(parsed)) {
if (typeof(parsed[parsed.length - 1]) === "number") { time = parsed.pop(); }
parsed.push({
time: time,
offset: msgObj.offset
});
}
if (!parsed) { return void readMore(); }
handler(parsed, readMore);
}, waitFor(function (err) {
@ -705,7 +718,20 @@ const handleGetHistoryRange = function (Env, Server, seq, userId, parsed) {
}
if (Array.isArray(toSend)) {
toSend.forEach(function (msg) {
// toSend: array of objects {msg, hash, offset}
toSend.forEach(function (obj) {
let msg = obj.msg;
// Extract the time and put it in the new opts object
let time;
if (Array.isArray(msg)) {
if (typeof(msg[msg.length - 1]) === "number") { time = msg.pop(); }
msg.push({
time: time,
offset: obj.offset
});
}
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
JSON.stringify(['HISTORY_RANGE', txid, msg])]);
});

@ -263,9 +263,12 @@ var readMessages = function (path, msgHandler, _cb) {
var collector = createIdleStreamCollector(stream);
var cb = Util.once(Util.mkAsync(Util.both(_cb, collector)));
let offset = 0;
return readFileBin(stream, function (msgObj, readMore) {
collector.keepAlive();
msgHandler(msgObj.buff.toString('utf8'));
const str = msgObj.buff.toString('utf8');
msgHandler(str, offset);
offset += str.length + 1; // +1 because of line break
readMore();
}, function (err) {
cb(err);
@ -846,10 +849,10 @@ var message = function (env, chanName, msg, cb) {
var getMessages = function (env, chanName, handler, cb) {
var errorState = false;
var path = mkPath(env, chanName);
readMessages(path, function (msg) {
readMessages(path, function (msg, offset) {
if (!msg || errorState) { return; }
try {
handler(msg);
handler(msg, offset);
} catch (e) {
errorState = true;
return void cb(e);

@ -329,7 +329,9 @@ const getOlderHistory = function (data, cb) {
var messages = [];
var found = false;
store.getMessages(channelName, function (msgStr) {
// XXX use readMessagesBin ?
// But we don't know the "start" offset if we use "desiredMessages" or "desiredCheckpoint"
store.getMessages(channelName, function (msgStr, offset) {
if (found) { return; }
let parsed = HK.tryParse(Env, msgStr);
@ -348,7 +350,11 @@ const getOlderHistory = function (data, cb) {
if (hash === oldestKnownHash) {
found = true;
}
messages.push(parsed);
messages.push({
msg: parsed,
hash: hash,
offset: offset
});
}, function (err) {
var toSend = [];
if (typeof (desiredMessages) === "number") {
@ -356,14 +362,14 @@ const getOlderHistory = function (data, cb) {
} else if (untilHash) {
for (var j = messages.length - 1; j >= 0; j--) {
toSend.unshift(messages[j]);
if (Array.isArray(messages[j]) && HK.getHash(messages[j][4]) === untilHash) {
if (messages[j] && messages[j].hash === untilHash) {
break;
}
}
} else {
let cpCount = 0;
for (var i = messages.length - 1; i >= 0; i--) {
if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
if (/^cp\|/.test(messages[i].msg[4]) && i !== (messages.length - 1)) {
cpCount++;
}
toSend.unshift(messages[i]);

@ -2065,11 +2065,18 @@ define([
if (msg) {
msg = msg.replace(/cp\|(([A-Za-z0-9+\/=]+)\|)?/, '');
//var decryptedMsg = crypto.decrypt(msg, true);
var opts = {};
if (parsed[1][5] && typeof(parsed[1][5]) === "object") {
opts = parsed[1][5];
console.error(opts);
}
if (data.debug) {
msgs.push({
serverHash: msg.slice(0,64),
msg: msg,
author: parsed[1][1],
time: parsed[1][5]
time: opts.time,
offset: opts.offset
});
} else {
msgs.push(msg);
@ -2199,12 +2206,17 @@ define([
lastKnownHash = msg.slice(0,64);
first = false;
}
var opts = {};
if (parsed[2][5] && typeof(parsed[2][5]) === "object") {
opts = parsed[2][5];
}
msg = msg.replace(/cp\|(([A-Za-z0-9+\/=]+)\|)?/, '');
msgs.push({
serverHash: msg.slice(0,64),
msg: msg,
author: parsed[2][1],
time: parsed[2][5]
time: opts.time,
offset: opts.offset
});
}
};

@ -396,11 +396,14 @@ proxy.mailboxes = {
} catch (e) {
console.log(e);
}
var opts = {};
if (_msg[5] && typeof(_msg[5]) === "object") { opts = _msg[5]; }
ctx.emit('HISTORY', {
txid: txid,
time: _msg[5],
time: opts.time,
message: message,
hash: _msg[4].slice(0,64)
hash: _msg[4].slice(0,64),
offset: opts.offset
}, [req.cId]);
} else if (type === 'HISTORY_RANGE_END') {
ctx.emit('HISTORY', {

@ -207,6 +207,7 @@ define([
if (historyState) { return void cb("ALREADY_CALLED"); }
historyState = true;
var txid = Util.uid();
// XXX OFFSET: no change? offsets are useless in get_history_range
execCommand('LOAD_HISTORY', {
type: type,
count: lastKnownHash ? count + 1 : count,

@ -1222,6 +1222,7 @@ define([
if (typeof(_msg) === "object") {
decryptedMsgs.push({
author: _msg.author,
serverHash: _msg.serverHash,
time: _msg.time,
msg: crypto.decrypt(_msg.msg, true, true)
});

@ -33,6 +33,9 @@
}
.cp-app-debug-progress, .cp-app-debug-init {
text-align: center;
input {
width: auto !important;
}
}
#cp-app-debug-loading {
text-align: center;

Loading…
Cancel
Save