From 60c3aceb0dc1a4887d92a9bde6bb3befc4b657f3 Mon Sep 17 00:00:00 2001 From: Yann Flory Date: Mon, 7 Mar 2016 17:18:47 +0100 Subject: [PATCH] Add chatflux server --- www/common/netflux.js | 112 ++++++++++++++++------------------- www/common/realtime-input.js | 84 +++++++++++++++++++------- 2 files changed, 113 insertions(+), 83 deletions(-) diff --git a/www/common/netflux.js b/www/common/netflux.js index 6919a72cd..5764ffd7c 100644 --- a/www/common/netflux.js +++ b/www/common/netflux.js @@ -129,9 +129,6 @@ return /******/ (function(modules) { // webpackBootstrap webChannel.onopen = function () { resolve(webChannel); }; - if (settings.openWebChannel && settings.openWebChannel === true) { - webChannel.onopen(); - } }); }); } @@ -215,8 +212,6 @@ return /******/ (function(modules) { // webpackBootstrap this.protocol = cs.EXCHANGEPROTOCOL_SERVICE; // Public attributes - this.topology = this.settings.topology; - this.topologyService = _ServiceProvider2.default.get(this.topology); this.id; this.myID = this._generateID(); this.channels = new Set(); @@ -290,7 +285,7 @@ return /******/ (function(modules) { // webpackBootstrap this.topologyService = _ServiceProvider2.default.get(topologyServiceName); }, get: function get() { - return this.settings.topology; + return this.settigns.topology; } }]); @@ -619,7 +614,8 @@ return /******/ (function(modules) { // webpackBootstrap for (var _iterator = webChannel.channels[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) { var c = _step.value; - c.send(data); + var msg = JSON.stringify([c.seq++, data.type, webChannel.id, data.msg]); + c.send(msg); } } catch (err) { _didIteratorError = true; @@ -647,7 +643,8 @@ return /******/ (function(modules) { // webpackBootstrap for (var _iterator2 = webChannel.channels[Symbol.iterator](), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) { var c = _step2.value; - c.send(data); + var msg = JSON.stringify([c.seq++, data.type, id, data.msg]); + c.send(msg); } } catch (err) { _didIteratorError2 = true; @@ -1024,9 +1021,7 @@ return /******/ (function(modules) { // webpackBootstrap this.NAME = this.constructor.name; this.protocol = _ServiceProvider2.default.get(cs.EXCHANGEPROTOCOL_SERVICE); this.defaults = { - signaling: 'ws://localhost:9000', - // Maximum number of milliseconds of lag before we fail the connection. - MAX_LAG_BEFORE_DISCONNECT: 20000 + signaling: 'ws://localhost:9000' }; this.settings = Object.assign({}, this.defaults, options); } @@ -1034,30 +1029,23 @@ return /******/ (function(modules) { // webpackBootstrap _createClass(WebSocketService, [{ key: 'join', value: function join(key) { - var _this = this; - var options = arguments.length <= 1 || arguments[1] === undefined ? {} : arguments[1]; var settings = Object.assign({}, this.settings, options); return new Promise(function (resolve, reject) { var connection = undefined; var socket = new window.WebSocket(settings.signaling); + socket.seq = 1; socket.facade = options.facade || null; socket.onopen = function () { - resolve(socket); - }; - socket.onerror = reject; - // Check the status of the socket connection - var isSocketDisconnected = function isSocketDisconnected(realtime, sock) { - return sock.readyState === sock.CLOSING || sock.readyState === sock.CLOSED || realtime.getLag().waiting && realtime.getLag().lag > _this.settings.MAX_LAG_BEFORE_DISCONNECT; - }; - socket.checkSocket = function (realtime) { - if (isSocketDisconnected(realtime, socket) && !socket.intentionallyClosing) { - return true; + if (key && key !== '') { + socket.send(JSON.stringify([socket.seq++, 'JOIN', key])); } else { - return false; + socket.send(JSON.stringify([socket.seq++, 'JOIN'])); } + resolve(socket); }; + socket.onerror = reject; }); } }]); @@ -1202,18 +1190,13 @@ return /******/ (function(modules) { // webpackBootstrap _createClass(WebSocketProtocolService, [{ key: 'onmessage', value: function onmessage(e) { - var msg = e.data; + var msg = JSON.parse(e.data); var socket = e.currentTarget; var webChannel = socket.webChannel; var topology = cs.STAR_SERVICE; - webChannel.topology = topology; - webChannel.topologyService = _ServiceProvider2.default.get(topology); - webChannel.onMessage('', msg); - - /* - let topology = cs.STAR_SERVICE - let topologyService = ServiceProvider.get(topology) - if (msg[0] !== 0) { + var topologyService = _ServiceProvider2.default.get(topology); + + if (msg[0] !== 0) { return; } if (msg[1] === 'IDENT') { @@ -1227,56 +1210,65 @@ return /******/ (function(modules) { // webpackBootstrap socket.send(JSON.stringify(msg)); return; } - if (msg[2] === 'MSG') { - } + if (msg[2] === 'MSG') {} // We have received a new direct message from another user if (msg[2] === 'MSG' && msg[3] === socket.uid) { // Find the peer exists in one of our channels or create a new one - if(typeof socket.facade._onPeerMessage === "function") - socket.facade._onPeerMessage(msg[1], msg); + if (typeof socket.facade._onPeerMessage === "function") socket.facade._onPeerMessage(msg[1], msg); } if (msg[2] === 'JOIN' && (webChannel.id == null || webChannel.id === msg[3])) { - if(!webChannel.id) { // New unnamed channel : get its name from the first "JOIN" message - var chanName = window.location.hash = msg[3]; - webChannel.id = chanName; + if (!webChannel.id) { + // New unnamed channel : get its name from the first "JOIN" message + if (!window.location.hash) { + var chanName = window.location.hash = msg[3]; + } + webChannel.id = msg[3]; } - if (msg[1] === socket.uid) { // If the user catches himself registering, he is synchronized with the server + + if (msg[1] === socket.uid) { + // If the user catches himself registering, he is synchronized with the server webChannel.onopen(); - } - else { // Trigger onJoining() when another user is joining the channel - // Register the user in the list of peers in the channel - var linkQuality = (msg[1] === '_HISTORY_KEEPER_') ? 1000 : 0; - var sendToPeer = function(data) { - topologyService.sendTo(msg[1], webChannel, {type : 'MSG', msg: data}); - } - var peer = {id: msg[1], connector: socket, linkQuality: linkQuality, send: sendToPeer}; - if(webChannel.peers.indexOf(peer) === -1) { + } else { + // Trigger onJoining() when another user is joining the channel + + // Register the user in the list of peers in the channel + var linkQuality = msg[1] === '_HISTORY_KEEPER_' ? 1000 : 0; + var sendToPeer = function sendToPeer(data) { + topologyService.sendTo(msg[1], webChannel, { type: 'MSG', msg: data }); + }; + var peer = { id: msg[1], connector: socket, linkQuality: linkQuality, send: sendToPeer }; + if (webChannel.peers.indexOf(peer) === -1) { webChannel.peers.push(peer); } - if(typeof webChannel.onJoining === "function") - webChannel.onJoining(msg[1]); + + if (typeof webChannel.onJoining === "function") webChannel.onJoining(msg[1]); } } - // We have received a new message in that channel + // We have received a new message in that channel from another peer if (msg[2] === 'MSG' && msg[3] === webChannel.id) { // Find the peer who sent the message and display it //TODO Use Peer instead of peer.id (msg[1]) : - if(typeof webChannel.onMessage === "function") - + if (typeof webChannel.onMessage === "function") webChannel.onMessage(msg[1], msg[4]); } // Someone else has left the channel, remove him from the list of peers if (msg[2] === 'LEAVE' && msg[3] === webChannel.id) { //TODO Use Peer instead of peer.id (msg[1]) : - if(typeof webChannel.onLeaving === "function") - webChannel.onLeaving(msg[1], webChannel); + if (typeof webChannel.onLeaving === "function") webChannel.onLeaving(msg[1], webChannel); } - */ } }, { key: 'message', value: function message(code, data) { - // The message is already prepared and encrypted - return data.data; + var type = undefined; + switch (code) { + case cs.USER_DATA: + type = 'MSG'; + break; + case cs.JOIN_START: + type = 'JOIN'; + break; + } + return { type: type, msg: data.data }; } }]); diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js index 08473e3f8..0f77f421c 100644 --- a/www/common/realtime-input.js +++ b/www/common/realtime-input.js @@ -101,9 +101,20 @@ define([ var initializing = true; var bump = function () {}; + + var onPeerMessage = function (peer, msg) { + if(peer === '_HISTORY_KEEPER_') { + var msgHistory = JSON.parse(msg[4]); + onMessage(msgHistory[1], msgHistory[4]); + } + else { + warn('Illegal direct message'); + } + }; var options = { - signaling: websocketUrl, + // signaling: websocketUrl, + signaling: 'ws://localhost:9000', topology: 'StarTopologyService', protocol: 'WebSocketProtocolService', connector: 'WebSocketService', @@ -111,26 +122,42 @@ define([ }; var realtime; + // Add the Facade's peer messages handler + Netflux._onPeerMessage = onPeerMessage; // Connect to the WebSocket server Netflux.join(channel, options).then(function(wc) { + wc.onMessage = onMessage; // On receiving message wc.onJoining = onJoining; // On user joining the session // Open a Chainpad session realtime = createRealtime(); - realtime.onUserListChange(function (userList) { - var opt = {userList : userList}; - // TODO : onJoining should only a a "newPeer" parameter - wc.onJoining(opt); - }); + + // we're fully synced + initializing = false; + + // execute an onReady callback if one was supplied + if (config.onReady) { + config.onReady(); + } + // On sending message realtime.onMessage(function(message) { + // Do not send authentication messages since it is handled by Netflux + var parsed = parseMessage(message); + if (parsed.content[0] !== 0) { message = Crypto.encrypt(message, cryptKey); wc.send(message); + } }); + + // Get the channel history + var hc; + wc.peers.forEach(function (p) { if (!hc || p.linkQuality > hc.linkQuality) { hc = p; } }); + hc.send(JSON.stringify(['GET_HISTORY', wc.id])); // Check the connection to the channel - checkConnection(wc); + //checkConnection(wc); bindAllEvents(textarea, doc, onEvent, false); @@ -156,8 +183,11 @@ define([ return '\\' +c; })); - var onMessage = function(user, message) { + var onMessage = function(peer, msg) { + // remove the password + var passLen = msg.substring(0,msg.indexOf(':')); + var message = msg.substring(passLen.length+1 + Number(passLen)); message = Crypto.decrypt(message, cryptKey); verbose(message); @@ -183,22 +213,13 @@ define([ } } } - var onJoining = function(optionnalData) { - var userList = optionnalData.userList || []; - if (!initializing || userList.indexOf(userName) === -1) { - return; - } - // if we spot ourselves being added to the document, we'll switch - // 'initializing' off because it means we're fully synced. - initializing = false; + + var onJoining = function(peer, channel) { + + } + + var onLeaving = function(peer, channel) { - // execute an onReady callback if one was supplied - // pass an object so we can extend this later - if (config.onReady) { - config.onReady({ - userList: userList - }); - } } var checkConnection = function(wc) { @@ -230,6 +251,23 @@ define([ } } + var parseMessage = function (msg) { + var res ={}; + // two or more? use a for + ['pass','user','channelId','content'].forEach(function(attr){ + var len=msg.slice(0,msg.indexOf(':')), + // taking an offset lets us slice out the prop + // and saves us one string copy + o=len.length+1, + prop=res[attr]=msg.slice(o,Number(len)+o); + // slice off the property and its descriptor + msg = msg.slice(prop.length+o); + }); + // content is the only attribute that's not a string + res.content=JSON.parse(res.content); + return res; + }; + return { onEvent: function () { onEvent();