import latest chainpad

pull/1/head
ansuz 9 years ago
parent d0b553d198
commit 9336c4de5c

@ -546,11 +546,15 @@ var ZERO = '0000000000000000000000000000000000000000000000000000000000000000';
// Default number of patches between checkpoints (patches older than this will be pruned)
// default for realtime.config.checkpointInterval
var DEFAULT_CHECKPOINT_INTERVAL = 200;
var DEFAULT_CHECKPOINT_INTERVAL = 50;
// Default number of milliseconds to wait before syncing to the server
var DEFAULT_AVERAGE_SYNC_MILLISECONDS = 300;
// By default, we allow checkpoints at any place but if this is set true, we will blow up on chains
// which have checkpoints not where we expect them to be.
var DEFAULT_STRICT_CHECKPOINT_VALIDATION = false;
var enterChainPad = function (realtime, func) {
return function () {
if (realtime.failed) { return; }
@ -624,10 +628,6 @@ var sendMessage = function (realtime, msg, callback) {
realtime.pending = {
hash: msg.hashOf,
callback: function () {
if (realtime.initialMessage && realtime.initialMessage.hashOf === msg.hashOf) {
debug(realtime, "initial Ack received [" + msg.hashOf + "]");
realtime.initialMessage = null;
}
unschedule(realtime, timeout);
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0);
callback();
@ -670,22 +670,48 @@ var sync = function (realtime) {
}
var msg;
if (realtime.best === realtime.initialMessage) {
msg = realtime.initialMessage;
if (realtime.setContentPatch) {
msg = realtime.setContentPatch;
} else {
msg = Message.create(Message.PATCH, realtime.uncommitted, realtime.best.hashOf);
}
sendMessage(realtime, msg, function () {
//debug(realtime, "patch sent");
if (realtime.setContentPatch) {
debug(realtime, "initial Ack received [" + msg.hashOf + "]");
realtime.setContentPatch = null;
}
});
};
var storeMessage = function (realtime, msg) {
Common.assert(msg.lastMsgHash);
Common.assert(msg.hashOf);
realtime.messages[msg.hashOf] = msg;
(realtime.messagesByParent[msg.lastMsgHash] =
realtime.messagesByParent[msg.lastMsgHash] || []).push(msg);
};
var forgetMessage = function (realtime, msg) {
Common.assert(msg.lastMsgHash);
Common.assert(msg.hashOf);
delete realtime.messages[msg.hashOf];
var list = realtime.messagesByParent[msg.lastMsgHash];
Common.assert(list.indexOf(msg) > -1);
list.splice(list.indexOf(msg), 1);
if (list.length === 0) {
delete realtime.messagesByParent[msg.lastMsgHash];
}
};
var create = ChainPad.create = function (config) {
config = config || {};
var initialState = config.initialState || '';
config.checkpointInterval = config.checkpointInterval || DEFAULT_CHECKPOINT_INTERVAL;
config.avgSyncMilliseconds = config.avgSyncMilliseconds || DEFAULT_AVERAGE_SYNC_MILLISECONDS;
config.strictCheckpointValidation =
config.strictCheckpointValidation || DEFAULT_STRICT_CHECKPOINT_VALIDATION;
var realtime = {
type: 'ChainPad',
@ -716,6 +742,11 @@ var create = ChainPad.create = function (config) {
// this is only used if PARANOIA is enabled.
userInterfaceContent: undefined,
// If we want to set the content to a particular thing, this patch will be sent across the
// wire. If the patch is not accepted we will not try to recover it. This is used for
// setting initial state.
setContentPatch: null,
failed: false,
// hash and callback for previously send patch, currently in flight.
@ -729,26 +760,31 @@ var create = ChainPad.create = function (config) {
userName: config.userName || 'anonymous',
};
if (Common.PARANOIA) {
realtime.userInterfaceContent = initialState;
}
var zeroPatch = Patch.create(EMPTY_STR_HASH);
if (initialState !== '') {
var initialOp = Operation.create(0, 0, initialState);
Patch.addOperation(zeroPatch, initialOp);
}
zeroPatch.inverseOf = Patch.invert(zeroPatch, '');
zeroPatch.inverseOf.inverseOf = zeroPatch;
var zeroMsg = Message.create(Message.PATCH, zeroPatch, ZERO);
zeroMsg.hashOf = Message.hashOf(zeroMsg);
zeroMsg.parentCount = 0;
realtime.messages[zeroMsg.hashOf] = zeroMsg;
(realtime.messagesByParent[zeroMsg.lastMessageHash] || []).push(zeroMsg);
zeroMsg.isInitialMessage = true;
storeMessage(realtime, zeroMsg);
realtime.rootMessage = zeroMsg;
realtime.best = zeroMsg;
if (initialState !== '') {
var initPatch = Patch.create(EMPTY_STR_HASH);
Patch.addOperation(initPatch, Operation.create(0, 0, initialState));
initPatch.inverseOf = Patch.invert(initPatch, '');
initPatch.inverseOf.inverseOf = initPatch;
var initMsg = Message.create(Message.PATCH, initPatch, zeroMsg.hashOf);
initMsg.hashOf = Message.hashOf(initMsg);
initMsg.isInitialMessage = true;
storeMessage(realtime, initMsg);
realtime.best = initMsg;
realtime.authDoc = initialState;
realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash);
realtime.setContentPatch = initMsg;
}
realtime.uncommitted = Patch.create(realtime.best.content.inverseOf.parentHash);
if (Common.PARANOIA) {
realtime.userInterfaceContent = initialState;
@ -828,16 +864,22 @@ var parentCount = function (realtime, message) {
var applyPatch = function (realtime, isFromMe, patch) {
Common.assert(patch);
Common.assert(patch.inverseOf);
if (isFromMe && !patch.isInitialStatePatch) {
var inverseOldUncommitted = Patch.invert(realtime.uncommitted, realtime.authDoc);
var userInterfaceContent = Patch.apply(realtime.uncommitted, realtime.authDoc);
if (Common.PARANOIA) {
Common.assert(userInterfaceContent === realtime.userInterfaceContent);
}
realtime.uncommitted = Patch.merge(inverseOldUncommitted, patch);
realtime.uncommitted = Patch.invert(realtime.uncommitted, userInterfaceContent);
if (isFromMe) {
// Case 1: We're applying a patch which we originally created (yay our work was accepted)
// We will merge the inverse of the patch with our uncommitted work in order that
// we do not try to commit that work over again.
// Case 2: We're reverting a patch which had originally come from us, a.k.a. we're applying
// the inverse of that patch.
//
// In either scenario, we want to apply the inverse of the patch we are applying, to the
// uncommitted work. Whatever we "add" to the authDoc we "remove" from the uncommittedWork.
//
Common.assert(patch.parentHash === realtime.uncommitted.parentHash);
realtime.uncommitted = Patch.merge(patch.inverseOf, realtime.uncommitted);
} else {
// It's someone else's patch which was received, we need to *transform* out uncommitted
// work over their patch in order to preserve intent as much as possible.
realtime.uncommitted =
Patch.transform(
realtime.uncommitted, patch, realtime.authDoc, realtime.config.transformFunction);
@ -882,12 +924,21 @@ var pushUIPatch = function (realtime, patch) {
}
};
var validContent = function (realtime, contentGetter) {
if (!realtime.config.validateContent) { return true; }
try {
return realtime.validateContent(contentGetter());
} catch (e) {
warn(realtime, "Error in content validator [" + e.stack + "]");
}
return false;
};
var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromMe) {
if (Common.PARANOIA) { check(realtime); }
var msg = Message.fromString(msgStr);
// otherwise it's a disconnect.
if (msg.messageType !== Message.PATCH && msg.messageType !== Message.CHECKPOINT) {
debug(realtime, "unrecognized message type " + msg.messageType);
return;
@ -901,12 +952,18 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
return;
}
realtime.messages[msg.hashOf] = msg;
(realtime.messagesByParent[msg.lastMsgHash] =
realtime.messagesByParent[msg.lastMsgHash] || []).push(msg);
if (msg.content.isCheckpoint &&
!validContent(realtime, function () { return msg.content.operations[0].toInsert }))
{
// If it's not a checkpoint, we verify it later on...
debug(realtime, "Checkpoint [" + msg.hashOf + "] failed content validation");
return;
}
storeMessage(realtime, msg);
if (!isAncestorOf(realtime, realtime.rootMessage, msg)) {
if (realtime.rootMessage === realtime.best && msg.content.isCheckpoint) {
if (msg.content.isCheckpoint && realtime.best.isInitialMessage) {
// We're starting with a trucated chain from a checkpoint, we will adopt this
// as the root message and go with it...
var userDoc = Patch.apply(realtime.uncommitted, realtime.authDoc);
@ -957,8 +1014,10 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
commonAncestor = getParent(realtime, commonAncestor);
}
Common.assert(commonAncestor);
debug(realtime, "Patch [" + msg.hashOf + "] better than best chain, switching");
} else {
debug(realtime, "Patch [" + msg.hashOf + "] chain is ["+pcMsg+"] best chain is ["+pcBest+"]");
debug(realtime, "Patch [" + msg.hashOf + "] chain is [" + pcMsg + "] best chain is [" +
pcBest + "]");
if (Common.PARANOIA) { check(realtime); }
return;
}
@ -994,7 +1053,7 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
debug(realtime, "patch [" + msg.hashOf + "] parentHash is not valid");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
forgetMessage(realtime, msg);
return;
}
@ -1014,11 +1073,13 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
}
if (checkpointP && checkpointP !== realtime.rootMessage) {
var point = parentCount(realtime, checkpointP);
if ((point % realtime.config.checkpointInterval) !== 0) {
if (realtime.config.strictCheckpointValidation &&
(point % realtime.config.checkpointInterval) !== 0)
{
debug(realtime, "checkpoint [" + msg.hashOf + "] at invalid point [" + point + "]");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
forgetMessage(realtime, msg);
return;
}
@ -1026,8 +1087,7 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
debug(realtime, "checkpoint [" + msg.hashOf + "]");
for (var m = getParent(realtime, checkpointP); m; m = getParent(realtime, m)) {
debug(realtime, "pruning [" + m.hashOf + "]");
delete realtime.messages[m.hashOf];
delete realtime.messagesByParent[m.hashOf];
forgetMessage(realtime, m);
}
realtime.rootMessage = checkpointP;
}
@ -1038,7 +1098,14 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
debug(realtime, "patch [" + msg.hashOf + "] can be simplified");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
forgetMessage(realtime, msg);
return;
}
if (!validContent(realtime,
function () { return Patch.apply(patch, authDocAtTimeOfPatch); }))
{
debug(realtime, "Patch [" + msg.hashOf + "] failed content validation");
return;
}
}
@ -1058,6 +1125,7 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
for (var i = 0; i < toRevert.length; i++) {
debug(realtime, "reverting [" + toRevert[i].hashOf + "]");
if (toRevert[i].isFromMe) { debug(realtime, "reverting patch 'from me' [" + JSON.stringify(toRevert[i].content.operations) + "]"); }
uncommittedPatch = Patch.merge(uncommittedPatch, toRevert[i].content.inverseOf);
revertPatch(realtime, toRevert[i].isFromMe, toRevert[i].content);
}

Loading…
Cancel
Save