very WIP update to serve accumulated metadata updates

pull/1/head
ansuz 5 years ago
parent 6c907dbc32
commit 92dda92a00

@ -5,6 +5,8 @@
const nThen = require('nthen');
const Nacl = require('tweetnacl');
const Crypto = require('crypto');
const Once = require("./lib/once");
const Meta = require("./lib/metadata");
let Log;
const now = function () { return (new Date()).getTime(); };
@ -61,51 +63,78 @@ module.exports.create = function (cfg) {
const cpIndex = [];
let messageBuf = [];
let validateKey;
let metadata; // FIXME METADATA
let metadata; // FIXME METADATA READ
let i = 0;
store.readMessagesBin(channelName, 0, (msgObj, rmcb) => {
let msg;
i++;
if (!validateKey && msgObj.buff.indexOf('validateKey') > -1) {
metadata = msg = tryParse(msgObj.buff.toString('utf8')); // FIXME METADATA
if (typeof msg === "undefined") { return rmcb(); }
if (msg.validateKey) {
validateKey = historyKeeperKeys[channelName] = msg;
return rmcb();
const ref = {};
const CB = Once(cb);
const offsetByHash = {};
let size = 0;
nThen(function (w) {
store.readMessagesBin(channelName, 0, (msgObj, rmcb) => {
let msg;
i++;
if (!validateKey && msgObj.buff.indexOf('validateKey') > -1) {
metadata = msg = tryParse(msgObj.buff.toString('utf8')); // FIXME METADATA READ
if (typeof msg === "undefined") { return rmcb(); }
if (msg.validateKey) {
validateKey = historyKeeperKeys[channelName] = msg;
return rmcb();
}
}
}
if (msgObj.buff.indexOf('cp|') > -1) {
msg = msg || tryParse(msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return rmcb(); }
if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
cpIndex.push({
offset: msgObj.offset,
line: i
});
messageBuf = [];
if (msgObj.buff.indexOf('cp|') > -1) {
msg = msg || tryParse(msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return rmcb(); }
if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
cpIndex.push({
offset: msgObj.offset,
line: i
});
messageBuf = [];
}
}
messageBuf.push(msgObj);
return rmcb();
}, w((err) => {
if (err && err.code !== 'ENOENT') {
w.abort();
return void CB(err);
}
messageBuf.forEach((msgObj) => {
const msg = tryParse(msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return; }
if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
offsetByHash[getHash(msg[4])] = msgObj.offset;
}
// There is a trailing \n at the end of the file
size = msgObj.offset + msgObj.buff.length + 1;
});
}));
}).nThen(function (w) {
// get amended metadata
const handler = Meta.createLineHandler(ref, Log.error);
if (metadata) {
handler(void 0, metadata);
}
messageBuf.push(msgObj);
return rmcb();
}, (err) => {
if (err && err.code !== 'ENOENT') { return void cb(err); }
const offsetByHash = {};
let size = 0;
messageBuf.forEach((msgObj) => {
const msg = tryParse(msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return; }
if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
offsetByHash[getHash(msg[4])] = msgObj.offset;
store.readDedicatedMetadata(channelName, handler, w(function (err) {
if (err) {
return void Log.error("DEDICATED_METADATA_ERROR", err);
}
// There is a trailing \n at the end of the file
size = msgObj.offset + msgObj.buff.length + 1;
});
cb(null, {
metadata = ref.meta;
}));
}).nThen(function () {
// FIXME METADATA READ
CB(null, {
// Only keep the checkpoints included in the last 100 messages
cpIndex: sliceCpIndex(cpIndex, i),
offsetByHash: offsetByHash,
size: size,
metadata: metadata, // FIXME METADATA
metadata: metadata, // FIXME METADATA STORE
line: i
});
});

Loading…
Cancel
Save