Remove offset code
parent
fe9d39c66b
commit
ea4c529e39
|
@ -367,8 +367,8 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb)
|
|||
}
|
||||
}
|
||||
|
||||
// call back with the offset of the message we just stored
|
||||
cb(void 0, index.size);
|
||||
// Message stored, call back
|
||||
cb();
|
||||
|
||||
index.size += msgBin.length;
|
||||
|
||||
|
@ -508,22 +508,9 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
|
|||
return void cb(new Error('EUNKNOWN'));
|
||||
}
|
||||
const start = (beforeHash) ? 0 : offset;
|
||||
let msgOffset = start;
|
||||
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
|
||||
if (beforeHash && msgObj.offset >= offset) { return void abort(); }
|
||||
const str = msgObj.buff.toString('utf8');
|
||||
var parsed = tryParse(Env, str);
|
||||
|
||||
// Extract the time and put it in the new opts object
|
||||
let time;
|
||||
if (Array.isArray(parsed)) {
|
||||
if (typeof(parsed[parsed.length - 1]) === "number") { time = parsed.pop(); }
|
||||
parsed.push({
|
||||
time: time,
|
||||
offset: msgObj.offset
|
||||
});
|
||||
}
|
||||
|
||||
const parsed = tryParse(Env, msgObj.buff.toString('utf8'));
|
||||
if (!parsed) { return void readMore(); }
|
||||
handler(parsed, readMore);
|
||||
}, waitFor(function (err) {
|
||||
|
@ -718,20 +705,7 @@ const handleGetHistoryRange = function (Env, Server, seq, userId, parsed) {
|
|||
}
|
||||
|
||||
if (Array.isArray(toSend)) {
|
||||
// toSend: array of objects {msg, hash, offset}
|
||||
toSend.forEach(function (obj) {
|
||||
let msg = obj.msg;
|
||||
|
||||
// Extract the time and put it in the new opts object
|
||||
let time;
|
||||
if (Array.isArray(msg)) {
|
||||
if (typeof(msg[msg.length - 1]) === "number") { time = msg.pop(); }
|
||||
msg.push({
|
||||
time: time,
|
||||
offset: obj.offset
|
||||
});
|
||||
}
|
||||
|
||||
toSend.forEach(function (msg) {
|
||||
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
|
||||
JSON.stringify(['HISTORY_RANGE', txid, msg])]);
|
||||
});
|
||||
|
|
|
@ -263,12 +263,9 @@ var readMessages = function (path, msgHandler, _cb) {
|
|||
var collector = createIdleStreamCollector(stream);
|
||||
var cb = Util.once(Util.mkAsync(Util.both(_cb, collector)));
|
||||
|
||||
let offset = 0;
|
||||
return readFileBin(stream, function (msgObj, readMore) {
|
||||
collector.keepAlive();
|
||||
const str = msgObj.buff.toString('utf8');
|
||||
msgHandler(str, offset);
|
||||
offset += str.length + 1; // +1 because of line break
|
||||
msgHandler(msgObj.buff.toString('utf8'));
|
||||
readMore();
|
||||
}, function (err) {
|
||||
cb(err);
|
||||
|
@ -849,10 +846,10 @@ var message = function (env, chanName, msg, cb) {
|
|||
var getMessages = function (env, chanName, handler, cb) {
|
||||
var errorState = false;
|
||||
var path = mkPath(env, chanName);
|
||||
readMessages(path, function (msg, offset) {
|
||||
readMessages(path, function (msg) {
|
||||
if (!msg || errorState) { return; }
|
||||
try {
|
||||
handler(msg, offset);
|
||||
handler(msg);
|
||||
} catch (e) {
|
||||
errorState = true;
|
||||
return void cb(e);
|
||||
|
|
|
@ -329,9 +329,7 @@ const getOlderHistory = function (data, cb) {
|
|||
|
||||
var messages = [];
|
||||
var found = false;
|
||||
// XXX use readMessagesBin ?
|
||||
// But we don't know the "start" offset if we use "desiredMessages" or "desiredCheckpoint"
|
||||
store.getMessages(channelName, function (msgStr, offset) {
|
||||
store.getMessages(channelName, function (msgStr) {
|
||||
if (found) { return; }
|
||||
|
||||
let parsed = HK.tryParse(Env, msgStr);
|
||||
|
@ -353,7 +351,6 @@ const getOlderHistory = function (data, cb) {
|
|||
messages.push({
|
||||
msg: parsed,
|
||||
hash: hash,
|
||||
offset: offset
|
||||
});
|
||||
}, function (err) {
|
||||
var toSend = [];
|
||||
|
|
|
@ -2065,18 +2065,12 @@ define([
|
|||
if (msg) {
|
||||
msg = msg.replace(/cp\|(([A-Za-z0-9+\/=]+)\|)?/, '');
|
||||
//var decryptedMsg = crypto.decrypt(msg, true);
|
||||
var opts = {};
|
||||
if (parsed[1][5] && typeof(parsed[1][5]) === "object") {
|
||||
opts = parsed[1][5];
|
||||
console.error(opts);
|
||||
}
|
||||
if (data.debug) {
|
||||
msgs.push({
|
||||
serverHash: msg.slice(0,64),
|
||||
msg: msg,
|
||||
author: parsed[1][1],
|
||||
time: opts.time,
|
||||
offset: opts.offset
|
||||
time: parsed[1][5]
|
||||
});
|
||||
} else {
|
||||
msgs.push(msg);
|
||||
|
@ -2206,17 +2200,12 @@ define([
|
|||
lastKnownHash = msg.slice(0,64);
|
||||
first = false;
|
||||
}
|
||||
var opts = {};
|
||||
if (parsed[2][5] && typeof(parsed[2][5]) === "object") {
|
||||
opts = parsed[2][5];
|
||||
}
|
||||
msg = msg.replace(/cp\|(([A-Za-z0-9+\/=]+)\|)?/, '');
|
||||
msgs.push({
|
||||
serverHash: msg.slice(0,64),
|
||||
msg: msg,
|
||||
author: parsed[2][1],
|
||||
time: opts.time,
|
||||
offset: opts.offset
|
||||
time: parsed[2][5]
|
||||
});
|
||||
}
|
||||
};
|
||||
|
|
|
@ -396,14 +396,11 @@ proxy.mailboxes = {
|
|||
} catch (e) {
|
||||
console.log(e);
|
||||
}
|
||||
var opts = {};
|
||||
if (_msg[5] && typeof(_msg[5]) === "object") { opts = _msg[5]; }
|
||||
ctx.emit('HISTORY', {
|
||||
txid: txid,
|
||||
time: opts.time,
|
||||
time: _msg[5],
|
||||
message: message,
|
||||
hash: _msg[4].slice(0,64),
|
||||
offset: opts.offset
|
||||
hash: _msg[4].slice(0,64)
|
||||
}, [req.cId]);
|
||||
} else if (type === 'HISTORY_RANGE_END') {
|
||||
ctx.emit('HISTORY', {
|
||||
|
|
|
@ -207,7 +207,6 @@ define([
|
|||
if (historyState) { return void cb("ALREADY_CALLED"); }
|
||||
historyState = true;
|
||||
var txid = Util.uid();
|
||||
// XXX OFFSET: no change? offsets are useless in get_history_range
|
||||
execCommand('LOAD_HISTORY', {
|
||||
type: type,
|
||||
count: lastKnownHash ? count + 1 : count,
|
||||
|
|
Loading…
Reference in New Issue