|
|
@ -300,6 +300,7 @@ var trimMapByOffset = function (map, offset) {
|
|
|
|
* the fix is to use callbacks and implement queueing for writes
|
|
|
|
* the fix is to use callbacks and implement queueing for writes
|
|
|
|
* to guarantee that offset computation is always atomic with writes
|
|
|
|
* to guarantee that offset computation is always atomic with writes
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
|
|
|
|
// FIXME 'optionalMessageHash' is always supplied, so we could consider renaming it for clarity
|
|
|
|
const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb) {
|
|
|
|
const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb) {
|
|
|
|
const id = channel.id;
|
|
|
|
const id = channel.id;
|
|
|
|
const Log = Env.Log;
|
|
|
|
const Log = Env.Log;
|
|
|
@ -338,6 +339,12 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb)
|
|
|
|
cb();
|
|
|
|
cb();
|
|
|
|
return void next();
|
|
|
|
return void next();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (optionalMessageHash && typeof(index.offsetByHash[optionalMessageHash]) === 'number') {
|
|
|
|
|
|
|
|
cb();
|
|
|
|
|
|
|
|
return void next();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (typeof (index.line) === "number") { index.line++; }
|
|
|
|
if (typeof (index.line) === "number") { index.line++; }
|
|
|
|
if (isCp) {
|
|
|
|
if (isCp) {
|
|
|
|
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
|
|
|
|
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
|
|
|
@ -352,7 +359,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb)
|
|
|
|
can actually cause it to become incorrect, leading to incorrect behaviour when clients connect
|
|
|
|
can actually cause it to become incorrect, leading to incorrect behaviour when clients connect
|
|
|
|
with a lastKnownHash. We avoid this by only assigning new offsets to the map.
|
|
|
|
with a lastKnownHash. We avoid this by only assigning new offsets to the map.
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
if (optionalMessageHash && typeof(index.offsetByHash[optionalMessageHash]) === 'undefined') {
|
|
|
|
if (optionalMessageHash /* && typeof(index.offsetByHash[optionalMessageHash]) === 'undefined' */) {
|
|
|
|
index.offsetByHash[optionalMessageHash] = index.size;
|
|
|
|
index.offsetByHash[optionalMessageHash] = index.size;
|
|
|
|
index.offsets++;
|
|
|
|
index.offsets++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|