diff --git a/NetfluxWebsocketSrv.js b/NetfluxWebsocketSrv.js index b8e0f5bae..0c78d4d8c 100644 --- a/NetfluxWebsocketSrv.js +++ b/NetfluxWebsocketSrv.js @@ -7,6 +7,7 @@ const LAG_MAX_BEFORE_PING = 15000; const HISTORY_KEEPER_ID = Crypto.randomBytes(8).toString('hex'); const USE_HISTORY_KEEPER = true; +const USE_FILE_BACKUP_STORAGE = true; let dropUser; @@ -86,7 +87,7 @@ const handleMessage = function (ctx, user, msg) { return; } let chanName = obj || randName(); - sendMsg(ctx, user, [seq, 'ACK']); + sendMsg(ctx, user, [seq, 'JACK', chanName]); let chan = ctx.channels[chanName] = ctx.channels[chanName] || []; chan.id = chanName; if (USE_HISTORY_KEEPER) { @@ -131,11 +132,11 @@ const handleMessage = function (ctx, user, msg) { let err; let chan; let idx; - if (!obj) { err = 'EINVAL'; } + if (!obj) { err = 'EINVAL'; obj = 'undefined';} if (!err && !(chan = ctx.channels[obj])) { err = 'ENOENT'; } if (!err && (idx = chan.indexOf(user)) === -1) { err = 'NOT_IN_CHAN'; } if (err) { - sendMsg(ctx, user, [seq, 'ERROR', err]); + sendMsg(ctx, user, [seq, 'ERROR', err, obj]); return; } sendMsg(ctx, user, [seq, 'ACK']); @@ -153,7 +154,7 @@ let run = module.exports.run = function (storage, socketServer) { let ctx = { users: {}, channels: {}, - store: LogStore.create('messages.log', storage) + store: (USE_FILE_BACKUP_STORAGE) ? LogStore.create('messages.log', storage) : storage }; setInterval(function () { Object.keys(ctx.users).forEach(function (userId) { @@ -161,7 +162,7 @@ let run = module.exports.run = function (storage, socketServer) { if (now() - u.timeOfLastMessage > LAG_MAX_BEFORE_DISCONNECT) { dropUser(ctx, u); } else if (!u.pingOutstanding && now() - u.timeOfLastMessage > LAG_MAX_BEFORE_PING) { - sendMsg(ctx, u, [0, 'PING', now()]); + sendMsg(ctx, u, [0, '', 'PING', now()]); u.pingOutstanding = true; } }); diff --git a/www/common/netflux.js b/www/common/netflux.js index 68a703151..5a59d7876 100644 --- a/www/common/netflux.js +++ b/www/common/netflux.js @@ -674,8 +674,8 @@ return /******/ (function(modules) { // webpackBootstrap var msg = undefined; // Create the string message + var date = new Date().getTime(); if (data.type === 'PING') { - var date = new Date().getTime(); msg = JSON.stringify([c.seq++, 'PING', date]); } else { msg = JSON.stringify([c.seq++, data.type, webChannel.id, data.msg]); @@ -686,10 +686,11 @@ return /******/ (function(modules) { // webpackBootstrap srvMsg.shift(); srvMsg.unshift(webChannel.myID); srvMsg.unshift(0); - webChannel.waitingAck[c.seq - 1] = srvMsg; + webChannel.waitingAck[c.seq - 1] = { resolve: resolve, reject: reject, time: date, data: srvMsg }; // Send the message to the server c.send(msg); } + // resolve(); } catch (err) { _didIteratorError = true; _iteratorError = err; @@ -704,8 +705,6 @@ return /******/ (function(modules) { // webpackBootstrap } } } - - resolve(); }); } }, { @@ -1108,7 +1107,8 @@ return /******/ (function(modules) { // webpackBootstrap this.NAME = this.constructor.name; this.protocol = _ServiceProvider2.default.get(cs.EXCHANGEPROTOCOL_SERVICE); this.defaults = { - signaling: 'ws://localhost:9000' + signaling: 'ws://localhost:9000', + REQUEST_TIMEOUT: 5000 }; this.settings = Object.assign({}, this.defaults, options); } @@ -1122,6 +1122,19 @@ return /******/ (function(modules) { // webpackBootstrap return new Promise(function (resolve, reject) { var connection = undefined; var socket = new window.WebSocket(settings.signaling); + setInterval(function () { + if (socket.webChannel && socket.webChannel.waitingAck) { + var waitingAck = socket.webChannel.waitingAck; + for (var id in waitingAck) { + var req = waitingAck[id]; + var now = new Date().getTime(); + if (now - req.time > settings.REQUEST_TIMEOUT) { + delete socket.webChannel.waitingAck[id]; + req.reject({ type: 'TIMEOUT', message: 'waited ' + now - req.time + 'ms' }); + } + } + } + }, 5000); socket.seq = 1; socket.facade = options.facade || null; socket.onopen = function () { @@ -1345,15 +1358,15 @@ return /******/ (function(modules) { // webpackBootstrap if (msg[1] === 'ACK') { var seq = msg[0]; if (webChannel.waitingAck[seq]) { - var newMsg = webChannel.waitingAck[seq]; - if (parseInt(newMsg[3]) === newMsg[3]) { - // PING message + var waitingAck = webChannel.waitingAck[seq]; + waitingAck.resolve(); + var newMsg = waitingAck.data; + if (newMsg[2] === 'PING') { + // PING message : set the lag var lag = new Date().getTime() - newMsg[3]; webChannel.getLag = function () { return lag; }; - } else { - if (typeof webChannel.onmessage === "function") webChannel.onmessage(newMsg[1], newMsg[4]); } delete webChannel.waitingAck[seq]; } diff --git a/www/common/realtime-input.js b/www/common/realtime-input.js index fecd64cd3..c646d8054 100644 --- a/www/common/realtime-input.js +++ b/www/common/realtime-input.js @@ -294,14 +294,17 @@ define([ // Open a Chainpad session realtime = createRealtime(); - // On sending message + // Sending a message... realtime.onMessage(function(message) { - // Prevent Chainpad from sending authentication messages since it is handled by Netflux + // Filter messages sent by Chainpad to make it compatible with Netflux message = chainpadAdapter.msgOut(message, wc); if(message) { wc.send(message).then(function() { - // Send the message back to Chainpad once it is sent to all peers if using the WebRTC protocol - if(rtc) { onMessage(wc.myID, message); } + // Send the message back to Chainpad once it is sent to the recipients. + onMessage(wc.myID, message); + }, function(err) { + // The message has not been sent, display the error. + console.error(err); }); } }); diff --git a/www/pad/main.js b/www/pad/main.js index 48b2b5d8a..0ec1877b7 100644 --- a/www/pad/main.js +++ b/www/pad/main.js @@ -98,7 +98,7 @@ define([ we should check when such an element is going to be removed, and prevent that from happening. */ if (info.node && info.node.tagName === 'SPAN' && - info.node.contentEditable === "true") { + info.node.contentEditable === false) { // it seems to be a magicline plugin element... if (info.diff.action === 'removeElement') { // and you're about to remove it...