diff --git a/lib/hk-util.js b/lib/hk-util.js index 4b71e7c5b..c0da609ee 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -235,6 +235,54 @@ const getIndex = (Env, channelName, cb) => { }); }; +/* checkOffsetMap + +Sorry for the weird function --ansuz + +This should be almost equivalent to `Object.keys(map).length` except +that is will use less memory by not allocating space for the temporary array. +Beyond that, it returns length * -1 if any of the members of the map +are not in ascending order. The function for removing older members of the map +loops over elements in order and deletes them, so ordering is important! + +*/ +var checkOffsetMap = function (map) { + var prev = 0; + var cur; + var ooo = 0; // out of order + var count = 0; + for (let k in map) { + count++; + cur = map[k]; + if (!ooo && prev > cur) { ooo = true; } + prev = cur; + } + return ooo ? count * -1: count; +}; + +/* Pass the map and the number of elements it contains */ +var trimOffsetByOrder = function (map, n) { + var toRemove = Math.max(n - 50, 0); + var i = 0; + for (let k in map) { + if (i >= toRemove) { return; } + i++; + delete map[k]; + } +}; + +/* Remove from the map any byte offsets which are below + the lowest offset you'd like to preserve + (probably the oldest checkpoint */ +var trimMapByOffset = function (map, offset) { + if (!offset) { return; } + for (let k in map) { + if (map[k] < offset) { + delete map[k]; + } + } +}; + /* storeMessage * channel id * the message to store @@ -286,17 +334,28 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { if (typeof (index.line) === "number") { index.line++; } if (isCp) { index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); - for (let k in index.offsetByHash) { - if (index.offsetByHash[k] < index.cpIndex[0]) { - delete index.offsetByHash[k]; - } - } + trimMapByOffset(index.offsetByHash, index.cpIndex[0]); index.cpIndex.push({ offset: index.size, line: ((index.line || 0) + 1) }); } - if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } + if (optionalMessageHash) { + index.offsetByHash[optionalMessageHash] = index.size; + index.offsets++; + } + if (index.offsets >= 100 && !index.cpIndex.length) { + let offsetCount = checkOffsetMap(index.offsetByHash); + if (offsetCount < 0) { + Log.warn('OFFSET_TRIM_OOO', { + channel: id, + map: index.OffsetByHash + }); + } else if (offsetCount > 0) { + trimOffsetByOrder(index.offsetByHash, index.offsets); + index.offsets = checkOffsetMap(index.offsetByHash); + } + } index.size += msgBin.length; // handle the next element in the queue diff --git a/lib/workers/db-worker.js b/lib/workers/db-worker.js index 62e182765..65a381479 100644 --- a/lib/workers/db-worker.js +++ b/lib/workers/db-worker.js @@ -118,6 +118,7 @@ const computeIndex = function (data, cb) { const CB = Util.once(cb); const offsetByHash = {}; + let offsetCount = 0; let size = 0; nThen(function (w) { // iterate over all messages in the channel log @@ -151,6 +152,8 @@ const computeIndex = function (data, cb) { // so clear the buffer every time you see a new one messageBuf = []; } + } else if (messageBuf.length > 100 && cpIndex.length === 0) { + messageBuf = messageBuf.slice(0, 50); } // if it's not metadata or a checkpoint then it should be a regular message // store it in the buffer @@ -163,6 +166,7 @@ const computeIndex = function (data, cb) { } // once indexing is complete you should have a buffer of messages since the latest checkpoint + // or the 50-100 latest messages if the channel is of a type without checkpoints. // map the 'hash' of each message to its byte offset in the log, to be used for reconnecting clients messageBuf.forEach((msgObj) => { const msg = HK.tryParse(Env, msgObj.buff.toString('utf8')); @@ -171,6 +175,7 @@ const computeIndex = function (data, cb) { // msgObj.offset is API guaranteed by our storage module // it should always be a valid positive integer offsetByHash[HK.getHash(msg[4])] = msgObj.offset; + offsetCount++; } // There is a trailing \n at the end of the file size = msgObj.offset + msgObj.buff.length + 1; @@ -182,6 +187,7 @@ const computeIndex = function (data, cb) { // Only keep the checkpoints included in the last 100 messages cpIndex: HK.sliceCpIndex(cpIndex, i), offsetByHash: offsetByHash, + offsets: offsetCount, size: size, //metadata: metadata, line: i