delegate more work from getOlderHistory to the worker

pull/1/head
ansuz 5 years ago
parent fd169ff39c
commit b56c73be6e

@ -665,31 +665,17 @@ const handleGetHistoryRange = function (Env, Server, seq, userId, parsed) {
}
Server.send(userId, [seq, 'ACK']);
Env.getOlderHistory(channelName, oldestKnownHash, function (err, messages) {
Env.getOlderHistory(channelName, oldestKnownHash, desiredMessages, desiredCheckpoint, function (err, toSend) {
if (err && err.code !== 'ENOENT') {
Env.Log.error("HK_GET_OLDER_HISTORY", err);
}
if (!Array.isArray(messages)) { messages = []; }
// FIXME this reduction could be done in the worker instead of the main process
var toSend = [];
if (typeof (desiredMessages) === "number") {
toSend = messages.slice(-desiredMessages);
} else {
let cpCount = 0;
for (var i = messages.length - 1; i >= 0; i--) {
if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
cpCount++;
}
toSend.unshift(messages[i]);
if (cpCount >= desiredCheckpoint) { break; }
}
}
if (Array.isArray(toSend)) {
toSend.forEach(function (msg) {
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
JSON.stringify(['HISTORY_RANGE', txid, msg])]);
});
}
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId,
JSON.stringify(['HISTORY_RANGE_END', txid, channelName])

@ -222,10 +222,10 @@ const computeMetadata = function (data, cb) {
const getOlderHistory = function (data, cb) {
const oldestKnownHash = data.hash;
const channelName = data.channel;
const desiredMessages = data.desiredMessages;
const desiredCheckpoint = data.desiredCheckpoint;
//const store = Env.store;
//const Log = Env.Log;
var messageBuffer = [];
var messages = [];
var found = false;
store.getMessages(channelName, function (msgStr) {
if (found) { return; }
@ -246,9 +246,22 @@ const getOlderHistory = function (data, cb) {
if (hash === oldestKnownHash) {
found = true;
}
messageBuffer.push(parsed);
messages.push(parsed);
}, function (err) {
cb(err, messageBuffer);
var toSend = [];
if (typeof (desiredMessages) === "number") {
toSend = messages.slice(-desiredMessages);
} else {
let cpCount = 0;
for (var i = messages.length - 1; i >= 0; i--) {
if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
cpCount++;
}
toSend.unshift(messages[i]);
if (cpCount >= desiredCheckpoint) { break; }
}
}
cb(err, toSend);
});
};

@ -250,12 +250,14 @@ Workers.initialize = function (Env, config, _cb) {
});
};
Env.getOlderHistory = function (channel, oldestKnownHash, cb) {
Env.getOlderHistory = function (channel, oldestKnownHash, desiredMessages, desiredCheckpoint, cb) {
Env.store.getWeakLock(channel, function (next) {
sendCommand({
channel: channel,
command: "GET_OLDER_HISTORY",
hash: oldestKnownHash,
desiredMessages: desiredMessages,
desiredCheckpoint: desiredCheckpoint,
}, Util.both(next, cb));
});
};

Loading…
Cancel
Save