From deb2084fc5da975b1d571f26412bb351c39a42d6 Mon Sep 17 00:00:00 2001
From: Yann Flory
Date: Mon, 7 Mar 2016 12:00:45 +0100
Subject: [PATCH 01/65] Add the socket connection checker
---
www/common/netflux.js | 35 +++++++++----------------
www/common/realtime-input.js | 51 ++++++++++++++++++------------------
2 files changed, 39 insertions(+), 47 deletions(-)
diff --git a/www/common/netflux.js b/www/common/netflux.js
index 1d0e9c809..6919a72cd 100644
--- a/www/common/netflux.js
+++ b/www/common/netflux.js
@@ -1025,12 +1025,6 @@ return /******/ (function(modules) { // webpackBootstrap
this.protocol = _ServiceProvider2.default.get(cs.EXCHANGEPROTOCOL_SERVICE);
this.defaults = {
signaling: 'ws://localhost:9000',
- /**
- * If an error is encountered but it is recoverable, do not immediately fail
- * but if it keeps firing errors over and over, do fail.
- */
- recoverableErrorCount: 0,
- MAX_RECOVERABLE_ERRORS: 15,
// Maximum number of milliseconds of lag before we fail the connection.
MAX_LAG_BEFORE_DISCONNECT: 20000
};
@@ -1040,6 +1034,8 @@ return /******/ (function(modules) { // webpackBootstrap
_createClass(WebSocketService, [{
key: 'join',
value: function join(key) {
+ var _this = this;
+
var options = arguments.length <= 1 || arguments[1] === undefined ? {} : arguments[1];
var settings = Object.assign({}, this.settings, options);
@@ -1051,24 +1047,19 @@ return /******/ (function(modules) { // webpackBootstrap
resolve(socket);
};
socket.onerror = reject;
- });
- }
-
- // Check the status of the socket connection
- /*var isSocketDisconnected = function (realtime) {
- let sock = ws._socket;
- return sock.readyState === sock.CLOSING
- || sock.readyState === sock.CLOSED
- || (realtime.getLag().waiting && realtime.getLag().lag > MAX_LAG_BEFORE_DISCONNECT);
- }
- var checkSocket = module.exports.checkSocket = function (realtime) {
- if (isSocketDisconnected(realtime) && !socket.intentionallyClosing) {
+ // Check the status of the socket connection
+ var isSocketDisconnected = function isSocketDisconnected(realtime, sock) {
+ return sock.readyState === sock.CLOSING || sock.readyState === sock.CLOSED || realtime.getLag().waiting && realtime.getLag().lag > _this.settings.MAX_LAG_BEFORE_DISCONNECT;
+ };
+ socket.checkSocket = function (realtime) {
+ if (isSocketDisconnected(realtime, socket) && !socket.intentionallyClosing) {
return true;
- } else {
+ } else {
return false;
- }
- };*/
-
+ }
+ };
+ });
+ }
}]);
return WebSocketService;
diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js
index ecd19e015..08473e3f8 100644
--- a/www/common/realtime-input.js
+++ b/www/common/realtime-input.js
@@ -31,7 +31,7 @@ define([
var debug = function (x) { console.log(x); },
warn = function (x) { console.error(x); },
verbose = function (x) { console.log(x); };
- // verbose = function () {}; // comment out to enable verbose logging
+ verbose = function () {}; // comment out to enable verbose logging
// ------------------ Trapping Keyboard Events ---------------------- //
@@ -130,7 +130,7 @@ define([
});
// Check the connection to the channel
- //checkConnection(wc);
+ checkConnection(wc);
bindAllEvents(textarea, doc, onEvent, false);
@@ -202,31 +202,32 @@ define([
}
var checkConnection = function(wc) {
- //TODO
- /*var socketChecker = setInterval(function () {
- if (netflux.checkSocket(realtime)) {
- warn("Socket disconnected!");
-
- recoverableErrorCount += 1;
-
- if (recoverableErrorCount >= MAX_RECOVERABLE_ERRORS) {
- warn("Giving up!");
- realtime.abort();
- wc.leave()
- .then(null, function(err) {
- warn(err);
- });
- if (config.onAbort) {
- config.onAbort({
- socket: socket
- });
+ if(wc.channels && wc.channels.size > 0) {
+ var channels = Array.from(wc.channels);
+ var channel = channels[0];
+
+ var socketChecker = setInterval(function () {
+ if (channel.checkSocket(realtime)) {
+ warn("Socket disconnected!");
+
+ recoverableErrorCount += 1;
+
+ if (recoverableErrorCount >= MAX_RECOVERABLE_ERRORS) {
+ warn("Giving up!");
+ realtime.abort();
+ try { channel.close(); } catch (e) { warn(e); }
+ if (config.onAbort) {
+ config.onAbort({
+ socket: channel
+ });
+ }
+ if (socketChecker) { clearInterval(socketChecker); }
}
- if (socketChecker) { clearInterval(socketChecker); }
+ } else {
+ // it's working as expected, continue
}
- } else {
- // it's working as expected, continue
- }
- }, 200);*/
+ }, 200);
+ }
}
return {
From 60c3aceb0dc1a4887d92a9bde6bb3befc4b657f3 Mon Sep 17 00:00:00 2001
From: Yann Flory
Date: Mon, 7 Mar 2016 17:18:47 +0100
Subject: [PATCH 02/65] Add chatflux server
---
www/common/netflux.js | 112 ++++++++++++++++-------------------
www/common/realtime-input.js | 84 +++++++++++++++++++-------
2 files changed, 113 insertions(+), 83 deletions(-)
diff --git a/www/common/netflux.js b/www/common/netflux.js
index 6919a72cd..5764ffd7c 100644
--- a/www/common/netflux.js
+++ b/www/common/netflux.js
@@ -129,9 +129,6 @@ return /******/ (function(modules) { // webpackBootstrap
webChannel.onopen = function () {
resolve(webChannel);
};
- if (settings.openWebChannel && settings.openWebChannel === true) {
- webChannel.onopen();
- }
});
});
}
@@ -215,8 +212,6 @@ return /******/ (function(modules) { // webpackBootstrap
this.protocol = cs.EXCHANGEPROTOCOL_SERVICE;
// Public attributes
- this.topology = this.settings.topology;
- this.topologyService = _ServiceProvider2.default.get(this.topology);
this.id;
this.myID = this._generateID();
this.channels = new Set();
@@ -290,7 +285,7 @@ return /******/ (function(modules) { // webpackBootstrap
this.topologyService = _ServiceProvider2.default.get(topologyServiceName);
},
get: function get() {
- return this.settings.topology;
+ return this.settigns.topology;
}
}]);
@@ -619,7 +614,8 @@ return /******/ (function(modules) { // webpackBootstrap
for (var _iterator = webChannel.channels[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) {
var c = _step.value;
- c.send(data);
+ var msg = JSON.stringify([c.seq++, data.type, webChannel.id, data.msg]);
+ c.send(msg);
}
} catch (err) {
_didIteratorError = true;
@@ -647,7 +643,8 @@ return /******/ (function(modules) { // webpackBootstrap
for (var _iterator2 = webChannel.channels[Symbol.iterator](), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) {
var c = _step2.value;
- c.send(data);
+ var msg = JSON.stringify([c.seq++, data.type, id, data.msg]);
+ c.send(msg);
}
} catch (err) {
_didIteratorError2 = true;
@@ -1024,9 +1021,7 @@ return /******/ (function(modules) { // webpackBootstrap
this.NAME = this.constructor.name;
this.protocol = _ServiceProvider2.default.get(cs.EXCHANGEPROTOCOL_SERVICE);
this.defaults = {
- signaling: 'ws://localhost:9000',
- // Maximum number of milliseconds of lag before we fail the connection.
- MAX_LAG_BEFORE_DISCONNECT: 20000
+ signaling: 'ws://localhost:9000'
};
this.settings = Object.assign({}, this.defaults, options);
}
@@ -1034,30 +1029,23 @@ return /******/ (function(modules) { // webpackBootstrap
_createClass(WebSocketService, [{
key: 'join',
value: function join(key) {
- var _this = this;
-
var options = arguments.length <= 1 || arguments[1] === undefined ? {} : arguments[1];
var settings = Object.assign({}, this.settings, options);
return new Promise(function (resolve, reject) {
var connection = undefined;
var socket = new window.WebSocket(settings.signaling);
+ socket.seq = 1;
socket.facade = options.facade || null;
socket.onopen = function () {
- resolve(socket);
- };
- socket.onerror = reject;
- // Check the status of the socket connection
- var isSocketDisconnected = function isSocketDisconnected(realtime, sock) {
- return sock.readyState === sock.CLOSING || sock.readyState === sock.CLOSED || realtime.getLag().waiting && realtime.getLag().lag > _this.settings.MAX_LAG_BEFORE_DISCONNECT;
- };
- socket.checkSocket = function (realtime) {
- if (isSocketDisconnected(realtime, socket) && !socket.intentionallyClosing) {
- return true;
+ if (key && key !== '') {
+ socket.send(JSON.stringify([socket.seq++, 'JOIN', key]));
} else {
- return false;
+ socket.send(JSON.stringify([socket.seq++, 'JOIN']));
}
+ resolve(socket);
};
+ socket.onerror = reject;
});
}
}]);
@@ -1202,18 +1190,13 @@ return /******/ (function(modules) { // webpackBootstrap
_createClass(WebSocketProtocolService, [{
key: 'onmessage',
value: function onmessage(e) {
- var msg = e.data;
+ var msg = JSON.parse(e.data);
var socket = e.currentTarget;
var webChannel = socket.webChannel;
var topology = cs.STAR_SERVICE;
- webChannel.topology = topology;
- webChannel.topologyService = _ServiceProvider2.default.get(topology);
- webChannel.onMessage('', msg);
-
- /*
- let topology = cs.STAR_SERVICE
- let topologyService = ServiceProvider.get(topology)
- if (msg[0] !== 0) {
+ var topologyService = _ServiceProvider2.default.get(topology);
+
+ if (msg[0] !== 0) {
return;
}
if (msg[1] === 'IDENT') {
@@ -1227,56 +1210,65 @@ return /******/ (function(modules) { // webpackBootstrap
socket.send(JSON.stringify(msg));
return;
}
- if (msg[2] === 'MSG') {
- }
+ if (msg[2] === 'MSG') {}
// We have received a new direct message from another user
if (msg[2] === 'MSG' && msg[3] === socket.uid) {
// Find the peer exists in one of our channels or create a new one
- if(typeof socket.facade._onPeerMessage === "function")
- socket.facade._onPeerMessage(msg[1], msg);
+ if (typeof socket.facade._onPeerMessage === "function") socket.facade._onPeerMessage(msg[1], msg);
}
if (msg[2] === 'JOIN' && (webChannel.id == null || webChannel.id === msg[3])) {
- if(!webChannel.id) { // New unnamed channel : get its name from the first "JOIN" message
- var chanName = window.location.hash = msg[3];
- webChannel.id = chanName;
+ if (!webChannel.id) {
+ // New unnamed channel : get its name from the first "JOIN" message
+ if (!window.location.hash) {
+ var chanName = window.location.hash = msg[3];
+ }
+ webChannel.id = msg[3];
}
- if (msg[1] === socket.uid) { // If the user catches himself registering, he is synchronized with the server
+
+ if (msg[1] === socket.uid) {
+ // If the user catches himself registering, he is synchronized with the server
webChannel.onopen();
- }
- else { // Trigger onJoining() when another user is joining the channel
- // Register the user in the list of peers in the channel
- var linkQuality = (msg[1] === '_HISTORY_KEEPER_') ? 1000 : 0;
- var sendToPeer = function(data) {
- topologyService.sendTo(msg[1], webChannel, {type : 'MSG', msg: data});
- }
- var peer = {id: msg[1], connector: socket, linkQuality: linkQuality, send: sendToPeer};
- if(webChannel.peers.indexOf(peer) === -1) {
+ } else {
+ // Trigger onJoining() when another user is joining the channel
+
+ // Register the user in the list of peers in the channel
+ var linkQuality = msg[1] === '_HISTORY_KEEPER_' ? 1000 : 0;
+ var sendToPeer = function sendToPeer(data) {
+ topologyService.sendTo(msg[1], webChannel, { type: 'MSG', msg: data });
+ };
+ var peer = { id: msg[1], connector: socket, linkQuality: linkQuality, send: sendToPeer };
+ if (webChannel.peers.indexOf(peer) === -1) {
webChannel.peers.push(peer);
}
- if(typeof webChannel.onJoining === "function")
- webChannel.onJoining(msg[1]);
+
+ if (typeof webChannel.onJoining === "function") webChannel.onJoining(msg[1]);
}
}
- // We have received a new message in that channel
+ // We have received a new message in that channel from another peer
if (msg[2] === 'MSG' && msg[3] === webChannel.id) {
// Find the peer who sent the message and display it
//TODO Use Peer instead of peer.id (msg[1]) :
- if(typeof webChannel.onMessage === "function")
-
+ if (typeof webChannel.onMessage === "function") webChannel.onMessage(msg[1], msg[4]);
}
// Someone else has left the channel, remove him from the list of peers
if (msg[2] === 'LEAVE' && msg[3] === webChannel.id) {
//TODO Use Peer instead of peer.id (msg[1]) :
- if(typeof webChannel.onLeaving === "function")
- webChannel.onLeaving(msg[1], webChannel);
+ if (typeof webChannel.onLeaving === "function") webChannel.onLeaving(msg[1], webChannel);
}
- */
}
}, {
key: 'message',
value: function message(code, data) {
- // The message is already prepared and encrypted
- return data.data;
+ var type = undefined;
+ switch (code) {
+ case cs.USER_DATA:
+ type = 'MSG';
+ break;
+ case cs.JOIN_START:
+ type = 'JOIN';
+ break;
+ }
+ return { type: type, msg: data.data };
}
}]);
diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js
index 08473e3f8..0f77f421c 100644
--- a/www/common/realtime-input.js
+++ b/www/common/realtime-input.js
@@ -101,9 +101,20 @@ define([
var initializing = true;
var bump = function () {};
+
+ var onPeerMessage = function (peer, msg) {
+ if(peer === '_HISTORY_KEEPER_') {
+ var msgHistory = JSON.parse(msg[4]);
+ onMessage(msgHistory[1], msgHistory[4]);
+ }
+ else {
+ warn('Illegal direct message');
+ }
+ };
var options = {
- signaling: websocketUrl,
+ // signaling: websocketUrl,
+ signaling: 'ws://localhost:9000',
topology: 'StarTopologyService',
protocol: 'WebSocketProtocolService',
connector: 'WebSocketService',
@@ -111,26 +122,42 @@ define([
};
var realtime;
+ // Add the Facade's peer messages handler
+ Netflux._onPeerMessage = onPeerMessage;
// Connect to the WebSocket server
Netflux.join(channel, options).then(function(wc) {
+
wc.onMessage = onMessage; // On receiving message
wc.onJoining = onJoining; // On user joining the session
// Open a Chainpad session
realtime = createRealtime();
- realtime.onUserListChange(function (userList) {
- var opt = {userList : userList};
- // TODO : onJoining should only a a "newPeer" parameter
- wc.onJoining(opt);
- });
+
+ // we're fully synced
+ initializing = false;
+
+ // execute an onReady callback if one was supplied
+ if (config.onReady) {
+ config.onReady();
+ }
+
// On sending message
realtime.onMessage(function(message) {
+ // Do not send authentication messages since it is handled by Netflux
+ var parsed = parseMessage(message);
+ if (parsed.content[0] !== 0) {
message = Crypto.encrypt(message, cryptKey);
wc.send(message);
+ }
});
+
+ // Get the channel history
+ var hc;
+ wc.peers.forEach(function (p) { if (!hc || p.linkQuality > hc.linkQuality) { hc = p; } });
+ hc.send(JSON.stringify(['GET_HISTORY', wc.id]));
// Check the connection to the channel
- checkConnection(wc);
+ //checkConnection(wc);
bindAllEvents(textarea, doc, onEvent, false);
@@ -156,8 +183,11 @@ define([
return '\\' +c;
}));
- var onMessage = function(user, message) {
+ var onMessage = function(peer, msg) {
+ // remove the password
+ var passLen = msg.substring(0,msg.indexOf(':'));
+ var message = msg.substring(passLen.length+1 + Number(passLen));
message = Crypto.decrypt(message, cryptKey);
verbose(message);
@@ -183,22 +213,13 @@ define([
}
}
}
- var onJoining = function(optionnalData) {
- var userList = optionnalData.userList || [];
- if (!initializing || userList.indexOf(userName) === -1) {
- return;
- }
- // if we spot ourselves being added to the document, we'll switch
- // 'initializing' off because it means we're fully synced.
- initializing = false;
+
+ var onJoining = function(peer, channel) {
+
+ }
+
+ var onLeaving = function(peer, channel) {
- // execute an onReady callback if one was supplied
- // pass an object so we can extend this later
- if (config.onReady) {
- config.onReady({
- userList: userList
- });
- }
}
var checkConnection = function(wc) {
@@ -230,6 +251,23 @@ define([
}
}
+ var parseMessage = function (msg) {
+ var res ={};
+ // two or more? use a for
+ ['pass','user','channelId','content'].forEach(function(attr){
+ var len=msg.slice(0,msg.indexOf(':')),
+ // taking an offset lets us slice out the prop
+ // and saves us one string copy
+ o=len.length+1,
+ prop=res[attr]=msg.slice(o,Number(len)+o);
+ // slice off the property and its descriptor
+ msg = msg.slice(prop.length+o);
+ });
+ // content is the only attribute that's not a string
+ res.content=JSON.parse(res.content);
+ return res;
+ };
+
return {
onEvent: function () {
onEvent();
From 09a06a8bc5d26ef4f7c96ada279a2acd9bcc8c49 Mon Sep 17 00:00:00 2001
From: Yann Flory
Date: Mon, 7 Mar 2016 17:35:31 +0100
Subject: [PATCH 03/65] Replace the chainpad server by the netflux server
---
NetFluxWebsocketServer.js | 186 +++++++++++++++++++++++++++++++++++
server.js | 4 +-
www/common/realtime-input.js | 4 +-
3 files changed, 191 insertions(+), 3 deletions(-)
create mode 100644 NetFluxWebsocketServer.js
diff --git a/NetFluxWebsocketServer.js b/NetFluxWebsocketServer.js
new file mode 100644
index 000000000..3ffb96fe7
--- /dev/null
+++ b/NetFluxWebsocketServer.js
@@ -0,0 +1,186 @@
+;(function () { 'use strict';
+let Crypto = require('crypto');
+let WebSocket = require('ws');
+
+let LAG_MAX_BEFORE_DISCONNECT = 30000;
+let LAG_MAX_BEFORE_PING = 15000;
+let HISTORY_KEEPER_ID = "_HISTORY_KEEPER_";
+
+let dropUser;
+
+let now = function () { return (new Date()).getTime(); };
+
+let sendMsg = function (ctx, user, msg) {
+ try {
+ console.log('<' + JSON.stringify(msg));
+ user.socket.send(JSON.stringify(msg));
+ } catch (e) {
+ console.log(e.stack);
+ dropUser(ctx, user);
+ }
+};
+
+let sendChannelMessage = function (ctx, channel, msgStruct) {
+ msgStruct.unshift(0);
+ channel.forEach(function (user) { sendMsg(ctx, user, msgStruct); });
+ if (msgStruct[2] === 'MSG') {
+ ctx.store.message(channel.id, JSON.stringify(msgStruct), function () { });
+ }
+};
+
+dropUser = function (ctx, user) {
+ if (user.socket.readyState !== WebSocket.CLOSING
+ && user.socket.readyState !== WebSocket.CLOSED)
+ {
+ try {
+ user.socket.close();
+ } catch (e) {
+ console.log("Failed to disconnect ["+user.id+"], attempting to terminate");
+ try {
+ user.socket.terminate();
+ } catch (ee) {
+ console.log("Failed to terminate ["+user.id+"] *shrug*");
+ }
+ }
+ }
+ delete ctx.users[user.id];
+ Object.keys(ctx.channels).forEach(function (chanName) {
+ let chan = ctx.channels[chanName];
+ let idx = chan.indexOf(user);
+ if (idx < 0) { return; }
+ console.log("Removing ["+user.id+"] from channel ["+chanName+"]");
+ chan.splice(idx, 1);
+ if (chan.length === 0) {
+ console.log("Removing empty channel ["+chanName+"]");
+ delete ctx.channels[chanName];
+ } else {
+ sendChannelMessage(ctx, chan, [user.id, 'LEAVE', chanName, 'Quit: [ dropUser() ]']);
+ }
+ });
+};
+
+let getHistory = function (ctx, channelName, handler) {
+ ctx.store.getMessages(channelName, function (msgStr) { handler(JSON.parse(msgStr)); });
+};
+
+let randName = function () { return Crypto.randomBytes(16).toString('hex'); };
+
+let handleMessage = function (ctx, user, msg) {
+ let json = JSON.parse(msg);
+ let seq = json.shift();
+ let cmd = json[0];
+ let obj = json[1];
+
+ user.timeOfLastMessage = now();
+ user.pingOutstanding = false;
+
+ if (cmd === 'JOIN') {
+ /*if (obj && obj.length !== 32) {
+ sendMsg(ctx, user, [seq, 'ERROR', 'ENOENT', obj]);
+ return;
+ }*/
+ let chanName = obj || randName();
+ let chan = ctx.channels[chanName] = ctx.channels[chanName] || [];
+ chan.id = chanName;
+ sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'JOIN', chanName]);
+ chan.forEach(function (u) { sendMsg(ctx, user, [0, u.id, 'JOIN', chanName]); });
+ chan.push(user);
+ sendChannelMessage(ctx, chan, [user.id, 'JOIN', chanName]);
+ return;
+ }
+ if (cmd === 'MSG') {
+ if (obj === HISTORY_KEEPER_ID) {
+ let parsed;
+ try { parsed = JSON.parse(json[2]); } catch (err) { return; }
+ if (parsed[0] === 'GET_HISTORY') {
+ console.log('getHistory ' + parsed[1]);
+ getHistory(ctx, parsed[1], function (msg) {
+ sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
+ });
+ }
+ return;
+ }
+ if (obj && !ctx.channels[obj] && !ctx.users[obj]) {
+ sendMsg(ctx, user, [seq, 'ERROR', 'ENOENT', obj]);
+ return;
+ }
+ let target;
+ json.unshift(user.id);
+ if ((target = ctx.channels[obj])) {
+ sendChannelMessage(ctx, target, json);
+ return;
+ }
+ if ((target = ctx.users[obj])) {
+ json.unshift(0);
+ sendMsg(ctx, target, json);
+ return;
+ }
+ }
+ if (cmd === 'LEAVE') {
+ let err;
+ let chan;
+ let idx;
+ if (!obj) { err = 'EINVAL'; }
+ if (!err && !(chan = ctx.channels[obj])) { err = 'ENOENT'; }
+ if (!err && (idx = chan.indexOf(user)) === -1) { err = 'NOT_IN_CHAN'; }
+ if (err) {
+ sendMsg(ctx, user, [seq, 'ERROR', err]);
+ return;
+ }
+ json.unshift(user.id);
+ sendChannelMessage(ctx, chan, [user.id, 'LEAVE', chan.id]);
+ chan.splice(idx, 1);
+ }
+ if (cmd === 'PING') {
+ sendMsg(ctx, user, [seq, 'PONG', obj]);
+ return;
+ }
+};
+
+let run = module.exports.run = function (storage, socketServer) {
+ let ctx = {
+ users: {},
+ channels: {},
+ store: storage
+ };
+ setInterval(function () {
+ Object.keys(ctx.users).forEach(function (userId) {
+ let u = ctx.users[userId];
+ if (now() - u.timeOfLastMessage > LAG_MAX_BEFORE_DISCONNECT) {
+ dropUser(ctx, u);
+ } else if (!u.pingOutstanding && now() - u.timeOfLastMessage > LAG_MAX_BEFORE_PING) {
+ sendMsg(ctx, u, [0, 'PING', now()]);
+ u.pingOutstanding = true;
+ }
+ });
+ }, 5000);
+ socketServer.on('connection', function(socket) {
+ let conn = socket.upgradeReq.connection;
+ let user = {
+ addr: conn.remoteAddress + '|' + conn.remotePort,
+ socket: socket,
+ id: randName(),
+ timeOfLastMessage: now(),
+ pingOutstanding: false
+ };
+ ctx.users[user.id] = user;
+ sendMsg(ctx, user, [0, 'IDENT', user.id]);
+ socket.on('message', function(message) {
+ console.log('>'+message);
+ try {
+ handleMessage(ctx, user, message);
+ } catch (e) {
+ console.log(e.stack);
+ dropUser(ctx, user);
+ }
+ });
+ socket.on('close', function (evt) {
+ for (let userId in ctx.users) {
+ if (ctx.users[userId].socket === socket) {
+ dropUser(ctx, ctx.users[userId]);
+ }
+ }
+ });
+ });
+};
+}());
diff --git a/server.js b/server.js
index 3a7c49af4..3bfacb792 100644
--- a/server.js
+++ b/server.js
@@ -7,6 +7,7 @@ var Https = require('https');
var Fs = require('fs');
var WebSocketServer = require('ws').Server;
var ChainPadSrv = require('./ChainPadSrv');
+var NetfluxSrv = require('./NetFluxWebsocketServer');
var config = require('./config');
config.websocketPort = config.websocketPort || config.httpPort;
@@ -79,5 +80,6 @@ if (config.websocketPort !== config.httpPort) {
var wsSrv = new WebSocketServer(wsConfig);
Storage.create(config, function (store) {
console.log('DB connected');
- ChainPadSrv.create(wsSrv, store);
+ // ChainPadSrv.create(wsSrv, store);
+ NetfluxSrv.run(store, wsSrv);
});
diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js
index 0f77f421c..6599512b8 100644
--- a/www/common/realtime-input.js
+++ b/www/common/realtime-input.js
@@ -113,8 +113,8 @@ define([
};
var options = {
- // signaling: websocketUrl,
- signaling: 'ws://localhost:9000',
+ signaling: websocketUrl,
+ // signaling: 'ws://localhost:9000',
topology: 'StarTopologyService',
protocol: 'WebSocketProtocolService',
connector: 'WebSocketService',
From c4f62fb8124f5107eb623948a12560202997793a Mon Sep 17 00:00:00 2001
From: Yann Flory
Date: Tue, 8 Mar 2016 11:25:37 +0100
Subject: [PATCH 04/65] First try with WebRTC
---
www/common/netflux.js | 9 ++-
www/common/realtime-input.js | 118 +++++++++++++++++++++--------------
2 files changed, 79 insertions(+), 48 deletions(-)
diff --git a/www/common/netflux.js b/www/common/netflux.js
index 5764ffd7c..5252a4767 100644
--- a/www/common/netflux.js
+++ b/www/common/netflux.js
@@ -254,7 +254,7 @@ return /******/ (function(modules) { // webpackBootstrap
_this.topologyService.broadcast(_this, protocol.message(cs.JOIN_FINISH, id));
_this.onJoining(id);
});
- }).then(function (data) {
+ }, settings).then(function (data) {
return data;
});
}
@@ -747,6 +747,7 @@ return /******/ (function(modules) { // webpackBootstrap
return new Promise(function (resolve, reject) {
var connections = [];
+ console.log(settings);
var socket = new window.WebSocket(settings.signaling);
socket.onopen = function () {
socket.send(JSON.stringify({ key: settings.key }));
@@ -804,8 +805,10 @@ return /******/ (function(modules) { // webpackBootstrap
return new Promise(function (resolve, reject) {
var connection = undefined;
var socket = new window.WebSocket(settings.signaling);
+ console.log('Socket created');
socket.onopen = function () {
connection = new _this2.RTCPeerConnection(settings.webRTCOptions);
+ console.log('RTC created');
connection.onicecandidate = function (e) {
if (e.candidate !== null) {
var candidate = {
@@ -816,6 +819,8 @@ return /******/ (function(modules) { // webpackBootstrap
}
};
var dc = connection.createDataChannel(key);
+ console.log('data channel created');
+ console.log(dc);
dc.onopen = function () {
resolve(dc);
};
@@ -827,6 +832,8 @@ return /******/ (function(modules) { // webpackBootstrap
};
socket.onmessage = function (e) {
var msg = JSON.parse(e.data);
+ console.log('message');
+ console.log(msg);
if (Reflect.has(msg, 'data')) {
if (Reflect.has(msg.data, 'answer')) {
var sd = Object.assign(new _this2.RTCSessionDescription(), msg.data.answer);
diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js
index 6599512b8..787e66acb 100644
--- a/www/common/realtime-input.js
+++ b/www/common/realtime-input.js
@@ -113,61 +113,85 @@ define([
};
var options = {
- signaling: websocketUrl,
- // signaling: 'ws://localhost:9000',
- topology: 'StarTopologyService',
- protocol: 'WebSocketProtocolService',
- connector: 'WebSocketService',
- openWebChannel: true
+ // signaling: websocketUrl,
+ signaling: 'ws://localhost:8000',
+ key: channel
+ // topology: 'StarTopologyService',
+ // protocol: 'WebSocketProtocolService',
+ // connector: 'WebSocketService',
+ // openWebChannel: true
};
+ console.log(options);
var realtime;
// Add the Facade's peer messages handler
Netflux._onPeerMessage = onPeerMessage;
- // Connect to the WebSocket server
- Netflux.join(channel, options).then(function(wc) {
-
- wc.onMessage = onMessage; // On receiving message
- wc.onJoining = onJoining; // On user joining the session
-
- // Open a Chainpad session
- realtime = createRealtime();
-
- // we're fully synced
- initializing = false;
-
- // execute an onReady callback if one was supplied
- if (config.onReady) {
- config.onReady();
- }
-
- // On sending message
- realtime.onMessage(function(message) {
- // Do not send authentication messages since it is handled by Netflux
- var parsed = parseMessage(message);
- if (parsed.content[0] !== 0) {
- message = Crypto.encrypt(message, cryptKey);
- wc.send(message);
+
+ var webchannel = Netflux.create();
+ webchannel.openForJoining(options).then(function(data) {
+ console.log('keys');
+ console.log(channel);
+ console.log(data);
+ webchannel.onmessage = onMessage; // On receiving message
+ webchannel.onJoining = onJoining; // On user joining the session
+ webchannel.onLeaving = onLeaving; // On user leaving the session
+
+ // console.log('resolved');
+
+ onOpen();
+
+ }, function(err) {
+ console.log('rejected');
+ console.error(err);
+ });
+
+ var onOpen = function() {
+ // Connect to the WebSocket server
+ Netflux.join(channel, options).then(function(wc) {
+
+ wc.onmessage = onMessage; // On receiving message
+ wc.onJoining = onJoining; // On user joining the session
+ wc.onLeaving = onLeaving; // On user leaving the session
+
+ // Open a Chainpad session
+ realtime = createRealtime();
+
+ // we're fully synced
+ initializing = false;
+
+ // execute an onReady callback if one was supplied
+ if (config.onReady) {
+ config.onReady();
}
- });
-
- // Get the channel history
- var hc;
- wc.peers.forEach(function (p) { if (!hc || p.linkQuality > hc.linkQuality) { hc = p; } });
- hc.send(JSON.stringify(['GET_HISTORY', wc.id]));
+
+ // On sending message
+ realtime.onMessage(function(message) {
+ // Do not send authentication messages since it is handled by Netflux
+ var parsed = parseMessage(message);
+ if (parsed.content[0] !== 0) {
+ message = Crypto.encrypt(message, cryptKey);
+ wc.send(message);
+ }
+ });
+
+ // Get the channel history
+ // var hc;
+ // wc.peers.forEach(function (p) { if (!hc || p.linkQuality > hc.linkQuality) { hc = p; } });
+ // hc.send(JSON.stringify(['GET_HISTORY', wc.id]));
- // Check the connection to the channel
- //checkConnection(wc);
+ // Check the connection to the channel
+ //checkConnection(wc);
- bindAllEvents(textarea, doc, onEvent, false);
+ bindAllEvents(textarea, doc, onEvent, false);
- sharejs.attach(textarea, realtime);
- bump = realtime.bumpSharejs;
+ sharejs.attach(textarea, realtime);
+ bump = realtime.bumpSharejs;
- realtime.start();
- }, function(error) {
- warn(error);
- });
+ realtime.start();
+ }, function(error) {
+ warn(error);
+ });
+ }
var createRealtime = function() {
return ChainPad.create(userName,
@@ -215,11 +239,11 @@ define([
}
var onJoining = function(peer, channel) {
-
+ console.log('Someone joined : '+peer)
}
var onLeaving = function(peer, channel) {
-
+ console.log('Someone left : '+peer)
}
var checkConnection = function(wc) {
From 9f682a985b395e46c1d861fdd57fd46db4e46d9e Mon Sep 17 00:00:00 2001
From: Caleb James DeLisle
Date: Tue, 8 Mar 2016 11:43:25 +0100
Subject: [PATCH 05/65] lvl should not fail on non-existant channels
---
storage/lvl.js | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/storage/lvl.js b/storage/lvl.js
index 84ff57862..1ddfbe97c 100644
--- a/storage/lvl.js
+++ b/storage/lvl.js
@@ -38,7 +38,7 @@ var getMessages = function (db, channelName, msgHandler) {
if (i < index) { again(i+1); }
}));
};
- again(0);
+ if (index > -1) { again(0); }
});
};
From 870b2dbb7eaa021851cbadcd2e3448b931c532a1 Mon Sep 17 00:00:00 2001
From: Yann Flory
Date: Tue, 8 Mar 2016 11:45:03 +0100
Subject: [PATCH 06/65] Add the WebRTC server in Cryptpad
---
WebRTCSrv.js | 68 ++++++++++++++++++++++++++++++++++++
server.js | 4 ++-
www/common/realtime-input.js | 4 +--
3 files changed, 73 insertions(+), 3 deletions(-)
create mode 100644 WebRTCSrv.js
diff --git a/WebRTCSrv.js b/WebRTCSrv.js
new file mode 100644
index 000000000..06dfd61f4
--- /dev/null
+++ b/WebRTCSrv.js
@@ -0,0 +1,68 @@
+'use strict'
+let WebSocketServer = require('ws').Server
+const PORT = 8000
+const UNSUPPORTED_DATA = 1007
+const POLICY_VIOLATION = 1008
+const CLOSE_UNSUPPORTED = 1003
+
+// let server = new WebSocketServer({port: PORT}, () => {
+ // console.log('Server runs on: ws://localhost:' + PORT)
+// })
+
+var run = module.exports.run = function(storage, server) {
+ server.on('connection', (socket) => {
+ socket.on('message', (data) => {
+ try {
+ let msg = JSON.parse(data)
+ console.log(msg);
+ if (msg.hasOwnProperty('key')) {
+ for (let master of server.clients) {
+ if (master.key === msg.key) {
+ socket.close(POLICY_VIOLATION, 'The key already exists')
+ console.log('ERROR key exists');
+ return
+ }
+ }
+ socket.key = msg.key
+ socket.joiningClients = []
+ } else if (msg.hasOwnProperty('id')) {
+ for (let index in socket.joiningClients) {
+ if (index == msg.id) {
+ socket.joiningClients[index].send(JSON.stringify({data: msg.data}))
+ return
+ }
+ }
+ socket.close(POLICY_VIOLATION, 'Unknown id')
+ } else if (msg.hasOwnProperty('join')) {
+ for (let master of server.clients) {
+ if (master.key === msg.join) {
+ console.log('joined');
+ socket.master = master
+ master.joiningClients.push(socket)
+ let id = master.joiningClients.length - 1
+ master.send(JSON.stringify({id, data: msg.data}))
+ return
+ }
+ }
+ console.log('ERROR unknown key');
+ socket.close(POLICY_VIOLATION, 'Unknown key')
+ } else if (msg.hasOwnProperty('data') && socket.hasOwnProperty('master')) {
+ let id = socket.master.joiningClients.indexOf(socket)
+ socket.master.send(JSON.stringify({id, data: msg.data}))
+ } else {
+ socket.close(UNSUPPORTED_DATA, 'Unsupported message format')
+ }
+ } catch (event) {
+ socket.close(CLOSE_UNSUPPORTED, 'Server accepts only JSON')
+ }
+ })
+
+ socket.on('close', (event) => {
+ if (socket.hasOwnProperty('joiningClients')) {
+ for (let client of socket.joiningClients) {
+ client.close(POLICY_VIOLATION, 'The peer is no longer available')
+ }
+ }
+ })
+ })
+}
\ No newline at end of file
diff --git a/server.js b/server.js
index 3bfacb792..538efce28 100644
--- a/server.js
+++ b/server.js
@@ -8,6 +8,7 @@ var Fs = require('fs');
var WebSocketServer = require('ws').Server;
var ChainPadSrv = require('./ChainPadSrv');
var NetfluxSrv = require('./NetFluxWebsocketServer');
+var WebRTCSrv = require('./WebRTCSrv');
var config = require('./config');
config.websocketPort = config.websocketPort || config.httpPort;
@@ -81,5 +82,6 @@ var wsSrv = new WebSocketServer(wsConfig);
Storage.create(config, function (store) {
console.log('DB connected');
// ChainPadSrv.create(wsSrv, store);
- NetfluxSrv.run(store, wsSrv);
+ // NetfluxSrv.run(store, wsSrv);
+ WebRTCSrv.run(store, wsSrv);
});
diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js
index 787e66acb..17eb724f5 100644
--- a/www/common/realtime-input.js
+++ b/www/common/realtime-input.js
@@ -113,8 +113,8 @@ define([
};
var options = {
- // signaling: websocketUrl,
- signaling: 'ws://localhost:8000',
+ signaling: websocketUrl,
+ // signaling: 'ws://localhost:8000',
key: channel
// topology: 'StarTopologyService',
// protocol: 'WebSocketProtocolService',
From c536ecbc1ce01b69f7692bd97c77d2b84c311ca4 Mon Sep 17 00:00:00 2001
From: Yann Flory
Date: Tue, 8 Mar 2016 15:13:57 +0100
Subject: [PATCH 07/65] Temp commit
---
WebRTCSrv.js | 4 ---
www/common/netflux.js | 2 ++
www/common/realtime-input.js | 53 +++++++++++++++++++++++++++---------
3 files changed, 42 insertions(+), 17 deletions(-)
diff --git a/WebRTCSrv.js b/WebRTCSrv.js
index 06dfd61f4..e3e0a3c42 100644
--- a/WebRTCSrv.js
+++ b/WebRTCSrv.js
@@ -5,10 +5,6 @@ const UNSUPPORTED_DATA = 1007
const POLICY_VIOLATION = 1008
const CLOSE_UNSUPPORTED = 1003
-// let server = new WebSocketServer({port: PORT}, () => {
- // console.log('Server runs on: ws://localhost:' + PORT)
-// })
-
var run = module.exports.run = function(storage, server) {
server.on('connection', (socket) => {
socket.on('message', (data) => {
diff --git a/www/common/netflux.js b/www/common/netflux.js
index 5252a4767..d63470bba 100644
--- a/www/common/netflux.js
+++ b/www/common/netflux.js
@@ -500,6 +500,7 @@ return /******/ (function(modules) { // webpackBootstrap
}, {
key: 'broadcast',
value: function broadcast(webChannel, data) {
+ console.log(data);
var _iteratorNormalCompletion = true;
var _didIteratorError = false;
var _iteratorError = undefined;
@@ -508,6 +509,7 @@ return /******/ (function(modules) { // webpackBootstrap
for (var _iterator = webChannel.channels[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) {
var c = _step.value;
+ console.log(c);
c.send(data);
}
} catch (err) {
diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js
index 17eb724f5..ebddde624 100644
--- a/www/common/realtime-input.js
+++ b/www/common/realtime-input.js
@@ -127,27 +127,46 @@ define([
// Add the Facade's peer messages handler
Netflux._onPeerMessage = onPeerMessage;
+ function getParameterByName(name, url) {
+ if (!url) url = window.location.href;
+ name = name.replace(/[\[\]]/g, "\\$&");
+ var regex = new RegExp("[?&]" + name + "(=([^]*)|&|#|$)"),
+ results = regex.exec(url);
+ if (!results) return null;
+ if (!results[2]) return '';
+ return decodeURIComponent(results[2].replace(/\+/g, " "));
+ }
+
+ if(getParameterByName("server")) {
+ console.log('SERVER');
+ console.log(channel);
var webchannel = Netflux.create();
webchannel.openForJoining(options).then(function(data) {
- console.log('keys');
- console.log(channel);
- console.log(data);
- webchannel.onmessage = onMessage; // On receiving message
- webchannel.onJoining = onJoining; // On user joining the session
- webchannel.onLeaving = onLeaving; // On user leaving the session
// console.log('resolved');
- onOpen();
+ onOpen(webchannel);
}, function(err) {
console.log('rejected');
console.error(err);
});
-
- var onOpen = function() {
+ }
+ else {
+ console.log('CLIENT');
+ console.log(channel);
// Connect to the WebSocket server
Netflux.join(channel, options).then(function(wc) {
+ onOpen(wc);
+ }, function(error) {
+ warn(error);
+ });
+ }
+
+ var onOpen = function(wc) {
+
+ console.log('joined the channel');
+ console.log(wc.myID);
wc.onmessage = onMessage; // On receiving message
wc.onJoining = onJoining; // On user joining the session
@@ -166,12 +185,16 @@ define([
// On sending message
realtime.onMessage(function(message) {
+ // TODO: put in ChaindpadAdapter
// Do not send authentication messages since it is handled by Netflux
var parsed = parseMessage(message);
if (parsed.content[0] !== 0) {
+ console.log('ENVOI '+message);
message = Crypto.encrypt(message, cryptKey);
wc.send(message);
+ onMessage('', message);
}
+ // END-TODO
});
// Get the channel history
@@ -188,9 +211,7 @@ define([
bump = realtime.bumpSharejs;
realtime.start();
- }, function(error) {
- warn(error);
- });
+
}
var createRealtime = function() {
@@ -208,12 +229,18 @@ define([
}));
var onMessage = function(peer, msg) {
+
+ // TODO : put in ChainpadAdapter
// remove the password
var passLen = msg.substring(0,msg.indexOf(':'));
var message = msg.substring(passLen.length+1 + Number(passLen));
+
message = Crypto.decrypt(message, cryptKey);
-
+ console.log('RECOIS '+message);
+
+ // END-TODO ChainpadAdapter
+
verbose(message);
allMessages.push(message);
if (!initializing) {
From b7885eb5396831f686a1f157c29da365f94fed26 Mon Sep 17 00:00:00 2001
From: Yann Flory
Date: Wed, 9 Mar 2016 18:35:39 +0100
Subject: [PATCH 08/65] Fix channel history
---
WebRTCSrv.js | 27 ++++++-
www/common/netflux.js | 139 +++++++++++++++++++++++------------
www/common/realtime-input.js | 42 +++++++----
3 files changed, 145 insertions(+), 63 deletions(-)
diff --git a/WebRTCSrv.js b/WebRTCSrv.js
index e3e0a3c42..8b0c78321 100644
--- a/WebRTCSrv.js
+++ b/WebRTCSrv.js
@@ -36,6 +36,7 @@ var run = module.exports.run = function(storage, server) {
socket.master = master
master.joiningClients.push(socket)
let id = master.joiningClients.length - 1
+ console.log(id);
master.send(JSON.stringify({id, data: msg.data}))
return
}
@@ -54,10 +55,30 @@ var run = module.exports.run = function(storage, server) {
})
socket.on('close', (event) => {
- if (socket.hasOwnProperty('joiningClients')) {
- for (let client of socket.joiningClients) {
- client.close(POLICY_VIOLATION, 'The peer is no longer available')
+ console.log('someone has closed');
+ // If not master
+ if (socket.hasOwnProperty('master')) {
+ let masterClients = socket.master.joiningClients
+ for (let client of masterClients) {
+ if(client.id === socket.id) {
+ console.log('close client '+client.key)
+ client.close(POLICY_VIOLATION, 'The peer is no longer available')
+ //masterClients.splice(masterClients.indexOf(client),1);
+ }
+ }
+ }
+ else if (socket.hasOwnProperty('joiningClients')) {
+ let firstClient
+ let masterClients = socket.joiningClients
+ for (let client of masterClients) {
+ firstClient = client
+ break;
}
+ firstClient.close(POLICY_VIOLATION, 'The master is no longer available')
+ //masterClients.splice(masterClients.indexOf(firstClient),1);
+ firstClient.joiningClients = masterClients
+ console.log('change master from '+socket.key+' to '+firstClient.key)
+ socket = firstClient
}
})
})
diff --git a/www/common/netflux.js b/www/common/netflux.js
index d63470bba..1c434e46c 100644
--- a/www/common/netflux.js
+++ b/www/common/netflux.js
@@ -226,8 +226,35 @@ return /******/ (function(modules) { // webpackBootstrap
}, {
key: 'send',
value: function send(data) {
- var protocol = _ServiceProvider2.default.get(this.settings.protocol);
- this.topologyService.broadcast(this, protocol.message(cs.USER_DATA, { id: this.myID, data: data }));
+ var channel = this;
+ return new Promise(function (resolve, reject) {
+ if (channel.channels.size === 0) {
+ console.log('sizenull');resolve();
+ }
+ var protocol = _ServiceProvider2.default.get(channel.settings.protocol);
+ channel.topologyService.broadcast(channel, protocol.message(cs.USER_DATA, { id: channel.myID, data: data })).then(resolve, reject);
+ });
+ }
+ }, {
+ key: 'getHistory',
+ value: function getHistory(historyKeeperID) {
+ var channel = this;
+ return new Promise(function (resolve, reject) {
+ console.log(channel);
+ console.log('Je veux history ' + channel.myID);
+ var protocol = _ServiceProvider2.default.get(channel.settings.protocol);
+ channel.topologyService.sendTo(historyKeeperID, channel, protocol.message(cs.GET_HISTORY, { id: channel.myID, data: '' })).then(resolve, reject);
+ });
+ }
+ }, {
+ key: 'sendTo',
+ value: function sendTo(id, msg) {
+ var channel = this;
+ return new Promise(function (resolve, reject) {
+ var protocol = _ServiceProvider2.default.get(channel.settings.protocol);
+ console.log('WCsendTo ' + id);
+ channel.topologyService.sendTo(id, channel, protocol.message(cs.USER_DATA, { id: channel.myID, data: msg })).then(resolve, reject);
+ });
}
}, {
key: 'openForJoining',
@@ -305,6 +332,7 @@ return /******/ (function(modules) { // webpackBootstrap
});
// API user's message
var USER_DATA = exports.USER_DATA = 0;
+ var GET_HISTORY = exports.GET_HISTORY = 6;
// Internal messages
var JOIN_START = exports.JOIN_START = 2;
@@ -379,7 +407,7 @@ return /******/ (function(modules) { // webpackBootstrap
value: function get(code) {
var options = arguments.length <= 1 || arguments[1] === undefined ? {} : arguments[1];
- var service = undefined;
+ var service = void 0;
switch (code) {
case cs.WEBRTC_SERVICE:
service = new _WebRTCService2.default(options);
@@ -441,6 +469,8 @@ return /******/ (function(modules) { // webpackBootstrap
var options = arguments.length <= 0 || arguments[0] === undefined ? {} : arguments[0];
_classCallCheck(this, FullyConnectedService);
+
+ console.log('SERVICE FULLY CONNECTED CONSTRUCTED');
}
_createClass(FullyConnectedService, [{
@@ -500,62 +530,70 @@ return /******/ (function(modules) { // webpackBootstrap
}, {
key: 'broadcast',
value: function broadcast(webChannel, data) {
- console.log(data);
- var _iteratorNormalCompletion = true;
- var _didIteratorError = false;
- var _iteratorError = undefined;
-
- try {
- for (var _iterator = webChannel.channels[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) {
- var c = _step.value;
+ return new Promise(function (resolve, reject) {
+ var _iteratorNormalCompletion = true;
+ var _didIteratorError = false;
+ var _iteratorError = undefined;
- console.log(c);
- c.send(data);
- }
- } catch (err) {
- _didIteratorError = true;
- _iteratorError = err;
- } finally {
try {
- if (!_iteratorNormalCompletion && _iterator.return) {
- _iterator.return();
+ for (var _iterator = webChannel.channels[Symbol.iterator](), _step; !(_iteratorNormalCompletion = (_step = _iterator.next()).done); _iteratorNormalCompletion = true) {
+ var c = _step.value;
+
+ c.send(data);
}
+ } catch (err) {
+ _didIteratorError = true;
+ _iteratorError = err;
} finally {
- if (_didIteratorError) {
- throw _iteratorError;
+ try {
+ if (!_iteratorNormalCompletion && _iterator.return) {
+ _iterator.return();
+ }
+ } finally {
+ if (_didIteratorError) {
+ throw _iteratorError;
+ }
}
}
- }
+
+ resolve();
+ });
}
}, {
key: 'sendTo',
value: function sendTo(id, webChannel, data) {
- var _iteratorNormalCompletion2 = true;
- var _didIteratorError2 = false;
- var _iteratorError2 = undefined;
+ console.log('sending to ' + id);
- try {
- for (var _iterator2 = webChannel.channels[Symbol.iterator](), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) {
- var c = _step2.value;
+ return new Promise(function (resolve, reject) {
+ var _iteratorNormalCompletion2 = true;
+ var _didIteratorError2 = false;
+ var _iteratorError2 = undefined;
- if (c.peerID == id) {
- c.send(data);
- }
- }
- } catch (err) {
- _didIteratorError2 = true;
- _iteratorError2 = err;
- } finally {
try {
- if (!_iteratorNormalCompletion2 && _iterator2.return) {
- _iterator2.return();
+ for (var _iterator2 = webChannel.channels[Symbol.iterator](), _step2; !(_iteratorNormalCompletion2 = (_step2 = _iterator2.next()).done); _iteratorNormalCompletion2 = true) {
+ var c = _step2.value;
+
+ if (c.peerID == id) {
+ c.send(data);
+ }
}
+ } catch (err) {
+ _didIteratorError2 = true;
+ _iteratorError2 = err;
} finally {
- if (_didIteratorError2) {
- throw _iteratorError2;
+ try {
+ if (!_iteratorNormalCompletion2 && _iterator2.return) {
+ _iterator2.return();
+ }
+ } finally {
+ if (_didIteratorError2) {
+ throw _iteratorError2;
+ }
}
}
- }
+
+ resolve();
+ });
}
}, {
key: 'leave',
@@ -805,7 +843,7 @@ return /******/ (function(modules) { // webpackBootstrap
var settings = Object.assign({}, this.settings, options);
return new Promise(function (resolve, reject) {
- var connection = undefined;
+ var connection = void 0;
var socket = new window.WebSocket(settings.signaling);
console.log('Socket created');
socket.onopen = function () {
@@ -928,8 +966,8 @@ return /******/ (function(modules) { // webpackBootstrap
}, function () {});
})();
} else if (msg.sdp.type === 'answer') {
- var sd = Object.assign(new this.RTCSessionDescription(), msg.sdp);
- webChannel.connections.get(msg.senderPeerID).setRemoteDescription(sd, function () {}, function () {});
+ var _sd = Object.assign(new this.RTCSessionDescription(), msg.sdp);
+ webChannel.connections.get(msg.senderPeerID).setRemoteDescription(_sd, function () {}, function () {});
}
} else if (Reflect.has(msg, 'candidate')) {
webChannel.connections.get(msg.senderPeerID).addIceCandidate(new this.RTCIceCandidate(msg.candidate));
@@ -1042,7 +1080,7 @@ return /******/ (function(modules) { // webpackBootstrap
var settings = Object.assign({}, this.settings, options);
return new Promise(function (resolve, reject) {
- var connection = undefined;
+ var connection = void 0;
var socket = new window.WebSocket(settings.signaling);
socket.seq = 1;
socket.facade = options.facade || null;
@@ -1108,6 +1146,10 @@ return /******/ (function(modules) { // webpackBootstrap
case cs.USER_DATA:
webChannel.onmessage(msg.id, msg.data);
break;
+ case cs.GET_HISTORY:
+ console.log("SOMEONE WANTS HISTORY");
+ webChannel.onPeerMessage(msg.id, msg.code);
+ break;
case cs.SERVICE_DATA:
var service = _ServiceProvider2.default.get(msg.service);
service.onmessage(channel, msg.data);
@@ -1139,6 +1181,9 @@ return /******/ (function(modules) { // webpackBootstrap
msg.id = data.id;
msg.data = data.data;
break;
+ case cs.GET_HISTORY:
+ msg.id = data.id;
+ break;
case cs.SERVICE_DATA:
msg.service = data.service;
msg.data = Object.assign({}, data.data);
@@ -1268,7 +1313,7 @@ return /******/ (function(modules) { // webpackBootstrap
}, {
key: 'message',
value: function message(code, data) {
- var type = undefined;
+ var type = void 0;
switch (code) {
case cs.USER_DATA:
type = 'MSG';
diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js
index ebddde624..6ba825f53 100644
--- a/www/common/realtime-input.js
+++ b/www/common/realtime-input.js
@@ -102,16 +102,8 @@ define([
var bump = function () {};
- var onPeerMessage = function (peer, msg) {
- if(peer === '_HISTORY_KEEPER_') {
- var msgHistory = JSON.parse(msg[4]);
- onMessage(msgHistory[1], msgHistory[4]);
- }
- else {
- warn('Illegal direct message');
- }
- };
-
+ var messagesHistory = [];
+
var options = {
signaling: websocketUrl,
// signaling: 'ws://localhost:8000',
@@ -125,7 +117,7 @@ define([
var realtime;
// Add the Facade's peer messages handler
- Netflux._onPeerMessage = onPeerMessage;
+ // Netflux._onPeerMessage = onPeerMessage;
function getParameterByName(name, url) {
if (!url) url = window.location.href;
@@ -171,6 +163,9 @@ define([
wc.onmessage = onMessage; // On receiving message
wc.onJoining = onJoining; // On user joining the session
wc.onLeaving = onLeaving; // On user leaving the session
+ wc.onPeerMessage = function(peerId, type) {
+ onPeerMessage(peerId, wc); // On user leaving the session
+ }
// Open a Chainpad session
realtime = createRealtime();
@@ -191,12 +186,21 @@ define([
if (parsed.content[0] !== 0) {
console.log('ENVOI '+message);
message = Crypto.encrypt(message, cryptKey);
- wc.send(message);
- onMessage('', message);
+ wc.send(message).then(function() {
+ onMessage('', message);
+ });
}
// END-TODO
});
+ var hc;
+ for (let c of wc.channels) { hc = c; break; }
+ if(hc) {
+ console.log('history keeper :');
+ console.log(hc);
+ console.log('onPeer '+hc.peerID)
+ wc.getHistory(hc.peerID);
+ }
// Get the channel history
// var hc;
// wc.peers.forEach(function (p) { if (!hc || p.linkQuality > hc.linkQuality) { hc = p; } });
@@ -227,16 +231,28 @@ define([
var whoami = new RegExp(userName.replace(/[\/\+]/g, function (c) {
return '\\' +c;
}));
+
+ var onPeerMessage = function(peerID, wc) {
+ console.log(messagesHistory);
+ console.log('RTsendTo '+peerID);
+ messagesHistory.forEach(function(msg) {
+ console.log(msg);
+ //var message = Crypto.encrypt('1:y'+msg, cryptKey);
+ wc.sendTo(peerID, msg);
+ });
+ };
var onMessage = function(peer, msg) {
// TODO : put in ChainpadAdapter
// remove the password
+ messagesHistory.push(msg);
var passLen = msg.substring(0,msg.indexOf(':'));
var message = msg.substring(passLen.length+1 + Number(passLen));
message = Crypto.decrypt(message, cryptKey);
+
console.log('RECOIS '+message);
// END-TODO ChainpadAdapter
From ae8f6f7f2c5cebee3ef8939a7fbb75c5ac127210 Mon Sep 17 00:00:00 2001
From: Yann Flory
Date: Thu, 10 Mar 2016 14:03:31 +0100
Subject: [PATCH 09/65] Ability to choose which protocol to use (Websocket or
WebRTC) with Netflux
---
WebRTCSrv.js | 35 +------
config.js.dist | 2 +
customize.dist/index.html | 6 +-
server.js | 43 +++++---
www/common/netflux.js | 119 +++++++++++----------
www/common/realtime-input.js | 194 +++++++++++++++++------------------
www/hack/main.js | 1 +
www/vdom/main.js | 1 +
8 files changed, 196 insertions(+), 205 deletions(-)
diff --git a/WebRTCSrv.js b/WebRTCSrv.js
index 8b0c78321..c97c18b79 100644
--- a/WebRTCSrv.js
+++ b/WebRTCSrv.js
@@ -1,11 +1,10 @@
'use strict'
let WebSocketServer = require('ws').Server
-const PORT = 8000
const UNSUPPORTED_DATA = 1007
const POLICY_VIOLATION = 1008
const CLOSE_UNSUPPORTED = 1003
-var run = module.exports.run = function(storage, server) {
+var run = module.exports.run = function(server) {
server.on('connection', (socket) => {
socket.on('message', (data) => {
try {
@@ -15,7 +14,6 @@ var run = module.exports.run = function(storage, server) {
for (let master of server.clients) {
if (master.key === msg.key) {
socket.close(POLICY_VIOLATION, 'The key already exists')
- console.log('ERROR key exists');
return
}
}
@@ -32,16 +30,13 @@ var run = module.exports.run = function(storage, server) {
} else if (msg.hasOwnProperty('join')) {
for (let master of server.clients) {
if (master.key === msg.join) {
- console.log('joined');
socket.master = master
master.joiningClients.push(socket)
let id = master.joiningClients.length - 1
- console.log(id);
master.send(JSON.stringify({id, data: msg.data}))
return
}
}
- console.log('ERROR unknown key');
socket.close(POLICY_VIOLATION, 'Unknown key')
} else if (msg.hasOwnProperty('data') && socket.hasOwnProperty('master')) {
let id = socket.master.joiningClients.indexOf(socket)
@@ -55,31 +50,11 @@ var run = module.exports.run = function(storage, server) {
})
socket.on('close', (event) => {
- console.log('someone has closed');
- // If not master
- if (socket.hasOwnProperty('master')) {
- let masterClients = socket.master.joiningClients
- for (let client of masterClients) {
- if(client.id === socket.id) {
- console.log('close client '+client.key)
- client.close(POLICY_VIOLATION, 'The peer is no longer available')
- //masterClients.splice(masterClients.indexOf(client),1);
- }
- }
- }
- else if (socket.hasOwnProperty('joiningClients')) {
- let firstClient
- let masterClients = socket.joiningClients
- for (let client of masterClients) {
- firstClient = client
- break;
+ if (socket.hasOwnProperty('joiningClients')) {
+ for (let client of socket.joiningClients) {
+ client.close(POLICY_VIOLATION, 'The peer is no longer available')
}
- firstClient.close(POLICY_VIOLATION, 'The master is no longer available')
- //masterClients.splice(masterClients.indexOf(firstClient),1);
- firstClient.joiningClients = masterClients
- console.log('change master from '+socket.key+' to '+firstClient.key)
- socket = firstClient
}
- })
+ });
})
}
\ No newline at end of file
diff --git a/config.js.dist b/config.js.dist
index f61828e4c..56d7a1fc8 100644
--- a/config.js.dist
+++ b/config.js.dist
@@ -11,6 +11,8 @@ module.exports = {
httpPort: 3000,
// the port used for websockets
websocketPort: 3001,
+ // the port used for webrtc (uncomment to use the WebRTC server)
+ // webrtcPort: 3002,
// You now have a choice of storage engines
diff --git a/customize.dist/index.html b/customize.dist/index.html
index b652894e6..69e2eaa9f 100644
--- a/customize.dist/index.html
+++ b/customize.dist/index.html
@@ -123,9 +123,12 @@
-