trim lookup tables for channels without checkpoints

pull/1/head
ansuz 5 years ago
parent a6943b12b4
commit 77961e3954

@ -235,6 +235,54 @@ const getIndex = (Env, channelName, cb) => {
});
};
/* checkOffsetMap
Sorry for the weird function --ansuz
This should be almost equivalent to `Object.keys(map).length` except
that is will use less memory by not allocating space for the temporary array.
Beyond that, it returns length * -1 if any of the members of the map
are not in ascending order. The function for removing older members of the map
loops over elements in order and deletes them, so ordering is important!
*/
var checkOffsetMap = function (map) {
var prev = 0;
var cur;
var ooo = 0; // out of order
var count = 0;
for (let k in map) {
count++;
cur = map[k];
if (!ooo && prev > cur) { ooo = true; }
prev = cur;
}
return ooo ? count * -1: count;
};
/* Pass the map and the number of elements it contains */
var trimOffsetByOrder = function (map, n) {
var toRemove = Math.max(n - 50, 0);
var i = 0;
for (let k in map) {
if (i >= toRemove) { return; }
i++;
delete map[k];
}
};
/* Remove from the map any byte offsets which are below
the lowest offset you'd like to preserve
(probably the oldest checkpoint */
var trimMapByOffset = function (map, offset) {
if (!offset) { return; }
for (let k in map) {
if (map[k] < offset) {
delete map[k];
}
}
};
/* storeMessage
* channel id
* the message to store
@ -286,17 +334,28 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
if (typeof (index.line) === "number") { index.line++; }
if (isCp) {
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
for (let k in index.offsetByHash) {
if (index.offsetByHash[k] < index.cpIndex[0]) {
delete index.offsetByHash[k];
}
}
trimMapByOffset(index.offsetByHash, index.cpIndex[0]);
index.cpIndex.push({
offset: index.size,
line: ((index.line || 0) + 1)
});
}
if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
if (optionalMessageHash) {
index.offsetByHash[optionalMessageHash] = index.size;
index.offsets++;
}
if (index.offsets >= 100 && !index.cpIndex.length) {
let offsetCount = checkOffsetMap(index.offsetByHash);
if (offsetCount < 0) {
Log.warn('OFFSET_TRIM_OOO', {
channel: id,
map: index.OffsetByHash
});
} else if (offsetCount > 0) {
trimOffsetByOrder(index.offsetByHash, index.offsets);
index.offsets = checkOffsetMap(index.offsetByHash);
}
}
index.size += msgBin.length;
// handle the next element in the queue

@ -118,6 +118,7 @@ const computeIndex = function (data, cb) {
const CB = Util.once(cb);
const offsetByHash = {};
let offsetCount = 0;
let size = 0;
nThen(function (w) {
// iterate over all messages in the channel log
@ -151,6 +152,8 @@ const computeIndex = function (data, cb) {
// so clear the buffer every time you see a new one
messageBuf = [];
}
} else if (messageBuf.length > 100 && cpIndex.length === 0) {
messageBuf = messageBuf.slice(0, 50);
}
// if it's not metadata or a checkpoint then it should be a regular message
// store it in the buffer
@ -163,6 +166,7 @@ const computeIndex = function (data, cb) {
}
// once indexing is complete you should have a buffer of messages since the latest checkpoint
// or the 50-100 latest messages if the channel is of a type without checkpoints.
// map the 'hash' of each message to its byte offset in the log, to be used for reconnecting clients
messageBuf.forEach((msgObj) => {
const msg = HK.tryParse(Env, msgObj.buff.toString('utf8'));
@ -171,6 +175,7 @@ const computeIndex = function (data, cb) {
// msgObj.offset is API guaranteed by our storage module
// it should always be a valid positive integer
offsetByHash[HK.getHash(msg[4])] = msgObj.offset;
offsetCount++;
}
// There is a trailing \n at the end of the file
size = msgObj.offset + msgObj.buff.length + 1;
@ -182,6 +187,7 @@ const computeIndex = function (data, cb) {
// Only keep the checkpoints included in the last 100 messages
cpIndex: HK.sliceCpIndex(cpIndex, i),
offsetByHash: offsetByHash,
offsets: offsetCount,
size: size,
//metadata: metadata,
line: i

Loading…
Cancel
Save