A few updates for editable metadata
* add and update many comments * correctly handle metadata for young channels and range requests * add a simple implementation of 'writeMetadata'pull/1/head
parent
f5c318f155
commit
1c89e8d6ac
|
@ -91,6 +91,8 @@ module.exports.create = function (cfg) {
|
|||
* offsetByHash:
|
||||
* a map containing message offsets by their hash
|
||||
* this is for every message in history, so it could be very large...
|
||||
* XXX OFFSET
|
||||
* except we remove offsets from the map if they occur before the oldest relevant checkpoint
|
||||
* size: in bytes
|
||||
* metadata:
|
||||
* validationKey
|
||||
|
@ -248,6 +250,7 @@ module.exports.create = function (cfg) {
|
|||
* the fix is to use callbacks and implement queueing for writes
|
||||
* to guarantee that offset computation is always atomic with writes
|
||||
|
||||
TODO rename maybeMsgHash to optionalMsgHash
|
||||
*/
|
||||
const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) {
|
||||
const msgBin = new Buffer(msg + '\n', 'utf8');
|
||||
|
@ -272,6 +275,7 @@ module.exports.create = function (cfg) {
|
|||
if (isCp) {
|
||||
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
|
||||
for (let k in index.offsetByHash) {
|
||||
// XXX OFFSET
|
||||
if (index.offsetByHash[k] < index.cpIndex[0]) {
|
||||
delete index.offsetByHash[k];
|
||||
}
|
||||
|
@ -325,6 +329,7 @@ module.exports.create = function (cfg) {
|
|||
let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
|
||||
signedMsg = Nacl.util.decodeBase64(signedMsg);
|
||||
// FIXME PERFORMANCE: cache the decoded key instead of decoding it every time
|
||||
// CPU/Memory tradeoff
|
||||
const validateKey = Nacl.util.decodeBase64(metadata_cache[channel.id].validateKey);
|
||||
const validated = Nacl.sign.open(signedMsg, validateKey);
|
||||
if (!validated) {
|
||||
|
@ -337,6 +342,7 @@ module.exports.create = function (cfg) {
|
|||
// is a potential source of bugs if one editor has high latency and
|
||||
// pushes a duplicate of an earlier checkpoint than the latest which
|
||||
// has been pushed by editors with low latency
|
||||
// FIXME
|
||||
if (Array.isArray(id) && id[2]) {
|
||||
// Store new checkpoint hash
|
||||
channel.lastSavedCp = id[2];
|
||||
|
@ -360,14 +366,17 @@ module.exports.create = function (cfg) {
|
|||
returns a number representing the byte offset from the start of the log
|
||||
for whatever history you're seeking.
|
||||
|
||||
query by providing a 'lastKnownHash', which is really just a string of
|
||||
the first 64 characters of an encrypted message.
|
||||
query by providing a 'lastKnownHash',
|
||||
which is really just a string of the first 64 characters of an encrypted message.
|
||||
OR by -1 which indicates that we want the full history (byte offset 0)
|
||||
OR nothing, which indicates that you want whatever messages the historyKeeper deems relevant
|
||||
(typically the last few checkpoints)
|
||||
|
||||
this function embeds a lot of the history keeper's logic:
|
||||
|
||||
0. if you passed -1 as the lastKnownHash it means you want the complete history
|
||||
* I'm not sure why you'd need to call this function if you know it will return 0 in this case...
|
||||
* ansuz
|
||||
* it has a side-effect of filling the index cache if it's empty
|
||||
1. if you provided a lastKnownHash and that message does not exist in the history:
|
||||
* either the client has made a mistake or the history they knew about no longer exists
|
||||
* call back with EINVAL
|
||||
|
@ -378,7 +387,7 @@ module.exports.create = function (cfg) {
|
|||
* return the offset of the earliest checkpoint which 'sliceCpIndex' considers relevant
|
||||
3. if you did provide a lastKnownHash
|
||||
* read through the log until you find the hash that you're looking for
|
||||
* call back with either the bytee offset of the message that you found OR
|
||||
* call back with either the byte offset of the message that you found OR
|
||||
* -1 if you didn't find it
|
||||
|
||||
*/
|
||||
|
@ -390,9 +399,20 @@ module.exports.create = function (cfg) {
|
|||
getIndex(ctx, channelName, waitFor((err, index) => {
|
||||
if (err) { waitFor.abort(); return void cb(err); }
|
||||
|
||||
// check if the "hash" the client is requesting exists in the index
|
||||
const lkh = index.offsetByHash[lastKnownHash];
|
||||
// we evict old hashes from the index as new checkpoints are discovered.
|
||||
// if someone connects and asks for a hash that is no longer relevant,
|
||||
// we tell them it's an invalid request. This is because of the semantics of "GET_HISTORY"
|
||||
// which is only ever used when connecting or reconnecting in typical uses of history...
|
||||
// this assumption should hold for uses by chainpad, but perhaps not for other uses cases.
|
||||
// EXCEPT: other cases don't use checkpoints!
|
||||
// clients that are told that their request is invalid should just make another request
|
||||
// without specifying the hash, and just trust the server to give them the relevant data.
|
||||
// QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory?
|
||||
if (lastKnownHash && typeof(lkh) !== "number") {
|
||||
waitFor.abort();
|
||||
// XXX this smells bad
|
||||
return void cb(new Error('EINVAL'));
|
||||
}
|
||||
|
||||
|
@ -420,6 +440,8 @@ module.exports.create = function (cfg) {
|
|||
// returning falls through to the next block and therefore returns -1
|
||||
if (offset !== -1) { return; }
|
||||
|
||||
// do a lookup from the index
|
||||
// XXX maybe we don't need this anymore?
|
||||
// otherwise we have a non-negative offset and we can start to read from there
|
||||
store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => {
|
||||
// tryParse return a parsed message or undefined
|
||||
|
@ -444,6 +466,9 @@ module.exports.create = function (cfg) {
|
|||
* streams through the rest of the messages, safely parsing them and returning the parsed content to the handler
|
||||
* calls back when it has reached the end of the log
|
||||
|
||||
Used by:
|
||||
* GET_HISTORY
|
||||
|
||||
*/
|
||||
const getHistoryAsync = (ctx, channelName, lastKnownHash, beforeHash, handler, cb) => {
|
||||
let offset = -1;
|
||||
|
@ -472,6 +497,9 @@ module.exports.create = function (cfg) {
|
|||
* stores all messages in history as they are read
|
||||
* can therefore be very expensive for memory
|
||||
* should probably be converted to a streaming interface
|
||||
|
||||
Used by:
|
||||
* GET_HISTORY_RANGE
|
||||
*/
|
||||
const getOlderHistory = function (channelName, oldestKnownHash, cb) {
|
||||
var messageBuffer = [];
|
||||
|
@ -482,10 +510,11 @@ module.exports.create = function (cfg) {
|
|||
let parsed = tryParse(msgStr);
|
||||
if (typeof parsed === "undefined") { return; }
|
||||
|
||||
if (parsed.validateKey) {
|
||||
metadata_cache[channelName] = parsed;
|
||||
return;
|
||||
}
|
||||
// identify classic metadata messages by their inclusion of a channel.
|
||||
// and don't send metadata, since:
|
||||
// 1. the user won't be interested in it
|
||||
// 2. this metadata is potentially incomplete/incorrect
|
||||
if (parsed.channel) { return; }
|
||||
|
||||
var content = parsed[4];
|
||||
if (typeof(content) !== 'string') { return; }
|
||||
|
@ -547,6 +576,10 @@ module.exports.create = function (cfg) {
|
|||
// Check if the selected channel is expired
|
||||
// If it is, remove it from memory and broadcast a message to its members
|
||||
|
||||
const onChannelMetadataChanged = function (ctx, channel) {
|
||||
// XXX
|
||||
};
|
||||
|
||||
/* checkExpired
|
||||
* synchronously returns true or undefined to indicate whether the channel is expired
|
||||
* according to its metadata
|
||||
|
@ -616,7 +649,9 @@ module.exports.create = function (cfg) {
|
|||
var lastKnownHash = parsed[3];
|
||||
var owners;
|
||||
var expire;
|
||||
if (parsed[2] && typeof parsed[2] === "object") { // FIXME METADATA RECEIVE
|
||||
// XXX we can be a bit more strict in our validation here
|
||||
// maybe we should check that it's an object and not an array?
|
||||
if (parsed[2] && typeof parsed[2] === "object") {
|
||||
validateKey = parsed[2].validateKey;
|
||||
lastKnownHash = parsed[2].lastKnownHash;
|
||||
owners = parsed[2].owners;
|
||||
|
@ -663,39 +698,24 @@ module.exports.create = function (cfg) {
|
|||
// And then check if the channel is expired. If it is, send the error and abort
|
||||
// FIXME this is hard to read because 'checkExpired' has side effects
|
||||
if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
|
||||
// Send the metadata to the user
|
||||
if (!lastKnownHash && index.cpIndex.length > 1) {
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
|
||||
return;
|
||||
}
|
||||
w();
|
||||
// always send metadata with GET_HISTORY requests
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
|
||||
}));
|
||||
}).nThen(() => {
|
||||
let msgCount = 0;
|
||||
let expired = false;
|
||||
|
||||
// XXX a lot of this logic is currently wrong
|
||||
// we should have already sent the most up to data metadata (if it exists)
|
||||
// we should have also already checked if the channel has expired
|
||||
// we just need to avoid sending metadata a second time
|
||||
// we should have already sent the most up-to-date metadata (if it exists)
|
||||
// TODO compute lastKnownHash in a manner such that it will always skip past the metadata line?
|
||||
getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => {
|
||||
if (!msg) { return; }
|
||||
if (msg.validateKey) {
|
||||
// If it is a young channel, this is the part where we get the metadata
|
||||
// Check if the channel is expired and abort if it is.
|
||||
if (!metadata_cache[channelName]) { metadata_cache[channelName] = msg; }
|
||||
expired = checkExpired(ctx, channelName);
|
||||
}
|
||||
|
||||
if (expired) { return void readMore(); }
|
||||
msgCount++;
|
||||
|
||||
// check for the channel because it's the one thing that should
|
||||
// always exist in "classic metadata"
|
||||
if (msg.channel && metadata_cache[channelName]) { return readMore(); }
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore);
|
||||
}, (err) => {
|
||||
// If the pad is expired, stop here, we've already sent the error message
|
||||
if (expired) { return; }
|
||||
|
||||
if (err && err.code !== 'ENOENT') {
|
||||
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
|
||||
const parsedMsg = {error:err.message, channel: channelName};
|
||||
|
@ -726,10 +746,10 @@ module.exports.create = function (cfg) {
|
|||
store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
|
||||
if (err) {
|
||||
// XXX handle errors
|
||||
return void console.error(err);
|
||||
}
|
||||
});
|
||||
//storeMessage(ctx, chan, JSON.stringify(metadata), false, undefined); // FIXME METADATA WRITE
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); // FIXME METADATA SEND
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]);
|
||||
}
|
||||
|
||||
// End of history message:
|
||||
|
@ -820,15 +840,13 @@ module.exports.create = function (cfg) {
|
|||
}
|
||||
|
||||
// FIXME METADATA CHANGE
|
||||
/*
|
||||
if (msg[3] === 'SET_METADATA') { // or whatever we call the RPC????
|
||||
// make sure we update our cache of metadata
|
||||
// or at least invalidate it and force other mechanisms to recompute its state
|
||||
// 'output' could be the new state as computed by rpc
|
||||
onChannelMetadataChanged(ctx, msg[4]);
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]);
|
||||
});
|
||||
} catch (e) {
|
||||
|
|
|
@ -221,8 +221,10 @@ How to proceed
|
|||
};
|
||||
|
||||
var writeMetadata = function (env, channelId, data, cb) {
|
||||
// XXX
|
||||
cb("NOT_IMPLEMENTED");
|
||||
var path = mkMetadataPath(env, channelId);
|
||||
// XXX appendFile isn't great
|
||||
// but this is a simple way to get things working
|
||||
Fs.appendFile(path, data + '\n', cb);
|
||||
};
|
||||
|
||||
const NEWLINE_CHR = ('\n').charCodeAt(0);
|
||||
|
@ -268,6 +270,7 @@ const mkOffsetCounter = () => {
|
|||
});
|
||||
};
|
||||
|
||||
// XXX write some docs for this magic
|
||||
const readMessagesBin = (env, id, start, msgHandler, cb) => {
|
||||
const stream = Fs.createReadStream(mkPath(env, id), { start: start });
|
||||
let keepReading = true;
|
||||
|
|
Loading…
Reference in New Issue