implement auto-checkpoint logic

pull/1/head
ansuz 5 years ago
parent 7230ee71a8
commit ce5c841a42

@ -2,6 +2,11 @@
var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) {
var Roster = {}; var Roster = {};
// this constant is somewhat arbitrary.
// Adjust it as you like to suit performance expectations
var CHECKPOINT_INTERVAL = 25;
var TIMEOUT_INTERVAL = 30000; // TIMEOUT after 30s
/* /*
roster: { roster: {
state: { state: {
@ -42,6 +47,7 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) {
// find the author's role from your knoweldge of the state // find the author's role from your knoweldge of the state
var role = Util.find(members, [author, 'role']); var role = Util.find(members, [author, 'role']);
// and check if it is 'OWNER' or 'ADMIN' // and check if it is 'OWNER' or 'ADMIN'
return ['OWNER', 'ADMIN'].indexOf(role) !== -1; return ['OWNER', 'ADMIN'].indexOf(role) !== -1;
}; };
@ -109,10 +115,22 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) {
return Boolean(authorRole && ['OWNER', 'ADMIN'].indexOf(authorRole) !== -1); return Boolean(authorRole && ['OWNER', 'ADMIN'].indexOf(authorRole) !== -1);
}; };
var shouldCheckpoint = function (ref) { var shouldCheckpoint = function (me, ref) {
ref = ref; // if you can't send valid checkpoints, don't try
if (!canCheckpoint(me, ref.state.members)) { return false; }
// avoid sending checkpoints too often
// it's a balance between network constraints
// and the size of the roster's log
var since = ref.internal.sinceLastCheckpoint;
if (!since || typeof(since) !== 'number' || since < CHECKPOINT_INTERVAL) {
return false;
}
// if you can't think of any other reason not to...
return true;
}; };
shouldCheckpoint = shouldCheckpoint; // XXX lint
var commands = Roster.commands = {}; var commands = Roster.commands = {};
/* Commands are functions with the signature /* Commands are functions with the signature
@ -342,6 +360,7 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) {
}, },
internal: { internal: {
initialized: false, initialized: false,
sinceLastCheckpoint: 0,
}, },
}; };
var roster = {}; var roster = {};
@ -377,10 +396,21 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) {
return Util.clone(ref.state); return Util.clone(ref.state);
}; };
var clearPendingCheckpoints = function () {
// clear any pending checkpoints you might have...
if (ref.internal.pendingCheckpointId) {
response.clear(ref.internal.pendingCheckpointId);
delete ref.internal.pendingCheckpointId;
}
clearTimeout(ref.internal.checkpointTimeout);
delete ref.internal.checkpointTimeout;
};
var webChannel; var webChannel;
roster.stop = function () { roster.stop = function () {
if (webChannel && typeof(webChannel.leave) === 'function') { if (webChannel && typeof(webChannel.leave) === 'function') {
webChannel.leave(); webChannel.leave();
clearPendingCheckpoints();
} else { } else {
console.log("FAILED TO LEAVE"); console.log("FAILED TO LEAVE");
} }
@ -415,6 +445,10 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) {
}; };
var onMessage = function (msg, user, vKey, isCp , hash, author) { var onMessage = function (msg, user, vKey, isCp , hash, author) {
// count messages received since the last checkpoint
// even if they fail to parse
ref.internal.sinceLastCheckpoint++;
var parsed = Util.tryParse(msg); var parsed = Util.tryParse(msg);
if (!parsed) { return void console.error("could not parse"); } if (!parsed) { return void console.error("could not parse"); }
@ -446,9 +480,27 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) {
// if a checkpoint was successfully applied, emit an event // if a checkpoint was successfully applied, emit an event
if (parsed[0] === 'CHECKPOINT' && changed) { if (parsed[0] === 'CHECKPOINT' && changed) {
events.checkpoint.fire(hash); events.checkpoint.fire(hash);
// reset the counter for messages since the last checkpoint
ref.internal.sinceLastCheckpoint = 0;
} else if (changed) { } else if (changed) {
events.change.fire(); events.change.fire();
} }
// CHECKPOINT logic...
clearPendingCheckpoints();
if (!isReady() || !shouldCheckpoint(me, ref)) { return; }
// a random number of seconds between 5 and 25
var delay = (1000 * Math.floor(Math.random() * 20)) + 5000;
// if you're here then you can and should send a checkpoint
// but since multiple users who can and should might be online at once
// and since they'll all trigger this process at the same time...
// we want to stagger attempts at random intervals
setTimeout(function () {
ref.internal.pendingCheckpointId = roster.checkpoint(function (err) {
if (err) { console.error(err); }
});
}, delay);
}; };
var metadata, crypto; var metadata, crypto;
@ -471,13 +523,14 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) {
//console.log("Sending with id [%s]", id, msg); //console.log("Sending with id [%s]", id, msg);
//console.log(); //console.log();
response.expect(id, cb, 3000); response.expect(id, cb, TIMEOUT_INTERVAL);
anon_rpc.send('WRITE_PRIVATE_MESSAGE', [ anon_rpc.send('WRITE_PRIVATE_MESSAGE', [
channel, channel,
ciphertext ciphertext
], function (err) { ], function (err) {
if (err) { return response.handle(id, [err]); } if (err) { return response.handle(id, [err]); }
}); });
return id;
}; };
roster.init = function (_data, _cb) { roster.init = function (_data, _cb) {

Loading…
Cancel
Save