diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index cfdb14717..30b311eb7 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -18,11 +18,11 @@ module.exports.create = function (Env, cb) { id: Env.id, - channelMessage: function (Server, channel, msgStruct) { + channelMessage: function (Server, channel, msgStruct, cb) { // netflux-server emits 'channelMessage' events whenever someone broadcasts to a channel // historyKeeper stores these messages if the channel id indicates that they are // a channel type with permanent history - HK.onChannelMessage(Env, Server, channel, msgStruct); + HK.onChannelMessage(Env, Server, channel, msgStruct, cb); }, channelClose: function (channelName) { // netflux-server emits 'channelClose' events whenever everyone leaves a channel diff --git a/lib/hk-util.js b/lib/hk-util.js index 1b1692afc..f5f6fa2cb 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -300,9 +300,10 @@ var trimMapByOffset = function (map, offset) { * the fix is to use callbacks and implement queueing for writes * to guarantee that offset computation is always atomic with writes */ -const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { +const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb) { const id = channel.id; const Log = Env.Log; + if (typeof(cb) !== "function") { cb = function () {}; } Env.queueStorage(id, function (next) { const msgBin = Buffer.from(msg + '\n', 'utf8'); @@ -321,6 +322,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { // TODO make it possible to respond to clients with errors so they know // their message wasn't stored + cb(err); return void next(); } })); @@ -364,6 +366,10 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { index.offsets = checkOffsetMap(index.offsetByHash); } } + + // call back with the offset of the message we just stored + cb(void 0, index.size); + index.size += msgBin.length; // handle the next element in the queue @@ -846,7 +852,9 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { * adds timestamps to incoming messages * writes messages to the store */ -HK.onChannelMessage = function (Env, Server, channel, msgStruct) { +HK.onChannelMessage = function (Env, Server, channel, msgStruct, cb) { + if (typeof(cb) !== "function") { cb = function () {}; } + //console.log(+new Date(), "onChannelMessage"); const Log = Env.Log; @@ -942,7 +950,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) { // storeMessage //console.log(+new Date(), "Storing message"); - storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log)); + storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log), cb); //console.log(+new Date(), "Message stored"); }); };