move to chainpad version 2

pull/1/head
ansuz 9 years ago
parent 297d8c2d44
commit 976a08cc7a

@ -435,38 +435,20 @@ var REGISTER = Message.REGISTER = 0;
var REGISTER_ACK = Message.REGISTER_ACK = 1;
var PATCH = Message.PATCH = 2;
var DISCONNECT = Message.DISCONNECT = 3;
var PING = Message.PING = 4;
var PONG = Message.PONG = 5;
var check = Message.check = function(msg) {
Common.assert(msg.type === 'Message');
Common.assert(typeof(msg.userName) === 'string');
Common.assert(typeof(msg.authToken) === 'string');
Common.assert(typeof(msg.channelId) === 'string');
if (msg.messageType === PATCH) {
Patch.check(msg.content);
Common.assert(typeof(msg.lastMsgHash) === 'string');
} else if (msg.messageType === PING || msg.messageType === PONG) {
Common.assert(typeof(msg.lastMsgHash) === 'undefined');
Common.assert(typeof(msg.content) === 'number');
} else if (msg.messageType === REGISTER
|| msg.messageType === REGISTER_ACK
|| msg.messageType === DISCONNECT)
{
Common.assert(typeof(msg.lastMsgHash) === 'undefined');
Common.assert(typeof(msg.content) === 'undefined');
} else {
throw new Error("invalid message type [" + msg.messageType + "]");
}
};
var create = Message.create = function (userName, authToken, channelId, type, content, lastMsgHash) {
var create = Message.create = function (type, content, lastMsgHash) {
var msg = {
type: 'Message',
userName: userName,
authToken: authToken,
channelId: channelId,
messageType: type,
content: content,
lastMsgHash: lastMsgHash
@ -477,62 +459,67 @@ var create = Message.create = function (userName, authToken, channelId, type, co
var toString = Message.toString = function (msg) {
if (Common.PARANOIA) { check(msg); }
var prefix = msg.messageType + ':';
var content = '';
if (msg.messageType === REGISTER) {
content = JSON.stringify([REGISTER]);
} else if (msg.messageType === PING || msg.messageType === PONG) {
content = JSON.stringify([msg.messageType, msg.content]);
} else if (msg.messageType === PATCH) {
content = JSON.stringify([PATCH, Patch.toObj(msg.content), msg.lastMsgHash]);
}
return msg.authToken.length + ":" + msg.authToken +
msg.userName.length + ":" + msg.userName +
msg.channelId.length + ":" + msg.channelId +
content.length + ':' + content;
if (msg.messageType === PATCH) {
return JSON.stringify([PATCH, Patch.toObj(msg.content), msg.lastMsgHash]);
} else {
throw new Error();
}
};
var discardBencode = function (msg, arr) {
var len = msg.substring(0,msg.indexOf(':'));
msg = msg.substring(len.length+1);
var value = msg.substring(0,Number(len));
msg = msg.substring(value.length);
if (arr) { arr.push(value); }
return msg;
};
var fromString = Message.fromString = function (str) {
var msg = str;
var unameLen = msg.substring(0,msg.indexOf(':'));
msg = msg.substring(unameLen.length+1);
var userName = msg.substring(0,Number(unameLen));
msg = msg.substring(userName.length);
var channelIdLen = msg.substring(0,msg.indexOf(':'));
msg = msg.substring(channelIdLen.length+1);
var channelId = msg.substring(0,Number(channelIdLen));
msg = msg.substring(channelId.length);
if (str.charAt(0) === '[') {
var m = JSON.parse(str);
return create(m[0], Patch.fromObj(m[1]), m[2]);
} else {
/* Just in case we receive messages in the old format,
we should try to parse them. We only need the content, though,
so just extract that and throw the rest away */
var last;
var parts = [];
// chop off all the bencoded components
while (msg) {
msg = discardBencode(msg, parts);
}
var contentStrLen = msg.substring(0,msg.indexOf(':'));
msg = msg.substring(contentStrLen.length+1);
var contentStr = msg.substring(0,Number(contentStrLen));
// grab the last component from the parts
// we don't need anything else
var contentStr = parts.slice(-1)[0];
var content = JSON.parse(contentStr);
var message;
if (content[0] === PATCH) {
message = create(userName, PATCH, Patch.fromObj(content[1]), content[2]);
} else if ([4,5].indexOf(content[0]) !== -1 /* === PING || content[0] === PONG*/) {
// it's a ping or pong, which we don't want to support anymore
message = create(userName, content[0], content[1]);
} else {
message = create(userName, content[0]);
}
Common.assert(contentStr.length === Number(contentStrLen));
// This check validates every operation in the patch.
check(message);
var content = JSON.parse(contentStr);
var message;
if (content[0] === PATCH) {
message = create(userName, '', channelId, PATCH, Patch.fromObj(content[1]), content[2]);
} else if (content[0] === PING || content[0] === PONG) {
message = create(userName, '', channelId, content[0], content[1]);
} else {
message = create(userName, '', channelId, content[0]);
return message
}
// This check validates every operation in the patch.
check(message);
return message
};
var hashOf = Message.hashOf = function (msg) {
if (Common.PARANOIA) { check(msg); }
var authToken = msg.authToken;
msg.authToken = '';
var hash = Sha.hex_sha256(toString(msg));
msg.authToken = authToken;
return hash;
};
@ -554,10 +541,10 @@ var hashOf = Message.hashOf = function (msg) {
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
var Common = require('./Common');
var Common = module.exports.Common = require('./Common');
var Operation = module.exports.Operation = require('./Operation');
var Patch = require('./Patch');
var Message = require('./Message');
var Patch = module.exports.Patch = require('./Patch');
var Message = module.exports.Message = require('./Message');
var Sha = module.exports.Sha = require('./SHA256');
var ChainPad = {};
@ -634,12 +621,7 @@ var sync = function (realtime) {
if (realtime.best === realtime.initialMessage) {
msg = realtime.initialMessage;
} else {
msg = Message.create(realtime.userName,
realtime.authToken,
realtime.channelId,
Message.PATCH,
realtime.uncommitted,
realtime.best.hashOf);
msg = Message.create(Message.PATCH, realtime.uncommitted, realtime.best.hashOf);
}
var strMsg = Message.toString(msg);
@ -647,6 +629,8 @@ var sync = function (realtime) {
onMessage(realtime, strMsg, function (err) {
if (err) {
debug(realtime, "Posting to server failed [" + err + "]");
} else {
handleMessage(realtime, strMsg, true);
}
});
@ -670,46 +654,9 @@ var sync = function (realtime) {
if (Common.PARANOIA) { check(realtime); }
};
var getMessages = function (realtime) {
realtime.registered = true;
/*var to = schedule(realtime, function () {
throw new Error("failed to connect to the server");
}, 5000);*/
var msg = Message.create(realtime.userName,
realtime.authToken,
realtime.channelId,
Message.REGISTER);
onMessage(realtime, Message.toString(msg), function (err) {
if (err) { throw err; }
});
};
var sendPing = function (realtime) {
realtime.pingSchedule = undefined;
realtime.lastPingTime = (new Date()).getTime();
var msg = Message.create(realtime.userName,
realtime.authToken,
realtime.channelId,
Message.PING,
realtime.lastPingTime);
onMessage(realtime, Message.toString(msg), function (err) {
if (err) { throw err; }
});
};
var onPong = function (realtime, msg) {
if (Common.PARANOIA) {
Common.assert(realtime.lastPingTime === Number(msg.content));
}
realtime.lastPingLag = (new Date()).getTime() - Number(msg.content);
realtime.lastPingTime = 0;
realtime.pingSchedule =
schedule(realtime, function () { sendPing(realtime); }, realtime.pingCycle);
};
var create = ChainPad.create = function (userName, authToken, channelId, initialState, config) {
var create = ChainPad.create = function (config) {
config = config || {};
var initialState = config.initialState || '';
var realtime = {
type: 'ChainPad',
@ -720,10 +667,6 @@ var create = ChainPad.create = function (userName, authToken, channelId, initial
logLevel: typeof(config.logLevel) !== 'undefined'? config.logLevel: 1,
userName: userName,
authToken: authToken,
channelId: channelId,
/** A patch representing all uncommitted work. */
uncommitted: null,
@ -755,23 +698,13 @@ var create = ChainPad.create = function (userName, authToken, channelId, initial
rootMessage: null,
userName: config.userName || 'anonymous',
/**
* Set to the message which sets the initialState if applicable.
* Reset to null after the initial message has been successfully broadcasted.
*/
initialMessage: null,
userListChangeHandlers: [],
userList: [],
/** The schedule() for sending pings. */
pingSchedule: undefined,
lastPingLag: 0,
lastPingTime: 0,
/** Average number of milliseconds between pings. */
pingCycle: 5000
};
if (Common.PARANOIA) {
@ -781,7 +714,7 @@ var create = ChainPad.create = function (userName, authToken, channelId, initial
var zeroPatch = Patch.create(EMPTY_STR_HASH);
zeroPatch.inverseOf = Patch.invert(zeroPatch, '');
zeroPatch.inverseOf.inverseOf = zeroPatch;
var zeroMsg = Message.create('', '', channelId, Message.PATCH, zeroPatch, ZERO);
var zeroMsg = Message.create(Message.PATCH, zeroPatch, ZERO);
zeroMsg.hashOf = Message.hashOf(zeroMsg);
zeroMsg.parentCount = 0;
realtime.messages[zeroMsg.hashOf] = zeroMsg;
@ -810,14 +743,10 @@ var create = ChainPad.create = function (userName, authToken, channelId, initial
if (Common.PARANOIA) {
realtime.userInterfaceContent = initialState;
}
initialMessage = Message.create(realtime.userName,
realtime.authToken,
realtime.channelId,
Message.PATCH,
initialStatePatch,
zeroMsg.hashOf);
initialMessage = Message.create(Message.PATCH, initialStatePatch, zeroMsg.hashOf);
initialMessage.hashOf = Message.hashOf(initialMessage);
initialMessage.parentCount = 1;
initialMessage.isFromMe = true;
realtime.messages[initialMessage.hashOf] = initialMessage;
(realtime.messagesByParent[initialMessage.lastMessageHash] || []).push(initialMessage);
@ -887,8 +816,10 @@ var parentCount = function (realtime, message) {
return message.parentCount;
};
var applyPatch = function (realtime, author, patch) {
if (author === realtime.userName && !patch.isInitialStatePatch) {
var applyPatch = function (realtime, isFromMe, patch) {
Common.assert(patch);
Common.assert(patch.inverseOf);
if (isFromMe && !patch.isInitialStatePatch) {
var inverseOldUncommitted = Patch.invert(realtime.uncommitted, realtime.authDoc);
var userInterfaceContent = Patch.apply(realtime.uncommitted, realtime.authDoc);
if (Common.PARANOIA) {
@ -907,12 +838,14 @@ var applyPatch = function (realtime, author, patch) {
realtime.authDoc = Patch.apply(patch, realtime.authDoc);
if (Common.PARANOIA) {
Common.assert(realtime.uncommitted.parentHash === patch.inverseOf.parentHash);
Common.assert(Sha.hex_sha256(realtime.authDoc) === realtime.uncommitted.parentHash);
realtime.userInterfaceContent = Patch.apply(realtime.uncommitted, realtime.authDoc);
}
};
var revertPatch = function (realtime, author, patch) {
applyPatch(realtime, author, patch.inverseOf);
var revertPatch = function (realtime, isFromMe, patch) {
applyPatch(realtime, isFromMe, patch.inverseOf);
};
var getBestChild = function (realtime, msg) {
@ -925,55 +858,23 @@ var getBestChild = function (realtime, msg) {
return best;
};
var userListChange = function (realtime) {
for (var i = 0; i < realtime.userListChangeHandlers.length; i++) {
var list = [];
list.push.apply(list, realtime.userList);
realtime.userListChangeHandlers[i](list);
}
};
var handleMessage = ChainPad.handleMessage = function (realtime, msgStr) {
var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromMe) {
if (Common.PARANOIA) { check(realtime); }
var msg = Message.fromString(msgStr);
Common.assert(msg.channelId === realtime.channelId);
if (msg.messageType === Message.REGISTER_ACK) {
debug(realtime, "registered");
realtime.registered = true;
sendPing(realtime);
return;
}
if (msg.messageType === Message.REGISTER) {
realtime.userList.push(msg.userName);
userListChange(realtime);
return;
}
if (msg.messageType === Message.PONG) {
onPong(realtime, msg);
return;
}
if (msg.messageType === Message.DISCONNECT) {
if (msg.userName === '') {
realtime.userList = [];
userListChange(realtime);
return;
}
var idx = realtime.userList.indexOf(msg.userName);
if (Common.PARANOIA) { Common.assert(idx > -1); }
if (idx > -1) {
realtime.userList.splice(idx, 1);
userListChange(realtime);
}
// These are all deprecated message types
if (['REGISTER', 'PONG', 'DISCONNECT'].map(function (x) {
return Message[x];
}).indexOf(msg.messageType) !== -1) {
console.log("Deprecated message type: [%s]", msg.messageType);
return;
}
// otherwise it's a disconnect.
if (msg.messageType !== Message.PATCH) { return; }
if (msg.messageType !== Message.PATCH) {
console.error("disconnect");
return; }
msg.hashOf = Message.hashOf(msg);
@ -1002,6 +903,7 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr) {
// of this message fills in a hole in the chain which makes another patch better, swap to the
// best child of this patch since longest chain always wins.
msg = getBestChild(realtime, msg);
msg.isFromMe = isFromMe;
var patch = msg.content;
// Find the ancestor of this patch which is in the main chain, reverting as necessary
@ -1040,6 +942,7 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr) {
var authDocAtTimeOfPatch = realtime.authDoc;
for (var i = 0; i < toRevert.length; i++) {
Common.assert(typeof(toRevert[i].content.inverseOf) !== 'undefined');
authDocAtTimeOfPatch = Patch.apply(toRevert[i].content.inverseOf, authDocAtTimeOfPatch);
}
@ -1086,13 +989,13 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr) {
for (var i = 0; i < toRevert.length; i++) {
debug(realtime, "reverting [" + toRevert[i].hashOf + "]");
uncommittedPatch = Patch.merge(uncommittedPatch, toRevert[i].content.inverseOf);
revertPatch(realtime, toRevert[i].userName, toRevert[i].content);
revertPatch(realtime, toRevert[i].isFromMe, toRevert[i].content);
}
for (var i = 0; i < toApply.length; i++) {
debug(realtime, "applying [" + toApply[i].hashOf + "]");
uncommittedPatch = Patch.merge(uncommittedPatch, toApply[i].content);
applyPatch(realtime, toApply[i].userName, toApply[i].content);
applyPatch(realtime, toApply[i].isFromMe, toApply[i].content);
}
uncommittedPatch = Patch.merge(uncommittedPatch, realtime.uncommitted);
@ -1125,22 +1028,6 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr) {
if (Common.PARANOIA) { check(realtime); }
};
var wasEverState = function (content, realtime) {
Common.assert(typeof(content) === 'string');
// without this we would never get true on the ^HEAD
if (realtime.authDoc === content) {
return true;
}
var hash = Sha.hex_sha256(content);
var patchMsg = realtime.best;
do {
if (patchMsg.content.parentHash === hash) { return true; }
} while ((patchMsg = getParent(realtime, patchMsg)));
return false;
};
var getDepthOfState = function (content, minDepth, realtime) {
Common.assert(typeof(content) === 'string');
@ -1169,47 +1056,29 @@ var getDepthOfState = function (content, minDepth, realtime) {
}
depth++;
} while ((patchMsg = getParent(realtime, patchMsg)));
return;
return -1;
};
module.exports.create = function (userName, authToken, channelId, initialState, conf) {
Common.assert(typeof(userName) === 'string');
Common.assert(typeof(authToken) === 'string');
Common.assert(typeof(channelId) === 'string');
Common.assert(typeof(initialState) === 'string');
var realtime = ChainPad.create(userName, authToken, channelId, initialState, conf);
module.exports.create = function (conf) {
var realtime = ChainPad.create(conf);
return {
onPatch: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function');
realtime.patchHandlers.push(handler);
}),
onRemove: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function');
realtime.opHandlers.unshift(function (op) {
if (op.toRemove > 0) { handler(op.offset, op.toRemove); }
});
}),
onInsert: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function');
realtime.opHandlers.push(function (op) {
if (op.toInsert.length > 0) { handler(op.offset, op.toInsert); }
});
}),
remove: enterChainPad(realtime, function (offset, numChars) {
doOperation(realtime, Operation.create(offset, numChars, ''));
}),
insert: enterChainPad(realtime, function (offset, str) {
doOperation(realtime, Operation.create(offset, 0, str));
patch: enterChainPad(realtime, function (offset, count, chars) {
doOperation(realtime, Operation.create(offset, count, chars));
}),
onMessage: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function');
realtime.messageHandlers.push(handler);
}),
message: enterChainPad(realtime, function (message) {
handleMessage(realtime, message);
handleMessage(realtime, message, false);
}),
start: enterChainPad(realtime, function () {
getMessages(realtime);
if (realtime.syncSchedule) { unschedule(realtime, realtime.syncSchedule); }
realtime.syncSchedule = schedule(realtime, function () { sync(realtime); });
}),
@ -1221,19 +1090,7 @@ module.exports.create = function (userName, authToken, channelId, initialState,
}),
getAuthDoc: function () { return realtime.authDoc; },
getUserDoc: function () { return Patch.apply(realtime.uncommitted, realtime.authDoc); },
onUserListChange: enterChainPad(realtime, function (handler) {
Common.assert(typeof(handler) === 'function');
realtime.userListChangeHandlers.push(handler);
}),
getLag: function () {
if (realtime.lastPingTime) {
return { waiting:1, lag: (new Date()).getTime() - realtime.lastPingTime };
}
return { waiting:0, lag: realtime.lastPingLag };
},
wasEverState: function (content) {
return wasEverState(content, realtime);
},
getDepthOfState: function (content, minDepth) {
return getDepthOfState(content, minDepth, realtime);
}

@ -22,36 +22,12 @@ define([
return Nacl.util.encodeUTF8(unpacked);
};
// this is crap because of bencoding messages... it should go away....
var splitMessage = function (msg, sending) {
var idx = 0;
var nl;
for (var i = ((sending) ? 0 : 1); i < 3; i++) {
nl = msg.indexOf(':',idx);
idx = nl + Number(msg.substring(idx,nl)) + 1;
}
return [ msg.substring(0,idx), msg.substring(msg.indexOf(':',idx) + 1) ];
};
var encrypt = module.exports.encrypt = function (msg, key) {
var spl = splitMessage(msg, true);
var json = JSON.parse(spl[1]);
// non-patches are not encrypted.
if (json[0] !== 2) { return msg; }
json[1] = encryptStr(JSON.stringify(json[1]), key);
var res = JSON.stringify(json);
return spl[0] + res.length + ':' + res;
return encryptStr(msg, key);
};
var decrypt = module.exports.decrypt = function (msg, key) {
var spl = splitMessage(msg, false);
var json = JSON.parse(spl[1]);
// non-patches are not encrypted.
if (json[0] !== 2) { return msg; }
if (typeof(json[1]) !== 'string') { throw new Error(); }
json[1] = JSON.parse(decryptStr(json[1], key));
var res = JSON.stringify(json);
return spl[0] + res.length + ':' + res;
return decryptStr(msg, key);
};
var parseKey = module.exports.parseKey = function (str) {

@ -288,4 +288,4 @@ define(function () {
};
return { connect: connect };
});
});

@ -37,6 +37,8 @@ define([
verbose = function (x) { console.log(x); };
verbose = function () {}; // comment out to enable verbose logging
var unBencode = function (str) { return str.replace(/^\d+:/, ''); };
var start = module.exports.start =
function (config)
{
@ -59,27 +61,7 @@ define([
var realtime;
var parseMessage = function (msg) {
var res ={};
// two or more? use a for
['pass','user','channelId','content'].forEach(function(attr){
var len=msg.slice(0,msg.indexOf(':')),
// taking an offset lets us slice out the prop
// and saves us one string copy
o=len.length+1,
prop=res[attr]=msg.slice(o,Number(len)+o);
// slice off the property and its descriptor
msg = msg.slice(prop.length+o);
});
// content is the only attribute that's not a string
res.content=JSON.parse(res.content);
return res;
};
var mkMessage = function (user, chan, content) {
content = JSON.stringify(content);
return user.length + ':' + user +
chan.length + ':' + chan +
content.length + ':' + content;
return unBencode(msg);//.slice(msg.indexOf(':[') + 1);
};
var userList = {
@ -137,6 +119,12 @@ define([
config.onLocal();
}
}
// slice off the bencoded header
// Why are we getting bencoded stuff to begin with?
// FIXME this shouldn't be necessary
message = unBencode(message);//.slice(message.indexOf(':[') + 1);
// pass the message into Chainpad
realtime.message(message);
};
@ -154,10 +142,7 @@ define([
// shim between chainpad and netflux
chainpadAdapter = {
msgIn : function(peerId, msg) {
var parsed = parseMessage(msg);
// Remove the password from the message
var passLen = msg.substring(0,msg.indexOf(':'));
var message = msg.substring(passLen.length+1 + Number(passLen));
var message = parseMessage(msg);
try {
var decryptedMsg = Crypto.decrypt(message, cryptKey);
messagesHistory.push(decryptedMsg);
@ -166,33 +151,24 @@ define([
console.error(err);
return message;
}
},
msgOut : function(msg, wc) {
var parsed = parseMessage(msg);
if(parsed.content[0] === 0) { // We're registering : send a REGISTER_ACK to Chainpad
onMessage('', '1:y'+mkMessage('', channel, [1,0]));
return;
}
if(parsed.content[0] === 4) { // PING message from Chainpad
parsed.content[0] = 5;
onMessage('', '1:y'+mkMessage(parsed.user, parsed.channelId, parsed.content));
// wc.sendPing();
return;
try {
return Crypto.encrypt(msg, cryptKey);
} catch (err) {
console.log(msg);
throw err;
}
return Crypto.encrypt(msg, cryptKey);
}
};
var createRealtime = function(chan) {
return ChainPad.create(userName,
passwd,
channel,
config.initialState || '',
{
transformFunction: config.transformFunction,
logLevel: typeof(config.logLevel) !== 'undefined'? config.logLevel : 1
});
return ChainPad.create({
userName: userName,
initialState: config.initialState,
transformFunction: config.transformFunction,
logLevel: typeof(config.logLevel) !== 'undefined'? config.logLevel : 1
});
};
@ -225,13 +201,12 @@ define([
}
// Sending a message...
realtime.onMessage(function(message) {
realtime.onMessage(function(message, cb) {
// Filter messages sent by Chainpad to make it compatible with Netflux
message = chainpadAdapter.msgOut(message, wc);
if(message) {
wc.bcast(message).then(function() {
// Send the message back to Chainpad once it is sent to the recipients.
onMessage(wc.myID, message);
cb();
}, function(err) {
// The message has not been sent, display the error.
console.error(err);
@ -283,8 +258,6 @@ define([
// pass messages that come out of netflux into our local handler
network.on('disconnect', function (evt) {
// TODO also abort if Netflux times out
// that will be managed in Netflux-client.js
if (config.onAbort) {
config.onAbort({
reason: evt.reason

Loading…
Cancel
Save