diff --git a/www/common/outer/roster.js b/www/common/outer/roster.js index bbf014215..7d2092b73 100644 --- a/www/common/outer/roster.js +++ b/www/common/outer/roster.js @@ -2,6 +2,11 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { var Roster = {}; + // this constant is somewhat arbitrary. + // Adjust it as you like to suit performance expectations + var CHECKPOINT_INTERVAL = 25; + var TIMEOUT_INTERVAL = 30000; // TIMEOUT after 30s + /* roster: { state: { @@ -42,6 +47,7 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { // find the author's role from your knoweldge of the state var role = Util.find(members, [author, 'role']); + // and check if it is 'OWNER' or 'ADMIN' return ['OWNER', 'ADMIN'].indexOf(role) !== -1; }; @@ -109,10 +115,22 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { return Boolean(authorRole && ['OWNER', 'ADMIN'].indexOf(authorRole) !== -1); }; - var shouldCheckpoint = function (ref) { - ref = ref; + var shouldCheckpoint = function (me, ref) { + // if you can't send valid checkpoints, don't try + if (!canCheckpoint(me, ref.state.members)) { return false; } + + // avoid sending checkpoints too often + // it's a balance between network constraints + // and the size of the roster's log + var since = ref.internal.sinceLastCheckpoint; + + if (!since || typeof(since) !== 'number' || since < CHECKPOINT_INTERVAL) { + return false; + } + + // if you can't think of any other reason not to... + return true; }; - shouldCheckpoint = shouldCheckpoint; // XXX lint var commands = Roster.commands = {}; /* Commands are functions with the signature @@ -243,7 +261,6 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { return changed; }; - // XXX what about concurrent checkpoints? Let's solve for race conditions... commands.CHECKPOINT = function (args, author, roster) { // args: complete state @@ -342,6 +359,7 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { }, internal: { initialized: false, + sinceLastCheckpoint: 0, }, }; var roster = {}; @@ -377,10 +395,21 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { return Util.clone(ref.state); }; + var clearPendingCheckpoints = function () { + // clear any pending checkpoints you might have... + if (ref.internal.pendingCheckpointId) { + response.clear(ref.internal.pendingCheckpointId); + delete ref.internal.pendingCheckpointId; + } + clearTimeout(ref.internal.checkpointTimeout); + delete ref.internal.checkpointTimeout; + }; + var webChannel; roster.stop = function () { if (webChannel && typeof(webChannel.leave) === 'function') { webChannel.leave(); + clearPendingCheckpoints(); } else { console.log("FAILED TO LEAVE"); } @@ -415,6 +444,10 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { }; var onMessage = function (msg, user, vKey, isCp , hash, author) { + // count messages received since the last checkpoint + // even if they fail to parse + ref.internal.sinceLastCheckpoint++; + var parsed = Util.tryParse(msg); if (!parsed) { return void console.error("could not parse"); } @@ -446,9 +479,27 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { // if a checkpoint was successfully applied, emit an event if (parsed[0] === 'CHECKPOINT' && changed) { events.checkpoint.fire(hash); + // reset the counter for messages since the last checkpoint + ref.internal.sinceLastCheckpoint = 0; } else if (changed) { events.change.fire(); } + + // CHECKPOINT logic... + clearPendingCheckpoints(); + if (!isReady() || !shouldCheckpoint(me, ref)) { return; } + // a random number of seconds between 5 and 25 + var delay = (1000 * Math.floor(Math.random() * 20)) + 5000; + + // if you're here then you can and should send a checkpoint + // but since multiple users who can and should might be online at once + // and since they'll all trigger this process at the same time... + // we want to stagger attempts at random intervals + setTimeout(function () { + ref.internal.pendingCheckpointId = roster.checkpoint(function (err) { + if (err) { console.error(err); } + }); + }, delay); }; var metadata, crypto; @@ -471,18 +522,20 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { //console.log("Sending with id [%s]", id, msg); //console.log(); - response.expect(id, cb, 3000); + response.expect(id, cb, TIMEOUT_INTERVAL); anon_rpc.send('WRITE_PRIVATE_MESSAGE', [ channel, ciphertext ], function (err) { if (err) { return response.handle(id, [err]); } }); + return id; }; roster.init = function (_data, _cb) { var cb = Util.once(Util.mkAsync(_cb)); if (ref.internal.initialized) { return void cb("ALREADY_INITIALIZED"); } + if (!isMap(_data)) { return void cb("INVALID_ARGUMENTS"); } var data = Util.clone(_data); data.role = 'OWNER'; var members = {}; @@ -493,16 +546,15 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { // commands roster.checkpoint = function (_cb) { var cb = Util.once(Util.mkAsync(_cb)); - //var state = ref.state; - //if (!state) { return cb("UNINITIALIZED"); } - send([ 'CHECKPOINT', ref.state], cb); + send([ 'CHECKPOINT', Util.clone(ref.state)], cb); }; - roster.add = function (data, _cb) { + roster.add = function (_data, _cb) { var cb = Util.once(Util.mkAsync(_cb)); //var state = ref.state; if (!ref.internal.initialized) { return cb("UNINITIALIZED"); } - if (!isMap(data)) { return void cb("INVALID_ARGUMENTS"); } + if (!isMap(_data)) { return void cb("INVALID_ARGUMENTS"); } + var data = Util.clone(_data); // don't add members that are already present // use DESCRIBE to amend @@ -513,12 +565,13 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { send([ 'ADD', data ], cb); }; - roster.remove = function (data, _cb) { + roster.remove = function (_data, _cb) { var cb = Util.once(Util.mkAsync(_cb)); var state = ref.state; if (!state) { return cb("UNINITIALIZED"); } - if (!Array.isArray(data)) { return void cb("INVALID_ARGUMENTS"); } + if (!Array.isArray(_data)) { return void cb("INVALID_ARGUMENTS"); } + var data = Util.clone(_data); var toRemove = []; var current = Object.keys(state.members); @@ -531,11 +584,13 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { send([ 'RM', toRemove ], cb); }; - roster.describe = function (data, _cb) { + roster.describe = function (_data, _cb) { var cb = Util.once(Util.mkAsync(_cb)); var state = ref.state; + if (!state) { return cb("UNINITIALIZED"); } - if (!isMap(data)) { return void cb("INVALID_ARGUMENTS"); } + if (!isMap(_data)) { return void cb("INVALID_ARGUMENTS"); } + var data = Util.clone(_data); Object.keys(data).forEach(function (curve) { var member = data[curve]; @@ -549,10 +604,11 @@ var factory = function (Util, Hash, CPNetflux, Sortify, nThen, Crypto) { send(['DESCRIBE', data], cb); }; - roster.metadata = function (data, _cb) { + roster.metadata = function (_data, _cb) { var cb = Util.once(Util.mkAsync(_cb)); var metadata = ref.state.metadata; - if (!isMap(data)) { return void cb("INVALID_ARGUMENTS"); } + if (!isMap(_data)) { return void cb("INVALID_ARGUMENTS"); } + var data = Util.clone(_data); Object.keys(data).forEach(function (k) { if (data[k] === metadata[k]) { delete data[k]; }