From 38bd27303bade3191c45abf06744e0c833573770 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 8 Dec 2017 11:47:07 +0100 Subject: [PATCH] WIP rpc framework --- www/assert/frame/frame.html | 7 ++ www/assert/frame/frame.js | 147 +++++++++++++++++++++++++++++++ www/assert/frame/respond.js | 32 +++++++ www/assert/main.js | 75 ++++++++++------ www/common/wire.js | 167 +++++++++++++++++++++++++++--------- 5 files changed, 361 insertions(+), 67 deletions(-) create mode 100644 www/assert/frame/frame.html create mode 100644 www/assert/frame/frame.js create mode 100644 www/assert/frame/respond.js diff --git a/www/assert/frame/frame.html b/www/assert/frame/frame.html new file mode 100644 index 000000000..8d2d63c0d --- /dev/null +++ b/www/assert/frame/frame.html @@ -0,0 +1,7 @@ + + + + + + + diff --git a/www/assert/frame/frame.js b/www/assert/frame/frame.js new file mode 100644 index 000000000..42808f6f9 --- /dev/null +++ b/www/assert/frame/frame.js @@ -0,0 +1,147 @@ +(function () { + + var Frame = {}; + + var uid = function () { + return Number(Math.floor(Math.random() * Number.MAX_SAFE_INTEGER)) + .toString(32).replace(/\./g, ''); + }; + + // create an invisible iframe with a given source + // append it to a parent element + // execute a callback when it has loaded + Frame.create = function (parent, src, onload, timeout) { + var iframe = document.createElement('iframe'); + + timeout = timeout || 10000; + var to = window.setTimeout(function () { + onload('[timeoutError] could not load iframe at ' + src); + }, timeout); + + iframe.setAttribute('id', 'cors-store'); + + iframe.onload = function (e) { + onload(void 0, iframe, e); + window.clearTimeout(to); + }; + // We must pass a unique parameter here to avoid cache problems in Firefox with + // the NoScript plugin: if the iframe's content is taken from the cache, the JS + // is not executed with NoScript.... + iframe.setAttribute('src', src + '?t=' + new Date().getTime()); + + iframe.style.display = 'none'; + parent.appendChild(iframe); + }; + + /* given an iframe with an rpc script loaded, create a frame object + with an asynchronous 'send' method */ + Frame.open = function (e, A, timeout) { + var win = e.contentWindow; + + var frame = {}; + frame.id = uid(); + + var listeners = {}; + var timeouts = {}; + + timeout = timeout || 5000; + + frame.accepts = function (o) { + return A.some(function (e) { + switch (typeof(e)) { + case 'string': return e === o; + case 'object': return e.test(o); + } + }); + }; + + var changeHandlers = frame.changeHandlers = []; + + frame.change = function (f) { + if (typeof(f) !== 'function') { + throw new Error('[Frame.change] expected callback'); + } + changeHandlers.push(f); + }; + + var _listener = function (e) { + if (!frame.accepts(e.origin)) { + console.log("message from %s rejected!", e.origin); + return; + } + var message = JSON.parse(e.data); + var uid = message._uid; + var error = message.error; + var data = message.data; + + if (!uid) { + console.log("No uid!"); + return; + } + + if (uid === 'change' && changeHandlers.length) { + changeHandlers.forEach(function (f) { + f(data); + }); + return; + } + + if (timeouts[uid]) { + window.clearTimeout(timeouts[uid]); + } + if (listeners[uid]) { + listeners[uid](error, data, e); + delete listeners[uid]; + } + }; + window.addEventListener('message', _listener); + + frame.close = function () { + window.removeEventListener('message', _listener); + }; + + /* method (string): (set|get|remove) + key (string) + data (string) + cb (function) */ + frame.send = function (method, content, cb) { + var req = { + method: method, + //key: key, + data: content, //data, + }; + + var id = req._uid = uid(); + // uid must not equal 'change' + while(id === 'change') { + id = req._uid = uid(); + } + + if (typeof(cb) === 'function') { + //console.log("setting callback!"); + listeners[id] = cb; + //console.log("setting timeout of %sms", timeout); + timeouts[id] = window.setTimeout(function () { + // when the callback is executed it will clear this timeout + cb('[TimeoutError] request timed out after ' + timeout + 'ms'); + }, timeout); + } else { + console.log(typeof(cb)); + } + + win.postMessage(JSON.stringify(req), '*'); + }; + + return frame; + }; + + if (typeof(module) !== 'undefined' && module.exports) { + module.exports = Frame; + } else if (typeof(define) === 'function' && define.amd) { + define(['jquery'], function () { + return Frame; + }); + } else { + window.Frame = Frame; + } +}()); diff --git a/www/assert/frame/respond.js b/www/assert/frame/respond.js new file mode 100644 index 000000000..07f8e2994 --- /dev/null +++ b/www/assert/frame/respond.js @@ -0,0 +1,32 @@ +var validDomains = [ /.*/i, ]; +var isValidDomain = function (o) { + return validDomains.some(function (e) { + switch (typeof(e)) { + case 'string': return e === o; + case 'object': return e.test(o); + } + }); +}; + +window.addEventListener('message', function(e) { + if (!isValidDomain(e.origin)) { return; } + var payload = JSON.parse(e.data); + var parent = window.parent; + var respond = function (error, data) { + var res = { + _uid: payload._uid, + error: error, + data: data, + }; + parent.postMessage(JSON.stringify(res), '*'); + }; + + //console.error(payload); + switch(payload.method) { + case undefined: + return respond('No method supplied'); + default: + return respond(void 0, "EHLO"); + } +}); + diff --git a/www/assert/main.js b/www/assert/main.js index e88e13fd1..629d7cafd 100644 --- a/www/assert/main.js +++ b/www/assert/main.js @@ -249,7 +249,6 @@ define([ } }; - var evt = Util.mkEvent(); var respond = function (e, out) { evt.fire(e, out); @@ -259,9 +258,8 @@ define([ try { var parsed = JSON.parse(raw); var txid = parsed.txid; - var message = parsed.message; setTimeout(function () { - service(message.command, message.content, function (e, result) { + service(parsed.q, parsed.content, function (e, result) { respond(JSON.stringify({ txid: txid, error: e, @@ -285,33 +283,56 @@ define([ }); }, "Test rpc factory"); -/* assert(function (cb) { - var getBlob = function (url, cb) { - var xhr = new XMLHttpRequest(); - xhr.open("GET", url, true); - xhr.responseType = "blob"; - xhr.onload = function () { - cb(void 0, this.response); - }; - xhr.send(); - }; - - var $img = $('img#thumb-orig'); - getBlob($img.attr('src'), function (e, blob) { - console.log(e, blob); - Thumb.fromImageBlob(blob, function (e, thumb) { - console.log(thumb); - var th = new Image(); - th.src = URL.createObjectURL(thumb); - th.onload = function () { - $(document.body).append($(th).addClass('thumb')); - cb(th.width === Thumb.dimension && th.height === Thumb.dimension); - }; + require([ + '/assert/frame/frame.js', + ], function (Frame) { + Frame.create(document.body, '/assert/frame/frame.html', function (e, frame) { + if (e) { return cb(false); } + + var channel = Frame.open(frame, [ + /.*/i, + ], 5000); + + channel.send('HELO', null, function (e, res) { + if (res === 'EHLO') { return cb(true); } + cb(false); + }); }); }); - }); -*/ + }, "PEWPEW"); + + (function () { + var guid = Wire.uid(); + + var t = Wire.tracker({ + timeout: 1000, + hook: function (txid, q, content) { + console.info(JSON.stringify({ + guid: guid, + txid: txid, + q: q, + content: content, + })); + }, + }); + + assert(function (cb) { + t.call('SHOULD_TIMEOUT', null, function (e) { + if (e === 'TIMEOUT') { return cb(true); } + cb(false); + }); + }, 'tracker should timeout'); + + assert(function (cb) { + var id = t.call('SHOULD_NOT_TIMEOUT', null, function (e, out) { + if (e) { return cb(false); } + if (out === 'YES') { return cb(true); } + cb(false); + }); + t.respond(id, void 0, 'YES'); + }, "tracker should not timeout"); + }()); Drive.test(assert); diff --git a/www/common/wire.js b/www/common/wire.js index c4b59cfae..c30e5defb 100644 --- a/www/common/wire.js +++ b/www/common/wire.js @@ -7,25 +7,118 @@ define([ Requirements -* some transmission methods can be interrupted - * handle disconnects and reconnects -* handle callbacks -* configurable timeout -* Service should expose 'addClient' method - * and handle broadcast +* [x] some transmission methods can be interrupted + * [x] handle disconnects and reconnects +* [x] handle callbacks +* [x] configurable timeout +* [x] be able to answer only queries with a particular id +* be able to implement arbitrary listeners on the service-side + * and not call 'ready' until those listeners are ready +* identical API for: + * iframe postMessage + * server calls over netflux + * postMessage to webworker + * postMessage to sharedWorker +* on-wire protocol should actually be the same for rewriting purposes + * q + * guid (globally unique id) + * txid (message id) + * content +* be able to compose different RPCs as streams + * intercept and rewrite capacity + * multiplex multiple streams over one stream + * blind redirect + * intelligent router + * broadcast (with ACK?) + * message -* - */ - var uid = function () { + var uid = Wire.uid = function () { return Number(Math.floor(Math.random () * Number.MAX_SAFE_INTEGER)).toString(32); }; + +/* tracker(options) + maintains a registry of asynchronous function calls + +allows you to: + hook each call to actually send to a remote service... + abort any call + trigger the pending callback with arguments + set the state of the tracker (active/inactive) + + +*/ + Wire.tracker = function (opt) { + opt = opt || {}; + var hook = opt.hook || function () {}; + var timeout = opt.timeout || 5000; + var pending = {}; + var timeouts = {}; + + var call = function (method, data, cb) { + var id = uid(); + + // if the callback is not invoked in time, time out + timeouts[id] = setTimeout(function () { + if (typeof(pending[id]) === 'function') { + cb("TIMEOUT"); + delete pending[id]; + return; + } + throw new Error('timed out without function to call'); + }, timeout); + + pending[id] = function () { + // invoke the function with arguments... + cb.apply(null, Array.prototype.slice.call(arguments)); + // clear its timeout + clearTimeout(timeouts[id]); + // remove the function from pending + delete pending[id]; + }; + + hook(id, method, data); + + return id; + }; + + var respond = function (id, err, response) { + if (typeof(pending[id]) !== 'function') { + throw new Error('invoked non-existent callback'); + } + pending[id](err, response); + }; + + var abort = function (id) { + if (pending[id]) { + clearTimeout(timeouts[id]); + delete pending[id]; + return true; + } + return false; + }; + + var t = { + call: call, + respond: respond, + abort: abort, + state: true, + }; + + t.setState = function (active) { + t.state = Boolean(active); + }; + + return t; + }; + /* opt = { + timeout: 30000, send: function () { }, @@ -45,50 +138,44 @@ opt = { }; */ - Wire.create = function (opt, cb) { - var ctx = {}; - var pending = ctx.pending = {}; - ctx.connected = false; - - var rpc = {}; + var parseMessage = function (raw) { + try { return JSON.parse(raw); } catch (e) { return; } + }; + Wire.create = function (opt, cb) { opt.constructor(function (e, service) { if (e) { return setTimeout(function () { cb(e); }); } + var rpc = {}; + + var guid = Wire.uid(); + var t = Wire.tracker({ + timeout: opt.timeout, + hook: function (txid, q, content) { + service.send(JSON.stringify({ + guid: guid, + q: q, + txid: txid, + content: content, + })); + }, + }); rpc.send = function (type, data, cb) { - var txid = uid(); - if (typeof(cb) !== 'function') { - throw new Error('expected callback'); - } - - ctx.pending[txid] = function (err, response) { - cb(err, response); - }; - - service.send(JSON.stringify({ - txid: txid, - message: { - command: type, - content: data, - }, - })); + t.call(type, data, cb); }; service.receive(function (raw) { - try { - var data = JSON.parse(raw); - var txid = data.txid; - if (!txid) { throw new Error('NO_TXID'); } - var cb = pending[txid]; - if (data.error) { return void cb(data.error); } - cb(void 0, data.content); - } catch (e) { console.error("UNHANDLED_MESSAGE", raw); } + var data = parseMessage(raw); + if (typeof(data) === 'undefined') { + return console.error("UNHANDLED_MESSAGE", raw); + } + if (!data.txid) { throw new Error('NO_TXID'); } + t.respond(data.txid, data.error, data.content); }); cb(void 0, rpc); }); }; - return Wire; });