many more comments. cache metadata even in the event of a metadata stream error

pull/1/head
ansuz 5 years ago
parent bf058d8042
commit abe654f565

@ -105,7 +105,7 @@ module.exports.create = function (cfg) {
const computeIndex = function (channelName, cb) { const computeIndex = function (channelName, cb) {
const cpIndex = []; const cpIndex = [];
let messageBuf = []; let messageBuf = [];
let metadata; // FIXME METADATA READ let metadata;
let i = 0; let i = 0;
const ref = {}; const ref = {};
@ -124,19 +124,13 @@ module.exports.create = function (cfg) {
// keep an eye out for the metadata line if you haven't already seen it // keep an eye out for the metadata line if you haven't already seen it
// but only check for metadata on the first line // but only check for metadata on the first line
if (!i && !metadata && msgObj.buff.indexOf('{') === 0) { if (!i && !metadata && msgObj.buff.indexOf('{') === 0) {
i++; // always increment i i++; // always increment the message counter
msg = tryParse(msgObj.buff.toString('utf8')); // FIXME METADATA READ msg = tryParse(msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return rmcb(); } if (typeof msg === "undefined") { return rmcb(); }
// XXX metadata should be truthey, an object, and not an array... // validate that the current line really is metadata before storing it as such
if (msg && typeof(msg) === 'object' && !Array.isArray(msg)) { if (msg && typeof(msg) === 'object' && !Array.isArray(msg)) {
metadata = msg; metadata = msg;
// metadata can contain:
// validateKey, owners, expiration...
//if (msg.validateKey || msg.owners || msg.expire) {
//metadata_cache[channelName] = msg;
//}
return rmcb(); return rmcb();
} }
} }
@ -144,14 +138,19 @@ module.exports.create = function (cfg) {
if (msgObj.buff.indexOf('cp|') > -1) { if (msgObj.buff.indexOf('cp|') > -1) {
msg = msg || tryParse(msgObj.buff.toString('utf8')); msg = msg || tryParse(msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return rmcb(); } if (typeof msg === "undefined") { return rmcb(); }
// cache the offsets of checkpoints if they can be parsed
if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) { if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
cpIndex.push({ cpIndex.push({
offset: msgObj.offset, offset: msgObj.offset,
line: i line: i
}); });
// we only want to store messages since the latest checkpoint
// so clear the buffer every time you see a new one
messageBuf = []; messageBuf = [];
} }
} }
// if it's not metadata or a checkpoint then it should be a regular message
// store it in the buffer
messageBuf.push(msgObj); messageBuf.push(msgObj);
return rmcb(); return rmcb();
}, w((err) => { }, w((err) => {
@ -159,6 +158,9 @@ module.exports.create = function (cfg) {
w.abort(); w.abort();
return void CB(err); return void CB(err);
} }
// once indexing is complete you should have a buffer of messages since the latest checkpoint
// map the 'hash' of each message to its byte offset in the log, to be used for reconnecting clients
messageBuf.forEach((msgObj) => { messageBuf.forEach((msgObj) => {
const msg = tryParse(msgObj.buff.toString('utf8')); const msg = tryParse(msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return; } if (typeof msg === "undefined") { return; }
@ -172,28 +174,29 @@ module.exports.create = function (cfg) {
}); });
})); }));
}).nThen(function (w) { }).nThen(function (w) {
// get amended metadata // create a function which will iterate over amendments to the metadata
const handler = Meta.createLineHandler(ref, Log.error); const handler = Meta.createLineHandler(ref, Log.error);
if (metadata) { // initialize the accumulator in case there was a foundational metadata line in the log content
handler(void 0, metadata); if (metadata) { handler(void 0, metadata); }
}
// iterate over the dedicated metadata log (if it exists)
// proceed even in the event of a stream error on the metadata log
store.readDedicatedMetadata(channelName, handler, w(function (err) { store.readDedicatedMetadata(channelName, handler, w(function (err) {
if (err) { if (err) {
return void Log.error("DEDICATED_METADATA_ERROR", err); return void Log.error("DEDICATED_METADATA_ERROR", err);
} }
metadata = metadata_cache[channelName] = ref.meta;
})); }));
}).nThen(function () { }).nThen(function () {
// FIXME METADATA READ // when all is done, cache the metadata in memory
metadata = metadata_cache[channelName] = ref.meta;
// and return the computed index
CB(null, { CB(null, {
// Only keep the checkpoints included in the last 100 messages // Only keep the checkpoints included in the last 100 messages
cpIndex: sliceCpIndex(cpIndex, i), cpIndex: sliceCpIndex(cpIndex, i),
offsetByHash: offsetByHash, offsetByHash: offsetByHash,
size: size, size: size,
metadata: metadata, // FIXME METADATA STORE metadata: metadata,
line: i line: i
}); });
}); });

Loading…
Cancel
Save