diff --git a/www/common/common-util.js b/www/common/common-util.js index 22f20f710..580b9f539 100644 --- a/www/common/common-util.js +++ b/www/common/common-util.js @@ -1,5 +1,10 @@ (function (window) { var Util = {}; + + Util.tryParse = function (s) { + try { return JSON.parse(s); } catch (e) { return;} + }; + Util.mkAsync = function (f) { return function () { var args = Array.prototype.slice.call(arguments); diff --git a/www/common/rpc.js b/www/common/rpc.js index cefbfb666..e3c6a21a4 100644 --- a/www/common/rpc.js +++ b/www/common/rpc.js @@ -1,144 +1,198 @@ (function () { var factory = function (Util, Nacl) { + // we will send messages with a unique id for each RPC + // that id is returned with each response, indicating which call it was in response to var uid = Util.uid; + + // safely parse json messages, because they might cause parse errors + var tryParse = Util.tryParse; + + // we will sign various message with our edPrivate keys + // this handles that in a generic way var signMsg = function (data, signKey) { var buffer = Nacl.util.decodeUTF8(JSON.stringify(data)); return Nacl.util.encodeBase64(Nacl.sign.detached(buffer, signKey)); }; -/* -types of messages: - pin -> hash - unpin -> hash - getHash -> hash - getTotalSize -> bytes - getFileSize -> bytes -*/ - + // sendMsg takes a pre-formed message, does a little validation + // adds a transaction id to the message and stores its callback + // and finally sends it off to the historyKeeper, which delegates its + // processing to the RPC submodule var sendMsg = function (ctx, data, cb) { - var network = ctx.network; - var hkn = network.historyKeeper; - var txid = uid(); + // enforce async behaviour + setTimeout(function () { + if (typeof(cb) !== 'function') { + return console.error('expected callback'); + } - if (typeof(cb) !== 'function') { - return console.error('expected callback'); - } + var network = ctx.network; + var hkn = network.historyKeeper; + if (typeof(hkn) !== 'string') { return void cb("NO_HISTORY_KEEPER"); } - var pending = ctx.pending[txid] = function (err, response) { - cb(err, response); - }; - pending.data = data; - pending.called = 0; - return network.sendto(hkn, JSON.stringify([txid, data])); + var txid = uid(); + + var pending = ctx.pending[txid] = function (err, response) { + cb(err, response); + }; + pending.data = data; + pending.called = 0; + + return network.sendto(hkn, JSON.stringify([txid, data])); + }); }; - var parse = function (msg) { - try { - return JSON.parse(msg); - } catch (e) { - return null; - } + var matchesAnon = function (ctx, txid) { + if (!ctx.anon) { return false; } + if (typeof(ctx.anon.pending[txid]) !== 'function') { return false; } + return true; }; - var onMsg = function (ctx, msg) { - var parsed = parse(msg); + var handleAnon = function (ctx /* anon_ctx */, txid, body /* parsed messages without txid */) { + // if anon is handling it we know there's a pending callback + var pending = ctx.pending[txid]; + if (body[0] === 'ERROR') { pending(body[1]); } + else { pending(void 0, body.slice(1)); } + delete ctx.pending[txid]; + }; + + var onMsg = function (ctx /* network context */, msg /* string message */) { + if (typeof(msg) !== 'string') { + console.error("received non-string message [%s]", msg); + } + var parsed = tryParse(msg); if (!parsed) { return void console.error(new Error('could not parse message: %s', msg)); } // RPC messages are always arrays. if (!Array.isArray(parsed)) { return; } + // ignore FULL_HISTORY messages + if (/(FULL_HISTORY|HISTORY_RANGE)/.test(parsed[0])) { return; } + var txid = parsed[0]; + // txid must be a string, or this message is not meant for us if (typeof(txid) !== 'string') { return; } - var cookie = parsed[1]; - var pending = ctx.pending[txid]; - - if (!(parsed && parsed.slice)) { - // RPC responses are arrays. this message isn't meant for us. - return; + if (matchesAnon(ctx, txid)) { + return void handleAnon(ctx.anon, txid, parsed.slice(1)); } - if (/(FULL_HISTORY|HISTORY_RANGE)/.test(parsed[0])) { return; } - var response = parsed.slice(2); - - if (typeof(pending) === 'function') { - if (parsed[1] === 'ERROR') { - if (parsed[2] === 'NO_COOKIE') { - return void ctx.send('COOKIE', "", function (e) { - if (e) { - console.error(e); - return void pending(e); - } - - // resend the same command again - // give up if you've already tried resending - if (ctx.resend(txid)) { delete ctx.pending[txid]; } - }); + // iterate over authenticated rpc contexts and check if they are expecting + // a message with this txid + if (ctx.authenticated.some(function (rpc_ctx) { + var pending = rpc_ctx.pending[txid]; + // not meant for you + if (typeof(pending) !== 'function') { return false; } + + // if you're here, the message is for you... + + if (parsed[1] !== 'ERROR') { + // if the server sent you a new cookie, replace the old one + if (/\|/.test(parsed[1]) && rpc_ctx.cookie !== parsed[1]) { + rpc_ctx.cookie = parsed[1]; } + pending(void 0, parsed.slice(2)); - pending(parsed[2]); - delete ctx.pending[txid]; - return; - } else { - // update the cookie - if (/\|/.test(cookie)) { - if (ctx.cookie !== cookie) { - ctx.cookie = cookie; + // if successful, delete the callback... + delete rpc_ctx.pending[txid]; + // prevent further iteration + return true; + } + + // NO_COOKIE errors mean you failed to authenticate. + // request a new cookie and resend the query + if (parsed[2] === 'NO_COOKIE') { + return void ctx.send('COOKIE', "", function (e) { + if (e) { + console.error(e); + return void pending(e); } - } + + // resend the same command again + // give up if you've already tried resending + if (ctx.resend(txid)) { delete ctx.pending[txid]; } + }); } - pending(void 0, response); - // if successful, delete the callback... + // if you're here then your RPC passed authentication but had some other error + // call back with the error message + pending(parsed[2]); + // and delete the pending callback delete ctx.pending[txid]; + + // prevent further iteration + return true; + })) { + // the message was handled, so stop here return; } - // HACK to hide messages from the anon rpc - if (parsed.length !== 4 && parsed[1] !== 'ERROR') { - console.log(parsed); - console.error("received message [%s] for txid[%s] with no callback", msg, txid); - } + console.error("UNHANDLED RPC MESSAGE"); }; - var create = function (network, edPrivateKey, edPublicKey, cb) { - var signKey; + var networks = []; + var contexts = []; - try { - signKey = Nacl.util.decodeBase64(edPrivateKey); - if (signKey.length !== 64) { - throw new Error('private key did not match expected length of 64'); - } - } catch (err) { - return void cb(err); - } + var initNetworkContext = function (network) { + var ctx = { + network: network, + connected: true, + anon: undefined, + authenticated: [], + }; + networks.push(network); + contexts.push(ctx); - var pubBuffer; - try { - pubBuffer = Nacl.util.decodeBase64(edPublicKey); - if (pubBuffer.length !== 32) { - return void cb('expected public key to be 32 uint'); - } - } catch (err) { - return void cb(err); - } + // add listeners... + network.on('message', function (msg, sender) { + if (sender !== network.historyKeeper) { return; } + onMsg(ctx, msg); + }); + network.on('disconnect', function () { + ctx.connected = false; + if (ctx.anon) { ctx.anon.connected = false; } + ctx.authenticated.forEach(function (ctx) { + ctx.connected = false; + }); + }); + + network.on('reconnect', function () { + if (ctx.anon) { ctx.anon.connected = true; } + ctx.authenticated.forEach(function (ctx) { + ctx.connected = true; + }); + }); + return ctx; + }; + + var getNetworkContext = function (network) { + var i; + networks.some(function (current, j) { + if (network !== current) { return false; } + i = j; + return true; + }); + + if (contexts[i]) { return contexts[i]; } + return initNetworkContext(network); + }; + + var initAuthenticatedRpc = function (networkContext, keys) { var ctx = { - network: network, - timeouts: {}, // timeouts - pending: {}, // callbacks + network: networkContext.network, + publicKey: keys.publicKeyString, + timeouts: {}, + pending: {}, cookie: null, connected: true, }; var send = ctx.send = function (type, msg, cb) { if (!ctx.connected && type !== 'COOKIE') { - return void setTimeout(function () { - cb('DISCONNECTED'); - }); + return void Util.mkAsync(cb)("DISCONNECTED"); } // construct a signed message... @@ -150,9 +204,9 @@ types of messages: data.unshift(ctx.cookie); } - var sig = signMsg(data, signKey); + var sig = signMsg(data, keys.signKey); - data.unshift(edPublicKey); + data.unshift(keys.publicKeyString); data.unshift(sig); // [sig, edPublicKey, cookie, type, msg] @@ -169,7 +223,7 @@ types of messages: // update the cookie and signature... pending.data[2] = ctx.cookie; - pending.data[0] = signMsg(pending.data.slice(2), signKey); + pending.data[0] = signMsg(pending.data.slice(2), keys.signKey); try { return ctx.network.sendto(ctx.network.historyKeeper, JSON.stringify([txid, pending.data])); @@ -187,7 +241,7 @@ types of messages: } // construct an unsigned message - var data = [null, edPublicKey, null, type, msg]; + var data = [null, keys.publicKeyString, null, type, msg]; if (ctx.cookie && ctx.cookie.join) { data[2] = ctx.cookie.join('|'); } else { @@ -197,103 +251,101 @@ types of messages: return sendMsg(ctx, data, cb); }; - network.on('message', function (msg, sender) { - if (sender !== network.historyKeeper) { return; } - onMsg(ctx, msg); - }); + ctx.destroy = function () { + // clear all pending timeouts + Object.keys(ctx.timeouts).forEach(function (to) { + clearTimeout(to); + }); - network.on('disconnect', function () { - ctx.connected = false; - }); + // remove the ctx from the network's stack + var idx = networkContext.authenticated.indexOf(ctx); + if (idx === -1) { return; } + networkContext.authenticated.splice(idx, 1); + }; - network.on('reconnect', function () { - send('COOKIE', "", function (e) { - if (e) { return void cb(e); } - ctx.connected = true; - }); - }); + networkContext.authenticated.push(ctx); + return ctx; + }; - // network.onHistoryKeeperChange is defined in chainpad-netflux.js - // The function we pass will be called when the drive reconnects and - // chainpad-netflux detects a new history keeper id - if (network.onHistoryKeeperChange) { - network.onHistoryKeeperChange(function () { - send('COOKIE', "", function (e) { - if (e) { return void cb(e); } - ctx.connected = true; - }); - }); - } + var getAuthenticatedContext = function (networkContext, keys) { + if (!networkContext) { throw new Error('expected network context'); } - send('COOKIE', "", function (e) { - if (e) { return void cb(e); } - // callback to provide 'send' method to whatever needs it - cb(void 0, { send: send, }); + var publicKey = keys.publicKeyString; + + var i; + networkContext.authenticated.some(function (ctx, j) { + if (ctx.publicKey !== publicKey) { return false; } + i = j; + return true; }); + + if (networkContext.authenticated[i]) { return networkContext.authenticated[i]; } + + return initAuthenticatedRpc(networkContext, keys); }; - var onAnonMsg = function (ctx, msg) { - var parsed = parse(msg); + var create = function (network, edPrivateKey, edPublicKey, cb) { + if (typeof(cb) !== 'function') { throw new Error("expected callback"); } - if (!parsed) { - return void console.error(new Error('could not parse message: %s', msg)); + var signKey; + + try { + signKey = Nacl.util.decodeBase64(edPrivateKey); + if (signKey.length !== 64) { + throw new Error('private key did not match expected length of 64'); + } + } catch (err) { + return void cb(err); } - // RPC messages are always arrays. - if (!Array.isArray(parsed)) { return; } - var txid = parsed[0]; + try { + if (Nacl.util.decodeBase64(edPublicKey).length !== 32) { + return void cb('expected public key to be 32 uint'); + } + } catch (err) { return void cb(err); } - // txid must be a string, or this message is not meant for us - if (typeof(txid) !== 'string') { return; } + if (!network) { return void cb('NO_NETWORK'); } - var pending = ctx.pending[txid]; + // get or create a context for the provided network + var net_ctx = getNetworkContext(network); - if (!(parsed && parsed.slice)) { - // RPC responses are arrays. this message isn't meant for us. - return; - } - if (/FULL_HISTORY/.test(parsed[0])) { return; } - var response = parsed.slice(2); - - if (typeof(pending) === 'function') { - if (parsed[1] === 'ERROR') { - pending(parsed[2]); - delete ctx.pending[txid]; - return; - } - pending(void 0, response); + var rpc_ctx = getAuthenticatedContext(net_ctx, { + publicKeyString: edPublicKey, + signKey: signKey, + }); - // if successful, delete the callback... - delete ctx.pending[txid]; - return; - } - // HACK: filter out ugly messages we don't care about - if (typeof(msg) !== 'string') { - console.error("received message [%s] for txid[%s] with no callback", msg, txid); - } + rpc_ctx.send('COOKIE', "", function (e) { + if (e) { return void cb(e); } + // callback to provide 'send' method to whatever needs it + cb(void 0, { + send: rpc_ctx.send, + destroy: rpc_ctx.destroy, + }); + }); }; - var createAnonymous = function (network, cb) { + var initAnonRpc = function (networkContext) { var ctx = { - network: network, - timeouts: {}, // timeouts - pending: {}, // callbacks - cookie: null, + network: networkContext.network, + timeouts: {}, + pending: {}, connected: true, }; - var send = ctx.send = function (type, msg, cb) { + // any particular network will only ever need one anonymous rpc + networkContext.anon = ctx; + + ctx.send = function (type, msg, cb) { if (!ctx.connected) { return void setTimeout(function () { cb('DISCONNECTED'); }); } - // construct an unsigned message... var data = [type, msg]; - // [sig, edPublicKey, cookie, type, msg] + // [type, msg] return sendMsg(ctx, data, cb); }; @@ -314,21 +366,32 @@ types of messages: } }; - network.on('message', function (msg, sender) { - if (sender !== network.historyKeeper) { return; } - onAnonMsg(ctx, msg); - }); + ctx.destroy = function () { + // clear all pending timeouts + Object.keys(ctx.timeouts).forEach(function (to) { + clearTimeout(to); + }); - network.on('disconnect', function () { - ctx.connected = false; - }); + networkContext.anon = undefined; + }; - network.on('reconnect', function () { - ctx.connected = true; - }); + return ctx; + }; + + var getAnonContext = function (networkContext) { + return networkContext.anon || initAnonRpc(networkContext); + }; + + var createAnonymous = function (network, cb) { + if (typeof(cb) !== 'function') { throw new Error("expected callback"); } + if (!network) { return void cb('NO_NETWORK'); } + + // get or create a context for the provided network + var ctx = getAnonContext(getNetworkContext(network)); cb(void 0, { - send: send + send: ctx.send, + destroy: ctx.destroy, }); };