From 058548b95a476604495388a837b77f99aeb3cff1 Mon Sep 17 00:00:00 2001 From: ansuz Date: Mon, 30 May 2016 14:58:20 +0200 Subject: [PATCH] revert my reversion Revert "Revert "Added checkpoints with the new code from ChainPad"" This reverts commit 6e2e8bf21f3a2f36d681ee07527eb3bc950c4062. --- NetfluxWebsocketSrv.js | 22 +- www/common/chainpad.js | 396 +++++++++++++++++++++-------------- www/common/realtime-input.js | 10 +- 3 files changed, 264 insertions(+), 164 deletions(-) diff --git a/NetfluxWebsocketSrv.js b/NetfluxWebsocketSrv.js index 8ce1ab442..03889ee8e 100644 --- a/NetfluxWebsocketSrv.js +++ b/NetfluxWebsocketSrv.js @@ -87,7 +87,27 @@ dropUser = function (ctx, user) { }; const getHistory = function (ctx, channelName, handler, cb) { - ctx.store.getMessages(channelName, function (msgStr) { handler(JSON.parse(msgStr)); }, cb); + var messageBuf = []; + ctx.store.getMessages(channelName, function (msgStr) { + messageBuf.push(JSON.parse(msgStr)); + }, function () { + var startPoint; + var cpCount = 0; + var msgBuff2 = []; + for (startPoint = messageBuf.length - 1; startPoint >= 0; startPoint--) { + var msg = messageBuf[startPoint]; + msgBuff2.push(msg); + if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) { + cpCount++; + if (cpCount >= 2) { + for (var x = msgBuff2.pop(); x; x = msgBuff2.pop()) { handler(x); } + break; + } + } + //console.log(messageBuf[startPoint]); + } + cb(); + }); }; const randName = function () { return Crypto.randomBytes(16).toString('hex'); }; diff --git a/www/common/chainpad.js b/www/common/chainpad.js index 8d9ec786b..e93330845 100644 --- a/www/common/chainpad.js +++ b/www/common/chainpad.js @@ -28,7 +28,8 @@ var create = Patch.create = function (parentHash) { return { type: 'Patch', operations: [], - parentHash: parentHash + parentHash: parentHash, + isCheckpoint: false }; }; @@ -45,6 +46,13 @@ var check = Patch.check = function (patch, docLength_opt) { docLength_opt += Operation.lengthChange(patch.operations[i]); } } + if (patch.isCheckpoint) { + Common.assert(patch.operations.length === 1); + Common.assert(patch.operations[0].offset === 0); + if (typeof(docLength_opt) === 'number') { + Common.assert(!docLength_opt || patch.operations[0].toRemove === docLength_opt); + } + } }; var toObj = Patch.toObj = function (patch) { @@ -104,6 +112,20 @@ var addOperation = Patch.addOperation = function (patch, op) { if (Common.PARANOIA) { check(patch); } }; +var createCheckpoint = Patch.createCheckpoint = + function (parentContent, checkpointContent, parentContentHash_opt) +{ + var op = Operation.create(0, parentContent.length, checkpointContent); + if (Common.PARANOIA && parentContentHash_opt) { + Common.assert(parentContentHash_opt === hash(parentContent)); + } + parentContentHash_opt = parentContentHash_opt || hash(parentContent); + var out = create(parentContentHash_opt); + addOperation(out, op); + out.isCheckpoint = true; + return out; +}; + var clone = Patch.clone = function (patch) { if (Common.PARANOIA) { check(patch); } var out = create(); @@ -380,7 +402,7 @@ var PARANOIA = module.exports.PARANOIA = true; var VALIDATE_ENTIRE_CHAIN_EACH_MSG = module.exports.VALIDATE_ENTIRE_CHAIN_EACH_MSG = false; /* throw errors over non-compliant messages which would otherwise be treated as invalid */ -var TESTING = module.exports.TESTING = true; +var TESTING = module.exports.TESTING = false; var assert = module.exports.assert = function (expr) { if (!expr) { throw new Error("Failed assertion"); } @@ -435,10 +457,11 @@ var REGISTER = Message.REGISTER = 0; var REGISTER_ACK = Message.REGISTER_ACK = 1; var PATCH = Message.PATCH = 2; var DISCONNECT = Message.DISCONNECT = 3; +var CHECKPOINT = Message.CHECKPOINT = 4; var check = Message.check = function(msg) { Common.assert(msg.type === 'Message'); - if (msg.messageType === PATCH) { + if (msg.messageType === PATCH || msg.messageType === CHECKPOINT) { Patch.check(msg.content); Common.assert(typeof(msg.lastMsgHash) === 'string'); } else { @@ -459,9 +482,8 @@ var create = Message.create = function (type, content, lastMsgHash) { var toString = Message.toString = function (msg) { if (Common.PARANOIA) { check(msg); } - - if (msg.messageType === PATCH) { - return JSON.stringify([PATCH, Patch.toObj(msg.content), msg.lastMsgHash]); + if (msg.messageType === PATCH || msg.messageType === CHECKPOINT) { + return JSON.stringify([msg.messageType, Patch.toObj(msg.content), msg.lastMsgHash]); } else { throw new Error(); } @@ -478,43 +500,11 @@ var discardBencode = function (msg, arr) { }; var fromString = Message.fromString = function (str) { - var msg = str; - - if (str.charAt(0) === '[') { - var m = JSON.parse(str); - return create(m[0], Patch.fromObj(m[1]), m[2]); - } else { - /* Just in case we receive messages in the old format, - we should try to parse them. We only need the content, though, - so just extract that and throw the rest away */ - var last; - var parts = []; - - // chop off all the bencoded components - while (msg) { - msg = discardBencode(msg, parts); - } - - // grab the last component from the parts - // we don't need anything else - var contentStr = parts.slice(-1)[0]; - - var content = JSON.parse(contentStr); - var message; - if (content[0] === PATCH) { - message = create(userName, PATCH, Patch.fromObj(content[1]), content[2]); - } else if ([4,5].indexOf(content[0]) !== -1 /* === PING || content[0] === PONG*/) { - // it's a ping or pong, which we don't want to support anymore - message = create(userName, content[0], content[1]); - } else { - message = create(userName, content[0]); - } - - // This check validates every operation in the patch. - check(message); - - return message - } + var m = JSON.parse(str); + if (m[0] !== CHECKPOINT && m[0] !== PATCH) { throw new Error("invalid message type " + m[0]); } + var msg = create(m[0], Patch.fromObj(m[1]), m[2]); + if (m[0] === CHECKPOINT) { msg.content.isCheckpoint = true; } + return msg; }; var hashOf = Message.hashOf = function (msg) { @@ -550,8 +540,16 @@ var Sha = module.exports.Sha = require('./SHA256'); var ChainPad = {}; // hex_sha256('') -var EMPTY_STR_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'; -var ZERO = '0000000000000000000000000000000000000000000000000000000000000000'; +var EMPTY_STR_HASH = module.exports.EMPTY_STR_HASH = + 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'; +var ZERO = '0000000000000000000000000000000000000000000000000000000000000000'; + +// Default number of patches between checkpoints (patches older than this will be pruned) +// default for realtime.config.checkpointInterval +var DEFAULT_CHECKPOINT_INTERVAL = 200; + +// Default number of milliseconds to wait before syncing to the server +var DEFAULT_AVERAGE_SYNC_MILLISECONDS = 300; var enterChainPad = function (realtime, func) { return function () { @@ -567,8 +565,9 @@ var debug = function (realtime, msg) { }; var schedule = function (realtime, func, timeout) { + if (realtime.aborted) { return; } if (!timeout) { - timeout = Math.floor(Math.random() * 2 * realtime.avgSyncTime); + timeout = Math.floor(Math.random() * 2 * realtime.config.avgSyncMilliseconds); } var to = setTimeout(enterChainPad(realtime, function () { realtime.schedules.splice(realtime.schedules.indexOf(to), 1); @@ -598,12 +597,52 @@ var onMessage = function (realtime, message, callback) { } }; +var sendMessage = function (realtime, msg, callback) { + var strMsg = Message.toString(msg); + + onMessage(realtime, strMsg, function (err) { + if (err) { + debug(realtime, "Posting to server failed [" + err + "]"); + realtime.pending = null; + } else { + var pending = realtime.pending; + realtime.pending = null; + Common.assert(pending.hash === msg.hashOf); + handleMessage(realtime, strMsg, true); + pending.callback(); + } + }); + + msg.hashOf = msg.hashOf || Message.hashOf(msg); + + var timeout = schedule(realtime, function () { + debug(realtime, "Failed to send message [" + msg.hashOf + "] to server"); + sync(realtime); + }, 10000 + (Math.random() * 5000)); + + if (realtime.pending) { throw new Error("there is already a pending message"); } + realtime.pending = { + hash: msg.hashOf, + callback: function () { + if (realtime.initialMessage && realtime.initialMessage.hashOf === msg.hashOf) { + debug(realtime, "initial Ack received [" + msg.hashOf + "]"); + realtime.initialMessage = null; + } + unschedule(realtime, timeout); + realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0); + callback(); + } + }; + if (Common.PARANOIA) { check(realtime); } +}; + var sync = function (realtime) { if (Common.PARANOIA) { check(realtime); } - if (realtime.syncSchedule) { + if (realtime.syncSchedule && !realtime.pending) { unschedule(realtime, realtime.syncSchedule); realtime.syncSchedule = null; } else { + //debug(realtime, "already syncing..."); // we're currently waiting on something from the server. return; } @@ -617,6 +656,19 @@ var sync = function (realtime) { return; } + if (((parentCount(realtime, realtime.best) + 1) % realtime.config.checkpointInterval) === 0) { + var best = realtime.best; + debug(realtime, "Sending checkpoint"); + var cpp = Patch.createCheckpoint(realtime.authDoc, + realtime.authDoc, + realtime.best.content.inverseOf.parentHash); + var cp = Message.create(Message.CHECKPOINT, cpp, realtime.best.hashOf); + sendMessage(realtime, cp, function () { + debug(realtime, "Checkpoint sent and accepted"); + }); + return; + } + var msg; if (realtime.best === realtime.initialMessage) { msg = realtime.initialMessage; @@ -624,39 +676,16 @@ var sync = function (realtime) { msg = Message.create(Message.PATCH, realtime.uncommitted, realtime.best.hashOf); } - var strMsg = Message.toString(msg); - - onMessage(realtime, strMsg, function (err) { - if (err) { - debug(realtime, "Posting to server failed [" + err + "]"); - } else { - handleMessage(realtime, strMsg, true); - } + sendMessage(realtime, msg, function () { + //debug(realtime, "patch sent"); }); - - var hash = Message.hashOf(msg); - - var timeout = schedule(realtime, function () { - debug(realtime, "Failed to send message ["+hash+"] to server"); - sync(realtime); - }, 10000 + (Math.random() * 5000)); - realtime.pending = { - hash: hash, - callback: function () { - if (realtime.initialMessage && realtime.initialMessage.hashOf === hash) { - debug(realtime, "initial Ack received ["+hash+"]"); - realtime.initialMessage = null; - } - unschedule(realtime, timeout); - realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }, 0); - } - }; - if (Common.PARANOIA) { check(realtime); } }; var create = ChainPad.create = function (config) { config = config || {}; var initialState = config.initialState || ''; + config.checkpointInterval = config.checkpointInterval || DEFAULT_CHECKPOINT_INTERVAL; + config.avgSyncMilliseconds = config.avgSyncMilliseconds || DEFAULT_AVERAGE_SYNC_MILLISECONDS; var realtime = { type: 'ChainPad', @@ -665,7 +694,7 @@ var create = ChainPad.create = function (config) { config: config, - logLevel: typeof(config.logLevel) !== 'undefined'? config.logLevel: 1, + logLevel: (typeof(config.logLevel) === 'number') ? config.logLevel : 1, /** A patch representing all uncommitted work. */ uncommitted: null, @@ -673,18 +702,17 @@ var create = ChainPad.create = function (config) { uncommittedDocLength: initialState.length, patchHandlers: [], - opHandlers: [], + changeHandlers: [], messageHandlers: [], schedules: [], + aborted: false, syncSchedule: null, registered: false, - avgSyncTime: 100, - // this is only used if PARANOIA is enabled. userInterfaceContent: undefined, @@ -699,12 +727,6 @@ var create = ChainPad.create = function (config) { rootMessage: null, userName: config.userName || 'anonymous', - - /** - * Set to the message which sets the initialState if applicable. - * Reset to null after the initial message has been successfully broadcasted. - */ - initialMessage: null, }; if (Common.PARANOIA) { @@ -712,6 +734,10 @@ var create = ChainPad.create = function (config) { } var zeroPatch = Patch.create(EMPTY_STR_HASH); + if (initialState !== '') { + var initialOp = Operation.create(0, 0, initialState); + Patch.addOperation(zeroPatch, initialOp); + } zeroPatch.inverseOf = Patch.invert(zeroPatch, ''); zeroPatch.inverseOf.inverseOf = zeroPatch; var zeroMsg = Message.create(Message.PATCH, zeroPatch, ZERO); @@ -721,40 +747,12 @@ var create = ChainPad.create = function (config) { (realtime.messagesByParent[zeroMsg.lastMessageHash] || []).push(zeroMsg); realtime.rootMessage = zeroMsg; realtime.best = zeroMsg; - - if (initialState === '') { - realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash); - return realtime; - } - - var initialOp = Operation.create(0, 0, initialState); - var initialStatePatch = Patch.create(zeroPatch.inverseOf.parentHash); - Patch.addOperation(initialStatePatch, initialOp); - initialStatePatch.inverseOf = Patch.invert(initialStatePatch, ''); - initialStatePatch.inverseOf.inverseOf = initialStatePatch; - - // flag this patch so it can be handled specially. - // Specifically, we never treat an initialStatePatch as our own, - // we let it be reverted to prevent duplication of data. - initialStatePatch.isInitialStatePatch = true; - initialStatePatch.inverseOf.isInitialStatePatch = true; - realtime.authDoc = initialState; + realtime.uncommitted = Patch.create(zeroPatch.inverseOf.parentHash); + if (Common.PARANOIA) { realtime.userInterfaceContent = initialState; } - initialMessage = Message.create(Message.PATCH, initialStatePatch, zeroMsg.hashOf); - initialMessage.hashOf = Message.hashOf(initialMessage); - initialMessage.parentCount = 1; - initialMessage.isFromMe = true; - - realtime.messages[initialMessage.hashOf] = initialMessage; - (realtime.messagesByParent[initialMessage.lastMessageHash] || []).push(initialMessage); - - realtime.best = initialMessage; - realtime.uncommitted = Patch.create(initialStatePatch.inverseOf.parentHash); - realtime.initialMessage = initialMessage; - return realtime; }; @@ -803,6 +801,17 @@ var doOperation = ChainPad.doOperation = function (realtime, op) { realtime.uncommittedDocLength += Operation.lengthChange(op); }; +var doPatch = ChainPad.doPatch = function (realtime, patch) { + if (Common.PARANOIA) { + check(realtime); + Common.assert(Patch.invert(realtime.uncommitted).parentHash === patch.parentHash); + realtime.userInterfaceContent = Patch.apply(patch, realtime.userInterfaceContent); + } + Patch.check(patch, realtime.uncommittedDocLength); + realtime.uncommitted = Patch.merge(realtime.uncommitted, patch); + realtime.uncommittedDocLength += Patch.lengthChange(patch); +}; + var isAncestorOf = function (realtime, ancestor, decendent) { if (!decendent || !ancestor) { return false; } if (ancestor === decendent) { return true; } @@ -858,31 +867,34 @@ var getBestChild = function (realtime, msg) { return best; }; +var pushUIPatch = function (realtime, patch) { + if (patch.operations.length) { + // push the uncommittedPatch out to the user interface. + for (var i = 0; i < realtime.patchHandlers.length; i++) { + realtime.patchHandlers[i](patch); + } + for (var i = 0; i < realtime.changeHandlers.length; i++) { + for (var j = patch.operations.length; j >= 0; j--) { + var op = patch.operations[j]; + realtime.changeHandlers[i](op.offset, op.toRemove, op.toInsert); + } + } + } +}; + var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromMe) { if (Common.PARANOIA) { check(realtime); } var msg = Message.fromString(msgStr); - // These are all deprecated message types - if (['REGISTER', 'PONG', 'DISCONNECT'].map(function (x) { - return Message[x]; - }).indexOf(msg.messageType) !== -1) { - console.log("Deprecated message type: [%s]", msg.messageType); + // otherwise it's a disconnect. + if (msg.messageType !== Message.PATCH && msg.messageType !== Message.CHECKPOINT) { + debug(realtime, "unrecognized message type " + msg.messageType); return; } - // otherwise it's a disconnect. - if (msg.messageType !== Message.PATCH) { - console.error("disconnect"); - return; } - msg.hashOf = Message.hashOf(msg); - if (realtime.pending && realtime.pending.hash === msg.hashOf) { - realtime.pending.callback(); - realtime.pending = null; - } - if (realtime.messages[msg.hashOf]) { debug(realtime, "Patch [" + msg.hashOf + "] is already known"); if (Common.PARANOIA) { check(realtime); } @@ -894,10 +906,33 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM realtime.messagesByParent[msg.lastMsgHash] || []).push(msg); if (!isAncestorOf(realtime, realtime.rootMessage, msg)) { - // we'll probably find the missing parent later. - debug(realtime, "Patch [" + msg.hashOf + "] not connected to root"); - if (Common.PARANOIA) { check(realtime); } - return; + if (realtime.rootMessage === realtime.best && msg.content.isCheckpoint) { + // We're starting with a trucated chain from a checkpoint, we will adopt this + // as the root message and go with it... + var userDoc = Patch.apply(realtime.uncommitted, realtime.authDoc); + Common.assert(!Common.PARANOIA || realtime.userInterfaceContent === userDoc); + var fixUserDocPatch = Patch.invert(realtime.uncommitted, realtime.authDoc); + Patch.addOperation(fixUserDocPatch, + Operation.create(0, realtime.authDoc.length, msg.content.operations[0].toInsert)); + fixUserDocPatch = + Patch.simplify(fixUserDocPatch, userDoc, realtime.config.operationSimplify); + + msg.parentCount = 0; + realtime.rootMessage = realtime.best = msg; + + realtime.authDoc = msg.content.operations[0].toInsert; + realtime.uncommitted = Patch.create(Sha.hex_sha256(realtime.authDoc)); + realtime.uncommittedDocLength = realtime.authDoc.length; + pushUIPatch(realtime, fixUserDocPatch); + + if (Common.PARANOIA) { realtime.userInterfaceContent = realtime.authDoc; } + return; + } else { + // we'll probably find the missing parent later. + debug(realtime, "Patch [" + msg.hashOf + "] not connected to root"); + if (Common.PARANOIA) { check(realtime); } + return; + } } // of this message fills in a hole in the chain which makes another patch better, swap to the @@ -963,14 +998,49 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM return; } - var simplePatch = - Patch.simplify(patch, authDocAtTimeOfPatch, realtime.config.operationSimplify); - if (!Patch.equals(simplePatch, patch)) { - debug(realtime, "patch [" + msg.hashOf + "] can be simplified"); - if (Common.PARANOIA) { check(realtime); } - if (Common.TESTING) { throw new Error(); } - delete realtime.messages[msg.hashOf]; - return; + if (patch.isCheckpoint) { + // Ok, we have a checkpoint patch. + // If the chain length is not equal to checkpointInterval then this patch is invalid. + var i = 0; + var checkpointP; + for (var m = getParent(realtime, msg); m; m = getParent(realtime, m)) { + if (m.content.isCheckpoint) { + if (checkpointP) { + checkpointP = m; + break; + } + checkpointP = m; + } + } + if (checkpointP && checkpointP !== realtime.rootMessage) { + var point = parentCount(realtime, checkpointP); + if ((point % realtime.config.checkpointInterval) !== 0) { + debug(realtime, "checkpoint [" + msg.hashOf + "] at invalid point [" + point + "]"); + if (Common.PARANOIA) { check(realtime); } + if (Common.TESTING) { throw new Error(); } + delete realtime.messages[msg.hashOf]; + return; + } + + // Time to prune some old messages from the chain + debug(realtime, "checkpoint [" + msg.hashOf + "]"); + for (var m = getParent(realtime, checkpointP); m; m = getParent(realtime, m)) { + debug(realtime, "pruning [" + m.hashOf + "]"); + delete realtime.messages[m.hashOf]; + delete realtime.messagesByParent[m.hashOf]; + } + realtime.rootMessage = checkpointP; + } + } else { + var simplePatch = + Patch.simplify(patch, authDocAtTimeOfPatch, realtime.config.operationSimplify); + if (!Patch.equals(simplePatch, patch)) { + debug(realtime, "patch [" + msg.hashOf + "] can be simplified"); + if (Common.PARANOIA) { check(realtime); } + if (Common.TESTING) { throw new Error(); } + delete realtime.messages[msg.hashOf]; + return; + } } patch.inverseOf = Patch.invert(patch, authDocAtTimeOfPatch); @@ -1012,19 +1082,8 @@ var handleMessage = ChainPad.handleMessage = function (realtime, msgStr, isFromM Common.assert(newUserInterfaceContent === realtime.userInterfaceContent); } - if (uncommittedPatch.operations.length) { - // push the uncommittedPatch out to the user interface. - for (var i = 0; i < realtime.patchHandlers.length; i++) { - realtime.patchHandlers[i](uncommittedPatch); - } - if (realtime.opHandlers.length) { - for (var i = uncommittedPatch.operations.length-1; i >= 0; i--) { - for (var j = 0; j < realtime.opHandlers.length; j++) { - realtime.opHandlers[j](uncommittedPatch.operations[i]); - } - } - } - } + pushUIPatch(realtime, uncommittedPatch); + if (Common.PARANOIA) { check(realtime); } }; @@ -1061,13 +1120,26 @@ var getDepthOfState = function (content, minDepth, realtime) { module.exports.create = function (conf) { var realtime = ChainPad.create(conf); - return { + var out = { onPatch: enterChainPad(realtime, function (handler) { Common.assert(typeof(handler) === 'function'); realtime.patchHandlers.push(handler); }), + patch: enterChainPad(realtime, function (patch, x, y) { + if (typeof(patch) === 'number') { + // Actually they meant to call realtime.change() + out.change(patch, x, y); + return; + } + doPatch(realtime, patch); + }), - patch: enterChainPad(realtime, function (offset, count, chars) { + onChange: enterChainPad(realtime, function (handler) { + Common.assert(typeof(handler) === 'function'); + realtime.changeHandlers.push(handler); + }), + change: enterChainPad(realtime, function (offset, count, chars) { + if (count === 0 && chars === '') { return; } doOperation(realtime, Operation.create(offset, count, chars)); }), @@ -1075,26 +1147,32 @@ module.exports.create = function (conf) { Common.assert(typeof(handler) === 'function'); realtime.messageHandlers.push(handler); }), + message: enterChainPad(realtime, function (message) { handleMessage(realtime, message, false); }), + start: enterChainPad(realtime, function () { if (realtime.syncSchedule) { unschedule(realtime, realtime.syncSchedule); } realtime.syncSchedule = schedule(realtime, function () { sync(realtime); }); }), + abort: enterChainPad(realtime, function () { + realtime.aborted = true; realtime.schedules.forEach(function (s) { clearTimeout(s) }); }), - sync: enterChainPad(realtime, function () { - sync(realtime); - }), + + sync: enterChainPad(realtime, function () { sync(realtime); }), + getAuthDoc: function () { return realtime.authDoc; }, + getUserDoc: function () { return Patch.apply(realtime.uncommitted, realtime.authDoc); }, getDepthOfState: function (content, minDepth) { return getDepthOfState(content, minDepth, realtime); } }; + return out; }; }, diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js index 209c7e532..d838fda3a 100644 --- a/www/common/realtime-input.js +++ b/www/common/realtime-input.js @@ -142,19 +142,21 @@ define([ // shim between chainpad and netflux chainpadAdapter = { msgIn : function(peerId, msg) { - var message = parseMessage(msg); + msg = msg.replace(/^cp\|/, ''); try { - var decryptedMsg = Crypto.decrypt(message, cryptKey); + var decryptedMsg = Crypto.decrypt(msg, cryptKey); messagesHistory.push(decryptedMsg); return decryptedMsg; } catch (err) { console.error(err); - return message; + return msg; } }, msgOut : function(msg, wc) { try { - return Crypto.encrypt(msg, cryptKey); + var cmsg = Crypto.encrypt(msg, cryptKey); + if (msg.indexOf('[4') === 0) { cmsg = 'cp|' + cmsg; } + return cmsg; } catch (err) { console.log(msg); throw err;