Merge branch 'offset-optimization' into soon

pull/1/head
ansuz 4 years ago
commit 10e5b7411c

@ -1,6 +1,6 @@
/*@flow*/
/* jshint esversion: 6 */
/* global Buffer */
/* globals Buffer */
var Fs = require("fs");
var Fse = require("fs-extra");
var Path = require("path");
@ -66,6 +66,10 @@ var mkTempPath = function (env, channelId) {
return mkPath(env, channelId) + '.temp';
};
var mkOffsetPath = function (env, channelId) {
return mkPath(env, channelId) + '.offset';
};
// pass in the path so we can reuse the same function for archived files
var channelExists = function (filepath, cb) {
Fs.stat(filepath, function (err, stat) {
@ -131,7 +135,9 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
const collector = createIdleStreamCollector(stream);
const handleMessageAndKeepStreamAlive = Util.both(msgHandler, collector.keepAlive);
const done = Util.both(cb, collector);
return void readFileBin(stream, handleMessageAndKeepStreamAlive, done);
return void readFileBin(stream, handleMessageAndKeepStreamAlive, done, {
offset: start,
});
};
// reads classic metadata from a channel log and aborts
@ -190,6 +196,37 @@ var closeChannel = function (env, channelName, cb) {
}
};
var clearOffset = function (env, channelId, cb) {
var path = mkOffsetPath(env, channelId);
// we should always be able to recover from invalid offsets, so failure to delete them
// is not catastrophic. Anything calling this function can optionally ignore errors it might report
Fs.unlink(path, cb);
};
var writeOffset = function (env, channelId, data, cb) {
var path = mkOffsetPath(env, channelId);
var s_data;
try {
s_data = JSON.stringify(data);
} catch (err) {
return void cb(err);
}
Fs.writeFile(path, s_data, cb);
};
var getOffset = function (env, channelId, cb) {
var path = mkOffsetPath(env, channelId);
Fs.readFile(path, function (err, content) {
if (err) { return void cb(err); }
try {
var json = JSON.parse(content);
cb(void 0, json);
} catch (err2) {
cb(err2);
}
});
};
// truncates a file to the end of its metadata line
// TODO write the metadata in a dedicated file
var clearChannel = function (env, channelId, _cb) {
@ -213,6 +250,7 @@ var clearChannel = function (env, channelId, _cb) {
cb();
});
});
clearOffset(env, channelId, function () {});
});
};
@ -389,6 +427,7 @@ var removeChannel = function (env, channelName, cb) {
CB(labelError("E_METADATA_REMOVAL", err));
}
}));
clearOffset(env, channelName, w());
}).nThen(function () {
if (errors === 2) {
return void CB(labelError('E_REMOVE_CHANNEL', new Error("ENOENT")));
@ -604,6 +643,8 @@ var archiveChannel = function (env, channelName, cb) {
return void cb(err);
}
}));
}).nThen(function (w) {
clearOffset(env, channelName, w());
}).nThen(function (w) {
// archive the dedicated metadata channel
var metadataPath = mkMetadataPath(env, channelName);
@ -861,6 +902,7 @@ var trimChannel = function (env, channelName, hash, _cb) {
}
}));
}).nThen(function (w) {
clearOffset(env, channelName, w());
cleanUp(w(function (err) {
if (err) {
w.abort();
@ -1177,6 +1219,25 @@ module.exports.create = function (conf, _cb) {
});
},
// OFFSETS
// these exist strictly as an optimization
// you can always remove them without data loss
clearOffset: function (channelName, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
clearOffset(env, channelName, cb);
},
writeOffset: function (channelName, data, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
writeOffset(env, channelName, data, cb);
},
getOffset: function (channelName, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
getOffset(env, channelName, cb);
},
// METADATA METHODS
// fetch the metadata for a channel
getChannelMetadata: function (channelName, cb) {

@ -44,8 +44,8 @@ const mkBufferSplit = () => {
// return a streaming function which transforms buffers into objects
// containing the buffer and the offset from the start of the stream
const mkOffsetCounter = () => {
let offset = 0;
const mkOffsetCounter = (offset) => {
offset = offset || 0;
return Pull.map((buff) => {
const out = { offset: offset, buff: buff };
// +1 for the eaten newline
@ -59,13 +59,14 @@ const mkOffsetCounter = () => {
// that this function has a lower memory profile than our classic method
// of reading logs line by line.
// it also allows the handler to abort reading at any time
Stream.readFileBin = (stream, msgHandler, cb) => {
Stream.readFileBin = (stream, msgHandler, cb, opt) => {
opt = opt || {};
//const stream = Fs.createReadStream(path, { start: start });
let keepReading = true;
Pull(
ToPull.read(stream),
mkBufferSplit(),
mkOffsetCounter(),
mkOffsetCounter(opt.offset),
Pull.asyncMap((data, moreCb) => {
msgHandler(data, moreCb, () => {
try {

@ -1,5 +1,5 @@
/* jshint esversion: 6 */
/* global process */
/* globals process, Buffer */
const HK = require("../hk-util");
const Store = require("../storage/file");
@ -114,14 +114,15 @@ const init = function (config, _cb) {
* including the initial metadata line, if it exists
*/
const computeIndex = function (data, cb) {
if (!data || !data.channel) {
return void cb('E_NO_CHANNEL');
}
const channelName = data.channel;
const OPEN_CURLY_BRACE = Buffer.from('{');
const CHECKPOINT_PREFIX = Buffer.from('cp|');
const isValidOffsetNumber = function (n) {
return typeof(n) === 'number' && n >= 0;
};
const cpIndex = [];
const computeIndexFromOffset = function (channelName, offset, cb) {
let cpIndex = [];
let messageBuf = [];
let i = 0;
@ -129,27 +130,42 @@ const computeIndex = function (data, cb) {
const offsetByHash = {};
let offsetCount = 0;
let size = 0;
let size = offset || 0;
var start = offset || 0;
let unconventional = false;
nThen(function (w) {
// iterate over all messages in the channel log
// old channels can contain metadata as the first message of the log
// skip over metadata as that is handled elsewhere
// otherwise index important messages in the log
store.readMessagesBin(channelName, 0, (msgObj, readMore) => {
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
let msg;
// keep an eye out for the metadata line if you haven't already seen it
// but only check for metadata on the first line
if (!i && msgObj.buff.indexOf('{') === 0) {
i++; // always increment the message counter
if (i) {
// fall through intentionally because the following blocks are invalid
// for all but the first message
} else if (msgObj.buff.includes(OPEN_CURLY_BRACE)) {
msg = HK.tryParse(Env, msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return readMore(); }
if (typeof msg === "undefined") {
i++; // always increment the message counter
return readMore();
}
// validate that the current line really is metadata before storing it as such
// skip this, as you already have metadata...
if (HK.isMetadataMessage(msg)) { return readMore(); }
if (HK.isMetadataMessage(msg)) {
i++; // always increment the message counter
return readMore();
}
} else if (!(msg = HK.tryParse(Env, msgObj.buff.toString('utf8')))) {
w.abort();
abort();
return CB("OFFSET_ERROR");
}
i++;
if (msgObj.buff.indexOf('cp|') > -1) {
if (msgObj.buff.includes(CHECKPOINT_PREFIX)) {
msg = msg || HK.tryParse(Env, msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return readMore(); }
// cache the offsets of checkpoints if they can be parsed
@ -164,6 +180,7 @@ const computeIndex = function (data, cb) {
}
} else if (messageBuf.length > 100 && cpIndex.length === 0) {
// take the last 50 messages
unconventional = true;
messageBuf = messageBuf.slice(-50);
}
// if it's not metadata or a checkpoint then it should be a regular message
@ -192,11 +209,38 @@ const computeIndex = function (data, cb) {
size = msgObj.offset + msgObj.buff.length + 1;
});
}));
}).nThen(function (w) {
cpIndex = HK.sliceCpIndex(cpIndex, i);
var new_start;
if (cpIndex.length) {
new_start = cpIndex[0].offset;
} else if (unconventional && messageBuf.length && isValidOffsetNumber(messageBuf[0].offset)) {
new_start = messageBuf[0].offset;
}
if (new_start === start) { return; }
if (!isValidOffsetNumber(new_start)) { return; }
// store the offset of the earliest relevant line so that you can start from there next time...
store.writeOffset(channelName, {
start: new_start,
created: +new Date(),
}, w(function () {
var diff = new_start - start;
Env.Log.info('WORKER_OFFSET_UPDATE', {
channel: channelName,
old_start: start,
new_start: new_start,
diff: diff,
diffMB: diff / 1024 / 1024,
});
}));
}).nThen(function () {
// return the computed index
CB(null, {
// Only keep the checkpoints included in the last 100 messages
cpIndex: HK.sliceCpIndex(cpIndex, i),
cpIndex: cpIndex,
offsetByHash: offsetByHash,
offsets: offsetCount,
size: size,
@ -206,6 +250,47 @@ const computeIndex = function (data, cb) {
});
};
const computeIndex = function (data, cb) {
if (!data || !data.channel) {
return void cb('E_NO_CHANNEL');
}
const channelName = data.channel;
const CB = Util.once(cb);
var start = 0;
nThen(function (w) {
store.getOffset(channelName, w(function (err, obj) {
if (err) { return; }
if (obj && typeof(obj.start) === 'number' && obj.start > 0) {
start = obj.start;
Env.Log.verbose('WORKER_OFFSET_RECOVERY', {
channel: channelName,
start: start,
startMB: start / 1024 / 1024,
});
}
}));
}).nThen(function (w) {
computeIndexFromOffset(channelName, start, w(function (err, index) {
if (err === 'OFFSET_ERROR') {
return Env.Log.error("WORKER_OFFSET_ERROR", {
channel: channelName,
});
}
w.abort();
CB(err, index);
}));
}).nThen(function (w) {
// if you're here there was an OFFSET_ERROR..
// first remove the offset that caused the problem to begin with
store.clearOffset(channelName, w());
}).nThen(function () {
// now get the history as though it were the first time
computeIndexFromOffset(channelName, 0, CB);
});
};
const computeMetadata = function (data, cb) {
const ref = {};
const lineHandler = Meta.createLineHandler(ref, Env.Log.error);

Loading…
Cancel
Save