revert my reversion

Revert "Revert "Added checkpoints with the new code from ChainPad""

This reverts commit 6e2e8bf21f.
pull/1/head
ansuz 9 years ago
parent 6e2e8bf21f
commit 058548b95a

@ -87,7 +87,27 @@ dropUser = function (ctx, user) {
}; };
const getHistory = function (ctx, channelName, handler, cb) { const getHistory = function (ctx, channelName, handler, cb) {
ctx.store.getMessages(channelName, function (msgStr) { handler(JSON.parse(msgStr)); }, cb); var messageBuf = [];
ctx.store.getMessages(channelName, function (msgStr) {
messageBuf.push(JSON.parse(msgStr));
}, function () {
var startPoint;
var cpCount = 0;
var msgBuff2 = [];
for (startPoint = messageBuf.length - 1; startPoint >= 0; startPoint--) {
var msg = messageBuf[startPoint];
msgBuff2.push(msg);
if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
cpCount++;
if (cpCount >= 2) {
for (var x = msgBuff2.pop(); x; x = msgBuff2.pop()) { handler(x); }
break;
}
}
//console.log(messageBuf[startPoint]);
}
cb();
});
}; };
const randName = function () { return Crypto.randomBytes(16).toString('hex'); }; const randName = function () { return Crypto.randomBytes(16).toString('hex'); };

@ -28,7 +28,8 @@ var create = Patch.create = function (parentHash) {
return { return {
type: 'Patch', type: 'Patch',
operations: [], operations: [],
parentHash: parentHash parentHash: parentHash,
isCheckpoint: false
}; };
}; };
@ -45,6 +46,13 @@ var check = Patch.check = function (patch, docLength_opt) {
docLength_opt += Operation.lengthChange(patch.operations[i]); docLength_opt += Operation.lengthChange(patch.operations[i]);
} }
} }
if (patch.isCheckpoint) {
Common.assert(patch.operations.length === 1);
Common.assert(patch.operations[0].offset === 0);
if (typeof(docLength_opt) === 'number') {
Common.assert(!docLength_opt || patch.operations[0].toRemove === docLength_opt);
}
}
}; };
var toObj = Patch.toObj = function (patch) { var toObj = Patch.toObj = function (patch) {
@ -104,6 +112,20 @@ var addOperation = Patch.addOperation = function (patch, op) {
if (Common.PARANOIA) { check(patch); } if (Common.PARANOIA) { check(patch); }
}; };
var createCheckpoint = Patch.createCheckpoint =
function (parentContent, checkpointContent, parentContentHash_opt)
{
var op = Operation.create(0, parentContent.length, checkpointContent);
if (Common.PARANOIA && parentContentHash_opt) {
Common.assert(parentContentHash_opt === hash(parentContent));
}
parentContentHash_opt = parentContentHash_opt || hash(parentContent);
var out = create(parentContentHash_opt);
addOperation(out, op);
out.isCheckpoint = true;
return out;
};
var clone = Patch.clone = function (patch) { var clone = Patch.clone = function (patch) {
if (Common.PARANOIA) { check(patch); } if (Common.PARANOIA) { check(patch); }
var out = create(); var out = create();
@ -380,7 +402,7 @@ var PARANOIA = module.exports.PARANOIA = true;
var VALIDATE_ENTIRE_CHAIN_EACH_MSG = module.exports.VALIDATE_ENTIRE_CHAIN_EACH_MSG = false; var VALIDATE_ENTIRE_CHAIN_EACH_MSG = module.exports.VALIDATE_ENTIRE_CHAIN_EACH_MSG = false;
/* throw errors over non-compliant messages which would otherwise be treated as invalid */ /* throw errors over non-compliant messages which would otherwise be treated as invalid */
var TESTING = module.exports.TESTING = true; var TESTING = module.exports.TESTING = false;
var assert = module.exports.assert = function (expr) { var assert = module.exports.assert = function (expr) {
if (!expr) { throw new Error("Failed assertion"); } if (!expr) { throw new Error("Failed assertion"); }
@ -435,10 +457,11 @@ var REGISTER = Message.REGISTER = 0;
var REGISTER_ACK = Message.REGISTER_ACK = 1; var REGISTER_ACK = Message.REGISTER_ACK = 1;
var PATCH = Message.PATCH = 2; var PATCH = Message.PATCH = 2;
var DISCONNECT = Message.DISCONNECT = 3; var DISCONNECT = Message.DISCONNECT = 3;
var CHECKPOINT = Message.CHECKPOINT = 4;
var check = Message.check = function(msg) { var check = Message.check = function(msg) {
Common.assert(msg.type === 'Message'); Common.assert(msg.type === 'Message');
if (msg.messageType === PATCH) { if (msg.messageType === PATCH || msg.messageType === CHECKPOINT) {
Patch.check(msg.content); Patch.check(msg.content);
Common.assert(typeof(msg.lastMsgHash) === 'string'); Common.assert(typeof(msg.lastMsgHash) === 'string');
} else { } else {
@ -459,9 +482,8 @@ var create = Message.create = function (type, content, lastMsgHash) {
var toString = Message.toString = function (msg) { var toString = Message.toString = function (msg) {
if (Common.PARANOIA) { check(msg); } if (Common.PARANOIA) { check(msg); }
if (msg.messageType === PATCH || msg.messageType === CHECKPOINT) {
if (msg.messageType === PATCH) { return JSON.stringify([msg.messageType, Patch.toObj(msg.content), msg.lastMsgHash]);
return JSON.stringify([PATCH, Patch.toObj(msg.content), msg.lastMsgHash]);
} else { } else {
throw new Error(); throw new Error();
} }
@ -478,43 +500,11 @@ var discardBencode = function (msg, arr) {
}; };
var fromString = Message.fromString = function (str) { var fromString = Message.fromString = function (str) {
var msg = str;
if (str.charAt(0) === '[') {
var m = JSON.parse(str); var m = JSON.parse(str);
return create(m[0], Patch.fromObj(m[1]), m[2]); if (m[0] !== CHECKPOINT && m[0] !== PATCH) { throw new Error("invalid message type " + m[0]); }
} else { var msg = create(m[0], Patch.fromObj(m[1]), m[2]);
/* Just in case we receive messages in the old format, if (m[0] === CHECKPOINT) { msg.content.isCheckpoint = true; }
we should try to parse them. We only need the content, though, return msg;
so just extract that and throw the rest away */
var last;
var parts = [];
// chop off all the bencoded components
while (msg) {
msg = discardBencode(msg, parts);
}
// grab the last component from the parts
// we don't need anything else
var contentStr = parts.slice(-1)[0];
var content = JSON.parse(contentStr);
var message;
if (content[0] === PATCH) {
message = create(userName, PATCH, Patch.fromObj(content[1]), content[2]);
} else if ([4,5].indexOf(content[0]) !== -1 /* === PING || content[0] === PONG*/) {
// it's a ping or pong, which we don't want to support anymore
message = create(userName, content[0], content[1]);
} else {
message = create(userName, content[0]);
}
// This check validates every operation in the patch.
check(message);
return message
}
}; };
var hashOf = Message.hashOf = function (msg) { var hashOf = Message.hashOf = function (msg) {
@ -550,9 +540,17 @@ var Sha = module.exports.Sha = require('./SHA256');
var ChainPad = {}; var ChainPad = {};
// hex_sha256('') // hex_sha256('')
var EMPTY_STR_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'; var EMPTY_STR_HASH = module.exports.EMPTY_STR_HASH =
'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855';
var ZERO = '0000000000000000000000000000000000000000000000000000000000000000'; var ZERO = '0000000000000000000000000000000000000000000000000000000000000000';
// Default number of patches between checkpoints (patches older than this will be pruned)
// default for realtime.config.checkpointInterval
var DEFAULT_CHECKPOINT_INTERVAL = 200;
// Default number of milliseconds to wait before syncing to the server
var DEFAULT_AVERAGE_SYNC_MILLISECONDS = 300;
var enterChainPad = function (realtime, func) { var enterChainPad = function (realtime, func) {
return function () { return function () {
if (realtime.failed) { return; } if (realtime.failed) { return; }
@ -567,8 +565,9 @@ var debug = function (realtime, msg) {
}; };
var schedule = function (realtime, func, timeout) { var schedule = function (realtime, func, timeout) {
if (realtime.aborted) { return; }
if (!timeout) { if (!timeout) {
timeout = Math.floor(Math.random() * 2 * realtime.avgSyncTime); timeout = Math.floor(Math.random() * 2 * realtime.config.avgSyncMilliseconds);
} }
var to = setTimeout(enterChainPad(realtime, function () { var to = setTimeout(enterChainPad(realtime, function () {
realtime.schedules.splice(realtime.schedules.indexOf(to), 1); realtime.schedules.splice(realtime.schedules.indexOf(to), 1);
@ -598,12 +597,52 @@ var onMessage = function (realtime, message, callback) {
} }
}; };
var sendMessage = function (realtime, msg, callback) {
var strMsg = Message.toString(msg);
onMessage(realtime, strMsg, function (err) {
if (err) {
debug(realtime, "Posting to server failed [" + err + "]");
realtime.pending = null;
} else {
var pending = realtime.pending;
realtime.pending = null;
Common.assert(pending.hash === msg.hashOf);
handleMessage(realtime, strMsg, true);
pending.callback();
}
});
msg.hashOf = msg.hashOf || Message.hashOf(msg);
var timeout = schedule(realtime, function () {
debug(realtime, "Failed to send message [" + msg.hashOf + "] to server");
sync(realtime);
}, 10000 + (Math.random() * 5000));
if (realtime.pending) { throw new Error("there is already a pending message"); }
realtime.pending = {
hash: msg.hashOf,
callback: function () {
if (realtime.initialMessage && realtime.initialMessage.hashOf === msg.hashOf) {
debug(realtime, "initial Ack received [" + msg.hashOf + "]");
realtime.initialMessage = null;
}
unschedule(realtime, timeout);
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0);
callback();
}
};
if (Common.PARANOIA) { check(realtime); }
};
var sync = function (realtime) { var sync = function (realtime) {
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
if (realtime.syncSchedule) { if (realtime.syncSchedule && !realtime.pending) {
unschedule(realtime, realtime.syncSchedule); unschedule(realtime, realtime.syncSchedule);
realtime.syncSchedule = null; realtime.syncSchedule = null;
} else { } else {
//debug(realtime, "already syncing...");
// we're currently waiting on something from the server. // we're currently waiting on something from the server.
return; return;
} }
@ -617,6 +656,19 @@ var sync = function (realtime) {
return; return;
} }
if (((parentCount(realtime, realtime.best) + 1) % realtime.config.checkpointInterval) === 0) {
var best = realtime.best;
debug(realtime, "Sending checkpoint");
var cpp = Patch.createCheckpoint(realtime.authDoc,
realtime.authDoc,
realtime.best.content.inverseOf.parentHash);
var cp = Message.create(Message.CHECKPOINT, cpp, realtime.best.hashOf);
sendMessage(realtime, cp, function () {
debug(realtime, "Checkpoint sent and accepted");
});
return;
}
var msg; var msg;
if (realtime.best === realtime.initialMessage) { if (realtime.best === realtime.initialMessage) {
msg = realtime.initialMessage; msg = realtime.initialMessage;
@ -624,39 +676,16 @@ var sync = function (realtime) {
msg = Message.create(Message.PATCH, realtime.uncommitted, realtime.best.hashOf); msg = Message.create(Message.PATCH, realtime.uncommitted, realtime.best.hashOf);
} }
var strMsg = Message.toString(msg); sendMessage(realtime, msg, function () {
//debug(realtime, "patch sent");
onMessage(realtime, strMsg, function (err) {
if (err) {
debug(realtime, "Posting to server failed [" + err + "]");
} else {
handleMessage(realtime, strMsg, true);
}
}); });
var hash = Message.hashOf(msg);
var timeout = schedule(realtime, function () {
debug(realtime, "Failed to send message ["+hash+"] to server");
sync(realtime);
}, 10000 + (Math.random() * 5000));
realtime.pending = {
hash: hash,
callback: function () {
if (realtime.initialMessage && realtime.initialMessage.hashOf === hash) {
debug(realtime, "initial Ack received ["+hash+"]");
realtime.initialMessage = null;
}
unschedule(realtime, timeout);
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0);
}
};
if (Common.PARANOIA) { check(realtime); }
}; };
var create = ChainPad.create = function (config) { var create = ChainPad.create = function (config) {
config = config || {}; config = config || {};
var initialState = config.initialState || ''; var initialState = config.initialState || '';
config.checkpointInterval = config.checkpointInterval || DEFAULT_CHECKPOINT_INTERVAL;
config.avgSyncMilliseconds = config.avgSyncMilliseconds || DEFAULT_AVERAGE_SYNC_MILLISECONDS;
var realtime = { var realtime = {
type: 'ChainPad', type: 'ChainPad',
@ -665,7 +694,7 @@ var create = ChainPad.create = function (config) {
config: config, config: config,
logLevel: typeof(config.logLevel) !== 'undefined'? config.logLevel: 1, logLevel: (typeof(config.logLevel) === 'number') ? config.logLevel : 1,
/** A patch representing all uncommitted work. */ /** A patch representing all uncommitted work. */
uncommitted: null, uncommitted: null,
@ -673,18 +702,17 @@ var create = ChainPad.create = function (config) {
uncommittedDocLength: initialState.length, uncommittedDocLength: initialState.length,
patchHandlers: [], patchHandlers: [],
opHandlers: [], changeHandlers: [],
messageHandlers: [], messageHandlers: [],
schedules: [], schedules: [],
aborted: false,
syncSchedule: null, syncSchedule: null,
registered: false, registered: false,
avgSyncTime: 100,
// this is only used if PARANOIA is enabled. // this is only used if PARANOIA is enabled.
userInterfaceContent: undefined, userInterfaceContent: undefined,
@ -699,12 +727,6 @@ var create = ChainPad.create = function (config) {
rootMessage: null, rootMessage: null,
userName: config.userName || 'anonymous', userName: config.userName || 'anonymous',
/**
* Set to the message which sets the initialState if applicable.
* Reset to null after the initial message has been successfully broadcasted.
*/
initialMessage: null,
}; };
if (Common.PARANOIA) { if (Common.PARANOIA) {
@ -712,6 +734,10 @@ var create = ChainPad.create = function (config) {
} }
var zeroPatch = Patch.create(EMPTY_STR_HASH); var zeroPatch = Patch.create(EMPTY_STR_HASH);
if (initialState !== '') {
var initialOp = Operation.create(0, 0, initialState);
Patch.addOperation(zeroPatch, initialOp);
}
zeroPatch.inverseOf = Patch.invert(zeroPatch, ''); zeroPatch.inverseOf = Patch.invert(zeroPatch, '');
zeroPatch.inverseOf.inverseOf = zeroPatch; zeroPatch.inverseOf.inverseOf = zeroPatch;
var zeroMsg = Message.create(Message.PATCH, zeroPatch, ZERO); var zeroMsg = Message.create(Message.PATCH, zeroPatch, ZERO);
@ -721,40 +747,12 @@ var create = ChainPad.create = function (config) {
(realtime.messagesByParent[zeroMsg.lastMessageHash] || []).push(zeroMsg); (realtime.messagesByParent[zeroMsg.lastMessageHash] || []).push(zeroMsg);
realtime.rootMessage = zeroMsg; realtime.rootMessage = zeroMsg;
realtime.best = zeroMsg; realtime.best = zeroMsg;
realtime.authDoc = initialState;
if (initialState === '') {
realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash); realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash);
return realtime;
}
var initialOp = Operation.create(0, 0, initialState);
var initialStatePatch = Patch.create(zeroPatch.inverseOf.parentHash);
Patch.addOperation(initialStatePatch, initialOp);
initialStatePatch.inverseOf = Patch.invert(initialStatePatch, '');
initialStatePatch.inverseOf.inverseOf = initialStatePatch;
// flag this patch so it can be handled specially.
// Specifically, we never treat an initialStatePatch as our own,
// we let it be reverted to prevent duplication of data.
initialStatePatch.isInitialStatePatch = true;
initialStatePatch.inverseOf.isInitialStatePatch = true;
realtime.authDoc = initialState;
if (Common.PARANOIA) { if (Common.PARANOIA) {
realtime.userInterfaceContent = initialState; realtime.userInterfaceContent = initialState;
} }
initialMessage = Message.create(Message.PATCH, initialStatePatch, zeroMsg.hashOf);
initialMessage.hashOf = Message.hashOf(initialMessage);
initialMessage.parentCount = 1;
initialMessage.isFromMe = true;
realtime.messages[initialMessage.hashOf] = initialMessage;
(realtime.messagesByParent[initialMessage.lastMessageHash] || []).push(initialMessage);
realtime.best = initialMessage;
realtime.uncommitted = Patch.create(initialStatePatch.inverseOf.parentHash);
realtime.initialMessage = initialMessage;
return realtime; return realtime;
}; };
@ -803,6 +801,17 @@ var doOperation = ChainPad.doOperation = function (realtime, op) {
realtime.uncommittedDocLength += Operation.lengthChange(op); realtime.uncommittedDocLength += Operation.lengthChange(op);
}; };
var doPatch = ChainPad.doPatch = function (realtime, patch) {
if (Common.PARANOIA) {
check(realtime);
Common.assert(Patch.invert(realtime.uncommitted).parentHash === patch.parentHash);
realtime.userInterfaceContent = Patch.apply(patch, realtime.userInterfaceContent);
}
Patch.check(patch, realtime.uncommittedDocLength);
realtime.uncommitted = Patch.merge(realtime.uncommitted, patch);
realtime.uncommittedDocLength += Patch.lengthChange(patch);
};
var isAncestorOf = function (realtime, ancestor, decendent) { var isAncestorOf = function (realtime, ancestor, decendent) {
if (!decendent || !ancestor) { return false; } if (!decendent || !ancestor) { return false; }
if (ancestor === decendent) { return true; } if (ancestor === decendent) { return true; }
@ -858,31 +867,34 @@ var getBestChild = function (realtime, msg) {
return best; return best;
}; };
var pushUIPatch = function (realtime, patch) {
if (patch.operations.length) {
// push the uncommittedPatch out to the user interface.
for (var i = 0; i < realtime.patchHandlers.length; i++) {
realtime.patchHandlers[i](patch);
}
for (var i = 0; i < realtime.changeHandlers.length; i++) {
for (var j = patch.operations.length; j >= 0; j--) {
var op = patch.operations[j];
realtime.changeHandlers[i](op.offset, op.toRemove, op.toInsert);
}
}
}
};
var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromMe) { var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromMe) {
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
var msg = Message.fromString(msgStr); var msg = Message.fromString(msgStr);
// These are all deprecated message types // otherwise it's a disconnect.
if (['REGISTER', 'PONG', 'DISCONNECT'].map(function (x) { if (msg.messageType !== Message.PATCH && msg.messageType !== Message.CHECKPOINT) {
return Message[x]; debug(realtime, "unrecognized message type " + msg.messageType);
}).indexOf(msg.messageType) !== -1) {
console.log("Deprecated message type: [%s]", msg.messageType);
return; return;
} }
// otherwise it's a disconnect.
if (msg.messageType !== Message.PATCH) {
console.error("disconnect");
return; }
msg.hashOf = Message.hashOf(msg); msg.hashOf = Message.hashOf(msg);
if (realtime.pending && realtime.pending.hash === msg.hashOf) {
realtime.pending.callback();
realtime.pending = null;
}
if (realtime.messages[msg.hashOf]) { if (realtime.messages[msg.hashOf]) {
debug(realtime, "Patch [" + msg.hashOf + "] is already known"); debug(realtime, "Patch [" + msg.hashOf + "] is already known");
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
@ -894,11 +906,34 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
realtime.messagesByParent[msg.lastMsgHash] || []).push(msg); realtime.messagesByParent[msg.lastMsgHash] || []).push(msg);
if (!isAncestorOf(realtime, realtime.rootMessage, msg)) { if (!isAncestorOf(realtime, realtime.rootMessage, msg)) {
if (realtime.rootMessage === realtime.best && msg.content.isCheckpoint) {
// We're starting with a trucated chain from a checkpoint, we will adopt this
// as the root message and go with it...
var userDoc = Patch.apply(realtime.uncommitted, realtime.authDoc);
Common.assert(!Common.PARANOIA || realtime.userInterfaceContent === userDoc);
var fixUserDocPatch = Patch.invert(realtime.uncommitted, realtime.authDoc);
Patch.addOperation(fixUserDocPatch,
Operation.create(0, realtime.authDoc.length, msg.content.operations[0].toInsert));
fixUserDocPatch =
Patch.simplify(fixUserDocPatch, userDoc, realtime.config.operationSimplify);
msg.parentCount = 0;
realtime.rootMessage = realtime.best = msg;
realtime.authDoc = msg.content.operations[0].toInsert;
realtime.uncommitted = Patch.create(Sha.hex_sha256(realtime.authDoc));
realtime.uncommittedDocLength = realtime.authDoc.length;
pushUIPatch(realtime, fixUserDocPatch);
if (Common.PARANOIA) { realtime.userInterfaceContent = realtime.authDoc; }
return;
} else {
// we'll probably find the missing parent later. // we'll probably find the missing parent later.
debug(realtime, "Patch [" + msg.hashOf + "] not connected to root"); debug(realtime, "Patch [" + msg.hashOf + "] not connected to root");
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
return; return;
} }
}
// of this message fills in a hole in the chain which makes another patch better, swap to the // of this message fills in a hole in the chain which makes another patch better, swap to the
// best child of this patch since longest chain always wins. // best child of this patch since longest chain always wins.
@ -963,6 +998,40 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
return; return;
} }
if (patch.isCheckpoint) {
// Ok, we have a checkpoint patch.
// If the chain length is not equal to checkpointInterval then this patch is invalid.
var i = 0;
var checkpointP;
for (var m = getParent(realtime, msg); m; m = getParent(realtime, m)) {
if (m.content.isCheckpoint) {
if (checkpointP) {
checkpointP = m;
break;
}
checkpointP = m;
}
}
if (checkpointP && checkpointP !== realtime.rootMessage) {
var point = parentCount(realtime, checkpointP);
if ((point % realtime.config.checkpointInterval) !== 0) {
debug(realtime, "checkpoint [" + msg.hashOf + "] at invalid point [" + point + "]");
if (Common.PARANOIA) { check(realtime); }
if (Common.TESTING) { throw new Error(); }
delete realtime.messages[msg.hashOf];
return;
}
// Time to prune some old messages from the chain
debug(realtime, "checkpoint [" + msg.hashOf + "]");
for (var m = getParent(realtime, checkpointP); m; m = getParent(realtime, m)) {
debug(realtime, "pruning [" + m.hashOf + "]");
delete realtime.messages[m.hashOf];
delete realtime.messagesByParent[m.hashOf];
}
realtime.rootMessage = checkpointP;
}
} else {
var simplePatch = var simplePatch =
Patch.simplify(patch, authDocAtTimeOfPatch, realtime.config.operationSimplify); Patch.simplify(patch, authDocAtTimeOfPatch, realtime.config.operationSimplify);
if (!Patch.equals(simplePatch, patch)) { if (!Patch.equals(simplePatch, patch)) {
@ -972,6 +1041,7 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
delete realtime.messages[msg.hashOf]; delete realtime.messages[msg.hashOf];
return; return;
} }
}
patch.inverseOf = Patch.invert(patch, authDocAtTimeOfPatch); patch.inverseOf = Patch.invert(patch, authDocAtTimeOfPatch);
patch.inverseOf.inverseOf = patch; patch.inverseOf.inverseOf = patch;
@ -1012,19 +1082,8 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM
Common.assert(newUserInterfaceContent === realtime.userInterfaceContent); Common.assert(newUserInterfaceContent === realtime.userInterfaceContent);
} }
if (uncommittedPatch.operations.length) { pushUIPatch(realtime, uncommittedPatch);
// push the uncommittedPatch out to the user interface.
for (var i = 0; i < realtime.patchHandlers.length; i++) {
realtime.patchHandlers[i](uncommittedPatch);
}
if (realtime.opHandlers.length) {
for (var i = uncommittedPatch.operations.length-1; i >= 0; i--) {
for (var j = 0; j < realtime.opHandlers.length; j++) {
realtime.opHandlers[j](uncommittedPatch.operations[i]);
}
}
}
}
if (Common.PARANOIA) { check(realtime); } if (Common.PARANOIA) { check(realtime); }
}; };
@ -1061,13 +1120,26 @@ var getDepthOfState = function (content, minDepth, realtime) {
module.exports.create = function (conf) { module.exports.create = function (conf) {
var realtime = ChainPad.create(conf); var realtime = ChainPad.create(conf);
return { var out = {
onPatch: enterChainPad(realtime, function (handler) { onPatch: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function'); Common.assert(typeof(handler) === 'function');
realtime.patchHandlers.push(handler); realtime.patchHandlers.push(handler);
}), }),
patch: enterChainPad(realtime, function (patch, x, y) {
if (typeof(patch) === 'number') {
// Actually they meant to call realtime.change()
out.change(patch, x, y);
return;
}
doPatch(realtime, patch);
}),
patch: enterChainPad(realtime, function (offset, count, chars) { onChange: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function');
realtime.changeHandlers.push(handler);
}),
change: enterChainPad(realtime, function (offset, count, chars) {
if (count === 0 && chars === '') { return; }
doOperation(realtime, Operation.create(offset, count, chars)); doOperation(realtime, Operation.create(offset, count, chars));
}), }),
@ -1075,26 +1147,32 @@ module.exports.create = function (conf) {
Common.assert(typeof(handler) === 'function'); Common.assert(typeof(handler) === 'function');
realtime.messageHandlers.push(handler); realtime.messageHandlers.push(handler);
}), }),
message: enterChainPad(realtime, function (message) { message: enterChainPad(realtime, function (message) {
handleMessage(realtime, message, false); handleMessage(realtime, message, false);
}), }),
start: enterChainPad(realtime, function () { start: enterChainPad(realtime, function () {
if (realtime.syncSchedule) { unschedule(realtime, realtime.syncSchedule); } if (realtime.syncSchedule) { unschedule(realtime, realtime.syncSchedule); }
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }); realtime.syncSchedule = schedule(realtime, function () { sync(realtime); });
}), }),
abort: enterChainPad(realtime, function () { abort: enterChainPad(realtime, function () {
realtime.aborted = true;
realtime.schedules.forEach(function (s) { clearTimeout(s) }); realtime.schedules.forEach(function (s) { clearTimeout(s) });
}), }),
sync: enterChainPad(realtime, function () {
sync(realtime); sync: enterChainPad(realtime, function () { sync(realtime); }),
}),
getAuthDoc: function () { return realtime.authDoc; }, getAuthDoc: function () { return realtime.authDoc; },
getUserDoc: function () { return Patch.apply(realtime.uncommitted, realtime.authDoc); }, getUserDoc: function () { return Patch.apply(realtime.uncommitted, realtime.authDoc); },
getDepthOfState: function (content, minDepth) { getDepthOfState: function (content, minDepth) {
return getDepthOfState(content, minDepth, realtime); return getDepthOfState(content, minDepth, realtime);
} }
}; };
return out;
}; };
}, },

@ -142,19 +142,21 @@ define([
// shim between chainpad and netflux // shim between chainpad and netflux
chainpadAdapter = { chainpadAdapter = {
msgIn : function(peerId, msg) { msgIn : function(peerId, msg) {
var message = parseMessage(msg); msg = msg.replace(/^cp\|/, '');
try { try {
var decryptedMsg = Crypto.decrypt(message, cryptKey); var decryptedMsg = Crypto.decrypt(msg, cryptKey);
messagesHistory.push(decryptedMsg); messagesHistory.push(decryptedMsg);
return decryptedMsg; return decryptedMsg;
} catch (err) { } catch (err) {
console.error(err); console.error(err);
return message; return msg;
} }
}, },
msgOut : function(msg, wc) { msgOut : function(msg, wc) {
try { try {
return Crypto.encrypt(msg, cryptKey); var cmsg = Crypto.encrypt(msg, cryptKey);
if (msg.indexOf('[4') === 0) { cmsg = 'cp|' + cmsg; }
return cmsg;
} catch (err) { } catch (err) {
console.log(msg); console.log(msg);
throw err; throw err;

Loading…
Cancel
Save