|
|
@ -1,5 +1,5 @@
|
|
|
|
/* jshint esversion: 6 */
|
|
|
|
/* jshint esversion: 6 */
|
|
|
|
/* global process */
|
|
|
|
/* globals process, Buffer */
|
|
|
|
|
|
|
|
|
|
|
|
const HK = require("../hk-util");
|
|
|
|
const HK = require("../hk-util");
|
|
|
|
const Store = require("../storage/file");
|
|
|
|
const Store = require("../storage/file");
|
|
|
@ -114,14 +114,15 @@ const init = function (config, _cb) {
|
|
|
|
* including the initial metadata line, if it exists
|
|
|
|
* including the initial metadata line, if it exists
|
|
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
const computeIndex = function (data, cb) {
|
|
|
|
|
|
|
|
if (!data || !data.channel) {
|
|
|
|
|
|
|
|
return void cb('E_NO_CHANNEL');
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const channelName = data.channel;
|
|
|
|
const OPEN_CURLY_BRACE = Buffer.from('{');
|
|
|
|
|
|
|
|
const CHECKPOINT_PREFIX = Buffer.from('cp|');
|
|
|
|
|
|
|
|
const isValidOffsetNumber = function (n) {
|
|
|
|
|
|
|
|
return typeof(n) === 'number' && n >= 0;
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
const cpIndex = [];
|
|
|
|
const computeIndexFromOffset = function (channelName, offset, cb) {
|
|
|
|
|
|
|
|
let cpIndex = [];
|
|
|
|
let messageBuf = [];
|
|
|
|
let messageBuf = [];
|
|
|
|
let i = 0;
|
|
|
|
let i = 0;
|
|
|
|
|
|
|
|
|
|
|
@ -129,27 +130,42 @@ const computeIndex = function (data, cb) {
|
|
|
|
|
|
|
|
|
|
|
|
const offsetByHash = {};
|
|
|
|
const offsetByHash = {};
|
|
|
|
let offsetCount = 0;
|
|
|
|
let offsetCount = 0;
|
|
|
|
let size = 0;
|
|
|
|
let size = offset || 0;
|
|
|
|
|
|
|
|
var start = offset || 0;
|
|
|
|
|
|
|
|
let unconventional = false;
|
|
|
|
|
|
|
|
|
|
|
|
nThen(function (w) {
|
|
|
|
nThen(function (w) {
|
|
|
|
// iterate over all messages in the channel log
|
|
|
|
// iterate over all messages in the channel log
|
|
|
|
// old channels can contain metadata as the first message of the log
|
|
|
|
// old channels can contain metadata as the first message of the log
|
|
|
|
// skip over metadata as that is handled elsewhere
|
|
|
|
// skip over metadata as that is handled elsewhere
|
|
|
|
// otherwise index important messages in the log
|
|
|
|
// otherwise index important messages in the log
|
|
|
|
store.readMessagesBin(channelName, 0, (msgObj, readMore) => {
|
|
|
|
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
|
|
|
|
let msg;
|
|
|
|
let msg;
|
|
|
|
// keep an eye out for the metadata line if you haven't already seen it
|
|
|
|
// keep an eye out for the metadata line if you haven't already seen it
|
|
|
|
// but only check for metadata on the first line
|
|
|
|
// but only check for metadata on the first line
|
|
|
|
if (!i && msgObj.buff.indexOf('{') === 0) {
|
|
|
|
if (i) {
|
|
|
|
i++; // always increment the message counter
|
|
|
|
// fall through intentionally because the following blocks are invalid
|
|
|
|
|
|
|
|
// for all but the first message
|
|
|
|
|
|
|
|
} else if (msgObj.buff.includes(OPEN_CURLY_BRACE)) {
|
|
|
|
msg = HK.tryParse(Env, msgObj.buff.toString('utf8'));
|
|
|
|
msg = HK.tryParse(Env, msgObj.buff.toString('utf8'));
|
|
|
|
if (typeof msg === "undefined") { return readMore(); }
|
|
|
|
if (typeof msg === "undefined") {
|
|
|
|
|
|
|
|
i++; // always increment the message counter
|
|
|
|
|
|
|
|
return readMore();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// validate that the current line really is metadata before storing it as such
|
|
|
|
// validate that the current line really is metadata before storing it as such
|
|
|
|
// skip this, as you already have metadata...
|
|
|
|
// skip this, as you already have metadata...
|
|
|
|
if (HK.isMetadataMessage(msg)) { return readMore(); }
|
|
|
|
if (HK.isMetadataMessage(msg)) {
|
|
|
|
|
|
|
|
i++; // always increment the message counter
|
|
|
|
|
|
|
|
return readMore();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} else if (!(msg = HK.tryParse(Env, msgObj.buff.toString('utf8')))) {
|
|
|
|
|
|
|
|
w.abort();
|
|
|
|
|
|
|
|
abort();
|
|
|
|
|
|
|
|
return CB("OFFSET_ERROR");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
i++;
|
|
|
|
i++;
|
|
|
|
if (msgObj.buff.indexOf('cp|') > -1) {
|
|
|
|
if (msgObj.buff.includes(CHECKPOINT_PREFIX)) {
|
|
|
|
msg = msg || HK.tryParse(Env, msgObj.buff.toString('utf8'));
|
|
|
|
msg = msg || HK.tryParse(Env, msgObj.buff.toString('utf8'));
|
|
|
|
if (typeof msg === "undefined") { return readMore(); }
|
|
|
|
if (typeof msg === "undefined") { return readMore(); }
|
|
|
|
// cache the offsets of checkpoints if they can be parsed
|
|
|
|
// cache the offsets of checkpoints if they can be parsed
|
|
|
@ -164,6 +180,7 @@ const computeIndex = function (data, cb) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if (messageBuf.length > 100 && cpIndex.length === 0) {
|
|
|
|
} else if (messageBuf.length > 100 && cpIndex.length === 0) {
|
|
|
|
// take the last 50 messages
|
|
|
|
// take the last 50 messages
|
|
|
|
|
|
|
|
unconventional = true;
|
|
|
|
messageBuf = messageBuf.slice(-50);
|
|
|
|
messageBuf = messageBuf.slice(-50);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// if it's not metadata or a checkpoint then it should be a regular message
|
|
|
|
// if it's not metadata or a checkpoint then it should be a regular message
|
|
|
@ -192,11 +209,38 @@ const computeIndex = function (data, cb) {
|
|
|
|
size = msgObj.offset + msgObj.buff.length + 1;
|
|
|
|
size = msgObj.offset + msgObj.buff.length + 1;
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}));
|
|
|
|
}));
|
|
|
|
|
|
|
|
}).nThen(function (w) {
|
|
|
|
|
|
|
|
cpIndex = HK.sliceCpIndex(cpIndex, i);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var new_start;
|
|
|
|
|
|
|
|
if (cpIndex.length) {
|
|
|
|
|
|
|
|
new_start = cpIndex[0].offset;
|
|
|
|
|
|
|
|
} else if (unconventional && messageBuf.length && isValidOffsetNumber(messageBuf[0].offset)) {
|
|
|
|
|
|
|
|
new_start = messageBuf[0].offset;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (new_start === start) { return; }
|
|
|
|
|
|
|
|
if (!isValidOffsetNumber(new_start)) { return; }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// store the offset of the earliest relevant line so that you can start from there next time...
|
|
|
|
|
|
|
|
store.writeOffset(channelName, {
|
|
|
|
|
|
|
|
start: new_start,
|
|
|
|
|
|
|
|
created: +new Date(),
|
|
|
|
|
|
|
|
}, w(function () {
|
|
|
|
|
|
|
|
var diff = new_start - start;
|
|
|
|
|
|
|
|
Env.Log.info('WORKER_OFFSET_UPDATE', {
|
|
|
|
|
|
|
|
channel: channelName,
|
|
|
|
|
|
|
|
old_start: start,
|
|
|
|
|
|
|
|
new_start: new_start,
|
|
|
|
|
|
|
|
diff: diff,
|
|
|
|
|
|
|
|
diffMB: diff / 1024 / 1024,
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}));
|
|
|
|
}).nThen(function () {
|
|
|
|
}).nThen(function () {
|
|
|
|
// return the computed index
|
|
|
|
// return the computed index
|
|
|
|
CB(null, {
|
|
|
|
CB(null, {
|
|
|
|
// Only keep the checkpoints included in the last 100 messages
|
|
|
|
// Only keep the checkpoints included in the last 100 messages
|
|
|
|
cpIndex: HK.sliceCpIndex(cpIndex, i),
|
|
|
|
cpIndex: cpIndex,
|
|
|
|
offsetByHash: offsetByHash,
|
|
|
|
offsetByHash: offsetByHash,
|
|
|
|
offsets: offsetCount,
|
|
|
|
offsets: offsetCount,
|
|
|
|
size: size,
|
|
|
|
size: size,
|
|
|
@ -206,6 +250,47 @@ const computeIndex = function (data, cb) {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const computeIndex = function (data, cb) {
|
|
|
|
|
|
|
|
if (!data || !data.channel) {
|
|
|
|
|
|
|
|
return void cb('E_NO_CHANNEL');
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const channelName = data.channel;
|
|
|
|
|
|
|
|
const CB = Util.once(cb);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var start = 0;
|
|
|
|
|
|
|
|
nThen(function (w) {
|
|
|
|
|
|
|
|
store.getOffset(channelName, w(function (err, obj) {
|
|
|
|
|
|
|
|
if (err) { return; }
|
|
|
|
|
|
|
|
if (obj && typeof(obj.start) === 'number' && obj.start > 0) {
|
|
|
|
|
|
|
|
start = obj.start;
|
|
|
|
|
|
|
|
Env.Log.verbose('WORKER_OFFSET_RECOVERY', {
|
|
|
|
|
|
|
|
channel: channelName,
|
|
|
|
|
|
|
|
start: start,
|
|
|
|
|
|
|
|
startMB: start / 1024 / 1024,
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}));
|
|
|
|
|
|
|
|
}).nThen(function (w) {
|
|
|
|
|
|
|
|
computeIndexFromOffset(channelName, start, w(function (err, index) {
|
|
|
|
|
|
|
|
if (err === 'OFFSET_ERROR') {
|
|
|
|
|
|
|
|
return Env.Log.error("WORKER_OFFSET_ERROR", {
|
|
|
|
|
|
|
|
channel: channelName,
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
w.abort();
|
|
|
|
|
|
|
|
CB(err, index);
|
|
|
|
|
|
|
|
}));
|
|
|
|
|
|
|
|
}).nThen(function (w) {
|
|
|
|
|
|
|
|
// if you're here there was an OFFSET_ERROR..
|
|
|
|
|
|
|
|
// first remove the offset that caused the problem to begin with
|
|
|
|
|
|
|
|
store.clearOffset(channelName, w());
|
|
|
|
|
|
|
|
}).nThen(function () {
|
|
|
|
|
|
|
|
// now get the history as though it were the first time
|
|
|
|
|
|
|
|
computeIndexFromOffset(channelName, 0, CB);
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
const computeMetadata = function (data, cb) {
|
|
|
|
const computeMetadata = function (data, cb) {
|
|
|
|
const ref = {};
|
|
|
|
const ref = {};
|
|
|
|
const lineHandler = Meta.createLineHandler(ref, Env.Log.error);
|
|
|
|
const lineHandler = Meta.createLineHandler(ref, Env.Log.error);
|
|
|
|