Support a callback on channelMessage

pull/1/head
yflory 4 years ago
parent 6c394b37c8
commit fc514fb61d

@ -18,11 +18,11 @@ module.exports.create = function (Env, cb) {
id: Env.id, id: Env.id,
channelMessage: function (Server, channel, msgStruct) { channelMessage: function (Server, channel, msgStruct, cb) {
// netflux-server emits 'channelMessage' events whenever someone broadcasts to a channel // netflux-server emits 'channelMessage' events whenever someone broadcasts to a channel
// historyKeeper stores these messages if the channel id indicates that they are // historyKeeper stores these messages if the channel id indicates that they are
// a channel type with permanent history // a channel type with permanent history
HK.onChannelMessage(Env, Server, channel, msgStruct); HK.onChannelMessage(Env, Server, channel, msgStruct, cb);
}, },
channelClose: function (channelName) { channelClose: function (channelName) {
// netflux-server emits 'channelClose' events whenever everyone leaves a channel // netflux-server emits 'channelClose' events whenever everyone leaves a channel

@ -300,9 +300,10 @@ var trimMapByOffset = function (map, offset) {
* the fix is to use callbacks and implement queueing for writes * the fix is to use callbacks and implement queueing for writes
* to guarantee that offset computation is always atomic with writes * to guarantee that offset computation is always atomic with writes
*/ */
const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) { const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash, cb) {
const id = channel.id; const id = channel.id;
const Log = Env.Log; const Log = Env.Log;
if (typeof(cb) !== "function") { cb = function () {}; }
Env.queueStorage(id, function (next) { Env.queueStorage(id, function (next) {
const msgBin = Buffer.from(msg + '\n', 'utf8'); const msgBin = Buffer.from(msg + '\n', 'utf8');
@ -321,6 +322,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
// TODO make it possible to respond to clients with errors so they know // TODO make it possible to respond to clients with errors so they know
// their message wasn't stored // their message wasn't stored
cb(err);
return void next(); return void next();
} }
})); }));
@ -364,6 +366,10 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
index.offsets = checkOffsetMap(index.offsetByHash); index.offsets = checkOffsetMap(index.offsetByHash);
} }
} }
// call back with the offset of the message we just stored
cb(void 0, index.size);
index.size += msgBin.length; index.size += msgBin.length;
// handle the next element in the queue // handle the next element in the queue
@ -846,7 +852,9 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) {
* adds timestamps to incoming messages * adds timestamps to incoming messages
* writes messages to the store * writes messages to the store
*/ */
HK.onChannelMessage = function (Env, Server, channel, msgStruct) { HK.onChannelMessage = function (Env, Server, channel, msgStruct, cb) {
if (typeof(cb) !== "function") { cb = function () {}; }
//console.log(+new Date(), "onChannelMessage"); //console.log(+new Date(), "onChannelMessage");
const Log = Env.Log; const Log = Env.Log;
@ -942,7 +950,7 @@ HK.onChannelMessage = function (Env, Server, channel, msgStruct) {
// storeMessage // storeMessage
//console.log(+new Date(), "Storing message"); //console.log(+new Date(), "Storing message");
storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log)); storeMessage(Env, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log), cb);
//console.log(+new Date(), "Message stored"); //console.log(+new Date(), "Message stored");
}); });
}; };

Loading…
Cancel
Save