always store messages according to the order in which they were received

pull/1/head
ansuz 5 years ago
parent a34abd748e
commit 689886b9bf

@ -105,7 +105,6 @@ module.exports.create = function (cfg) {
* offsetByHash: * offsetByHash:
* a map containing message offsets by their hash * a map containing message offsets by their hash
* this is for every message in history, so it could be very large... * this is for every message in history, so it could be very large...
* FIXME OFFSET
* except we remove offsets from the map if they occur before the oldest relevant checkpoint * except we remove offsets from the map if they occur before the oldest relevant checkpoint
* size: in bytes * size: in bytes
* metadata: * metadata:
@ -303,32 +302,53 @@ module.exports.create = function (cfg) {
* the fix is to use callbacks and implement queueing for writes * the fix is to use callbacks and implement queueing for writes
* to guarantee that offset computation is always atomic with writes * to guarantee that offset computation is always atomic with writes
*/ */
const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { const storageQueues = {};
// TODO implement a queue so that we know messages are written in order
const storeQueuedMessage = function (ctx, queue, id) {
if (queue.length === 0) {
delete storageQueues[id];
return;
}
const first = queue.shift();
const msgBin = first.msg;
const optionalMessageHash = first.hash;
const isCp = first.isCp;
const msgBin = new Buffer(msg + '\n', 'utf8');
// Store the message first, and update the index only once it's stored. // Store the message first, and update the index only once it's stored.
// store.messageBin can be async so updating the index first may // store.messageBin can be async so updating the index first may
// result in a wrong cpIndex // result in a wrong cpIndex
nThen((waitFor) => { nThen((waitFor) => {
store.messageBin(channel.id, msgBin, waitFor(function (err) { store.messageBin(id, msgBin, waitFor(function (err) {
if (err) { if (err) {
waitFor.abort(); waitFor.abort();
return void Log.error("HK_STORE_MESSAGE_ERROR", err.message); Log.error("HK_STORE_MESSAGE_ERROR", err.message);
// this error is critical, but there's not much we can do at the moment
// proceed with more messages, but they'll probably fail too
// at least you won't have a memory leak
// TODO make it possible to respond to clients with errors so they know
// their message wasn't stored
storeQueuedMessage(ctx, queue, id);
return;
} }
})); }));
}).nThen((waitFor) => { }).nThen((waitFor) => {
getIndex(ctx, channel.id, waitFor((err, index) => { getIndex(ctx, id, waitFor((err, index) => {
if (err) { if (err) {
Log.warn("HK_STORE_MESSAGE_INDEX", err.stack); Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
// non-critical, we'll be able to get the channel index later // non-critical, we'll be able to get the channel index later
// proceed to the next message in the queue
storeQueuedMessage(ctx, queue, id);
return; return;
} }
if (typeof (index.line) === "number") { index.line++; } if (typeof (index.line) === "number") { index.line++; }
if (isCp) { if (isCp) {
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
for (let k in index.offsetByHash) { for (let k in index.offsetByHash) {
// FIXME OFFSET
if (index.offsetByHash[k] < index.cpIndex[0]) { if (index.offsetByHash[k] < index.cpIndex[0]) {
delete index.offsetByHash[k]; delete index.offsetByHash[k];
} }
@ -340,10 +360,32 @@ module.exports.create = function (cfg) {
} }
if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; } if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
index.size += msgBin.length; index.size += msgBin.length;
// handle the next element in the queue
storeQueuedMessage(ctx, queue, id);
})); }));
}); });
}; };
const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) {
const id = channel.id;
const msgBin = new Buffer(msg + '\n', 'utf8');
if (Array.isArray(storageQueues[id])) {
return void storageQueues[id].push({
msg: msgBin,
hash: optionalMessageHash,
isCp: isCp,
});
}
const queue = storageQueues[id] = (storageQueues[id] || [{
msg: msgBin,
hash: optionalMessageHash,
}]);
storeQueuedMessage(ctx, queue, id);
};
var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
/* onChannelMessage /* onChannelMessage

Loading…
Cancel
Save