refactor rpc with support for multiple authenticated sessions

pull/1/head
ansuz 5 years ago
parent 53142c91e7
commit 75b655e1e8

@ -1,5 +1,10 @@
(function (window) { (function (window) {
var Util = {}; var Util = {};
Util.tryParse = function (s) {
try { return JSON.parse(s); } catch (e) { return;}
};
Util.mkAsync = function (f) { Util.mkAsync = function (f) {
return function () { return function () {
var args = Array.prototype.slice.call(arguments); var args = Array.prototype.slice.call(arguments);

@ -1,71 +1,108 @@
(function () { (function () {
var factory = function (Util, Nacl) { var factory = function (Util, Nacl) {
// we will send messages with a unique id for each RPC
// that id is returned with each response, indicating which call it was in response to
var uid = Util.uid; var uid = Util.uid;
// safely parse json messages, because they might cause parse errors
var tryParse = Util.tryParse;
// we will sign various message with our edPrivate keys
// this handles that in a generic way
var signMsg = function (data, signKey) { var signMsg = function (data, signKey) {
var buffer = Nacl.util.decodeUTF8(JSON.stringify(data)); var buffer = Nacl.util.decodeUTF8(JSON.stringify(data));
return Nacl.util.encodeBase64(Nacl.sign.detached(buffer, signKey)); return Nacl.util.encodeBase64(Nacl.sign.detached(buffer, signKey));
}; };
/* // sendMsg takes a pre-formed message, does a little validation
types of messages: // adds a transaction id to the message and stores its callback
pin -> hash // and finally sends it off to the historyKeeper, which delegates its
unpin -> hash // processing to the RPC submodule
getHash -> hash
getTotalSize -> bytes
getFileSize -> bytes
*/
var sendMsg = function (ctx, data, cb) { var sendMsg = function (ctx, data, cb) {
var network = ctx.network; // enforce async behaviour
var hkn = network.historyKeeper; setTimeout(function () {
var txid = uid();
if (typeof(cb) !== 'function') { if (typeof(cb) !== 'function') {
return console.error('expected callback'); return console.error('expected callback');
} }
var network = ctx.network;
var hkn = network.historyKeeper;
if (typeof(hkn) !== 'string') { return void cb("NO_HISTORY_KEEPER"); }
var txid = uid();
var pending = ctx.pending[txid] = function (err, response) { var pending = ctx.pending[txid] = function (err, response) {
cb(err, response); cb(err, response);
}; };
pending.data = data; pending.data = data;
pending.called = 0; pending.called = 0;
return network.sendto(hkn, JSON.stringify([txid, data])); return network.sendto(hkn, JSON.stringify([txid, data]));
});
}; };
var parse = function (msg) { var matchesAnon = function (ctx, txid) {
try { if (!ctx.anon) { return false; }
return JSON.parse(msg); if (typeof(ctx.anon.pending[txid]) !== 'function') { return false; }
} catch (e) { return true;
return null; };
}
var handleAnon = function (ctx /* anon_ctx */, txid, body /* parsed messages without txid */) {
// if anon is handling it we know there's a pending callback
var pending = ctx.pending[txid];
if (body[0] === 'ERROR') { pending(body[1]); }
else { pending(void 0, body.slice(1)); }
delete ctx.pending[txid];
}; };
var onMsg = function (ctx, msg) { var onMsg = function (ctx /* network context */, msg /* string message */) {
var parsed = parse(msg); if (typeof(msg) !== 'string') {
console.error("received non-string message [%s]", msg);
}
var parsed = tryParse(msg);
if (!parsed) { if (!parsed) {
return void console.error(new Error('could not parse message: %s', msg)); return void console.error(new Error('could not parse message: %s', msg));
} }
// RPC messages are always arrays. // RPC messages are always arrays.
if (!Array.isArray(parsed)) { return; } if (!Array.isArray(parsed)) { return; }
// ignore FULL_HISTORY messages
if (/(FULL_HISTORY|HISTORY_RANGE)/.test(parsed[0])) { return; }
var txid = parsed[0]; var txid = parsed[0];
// txid must be a string, or this message is not meant for us // txid must be a string, or this message is not meant for us
if (typeof(txid) !== 'string') { return; } if (typeof(txid) !== 'string') { return; }
var cookie = parsed[1];
var pending = ctx.pending[txid]; if (matchesAnon(ctx, txid)) {
return void handleAnon(ctx.anon, txid, parsed.slice(1));
}
if (!(parsed && parsed.slice)) { // iterate over authenticated rpc contexts and check if they are expecting
// RPC responses are arrays. this message isn't meant for us. // a message with this txid
return; if (ctx.authenticated.some(function (rpc_ctx) {
var pending = rpc_ctx.pending[txid];
// not meant for you
if (typeof(pending) !== 'function') { return false; }
// if you're here, the message is for you...
if (parsed[1] !== 'ERROR') {
// if the server sent you a new cookie, replace the old one
if (/\|/.test(parsed[1]) && rpc_ctx.cookie !== parsed[1]) {
rpc_ctx.cookie = parsed[1];
} }
if (/(FULL_HISTORY|HISTORY_RANGE)/.test(parsed[0])) { return; } pending(void 0, parsed.slice(2));
var response = parsed.slice(2); // if successful, delete the callback...
delete rpc_ctx.pending[txid];
// prevent further iteration
return true;
}
if (typeof(pending) === 'function') { // NO_COOKIE errors mean you failed to authenticate.
if (parsed[1] === 'ERROR') { // request a new cookie and resend the query
if (parsed[2] === 'NO_COOKIE') { if (parsed[2] === 'NO_COOKIE') {
return void ctx.send('COOKIE', "", function (e) { return void ctx.send('COOKIE', "", function (e) {
if (e) { if (e) {
@ -79,66 +116,83 @@ types of messages:
}); });
} }
// if you're here then your RPC passed authentication but had some other error
// call back with the error message
pending(parsed[2]); pending(parsed[2]);
// and delete the pending callback
delete ctx.pending[txid]; delete ctx.pending[txid];
return;
} else {
// update the cookie
if (/\|/.test(cookie)) {
if (ctx.cookie !== cookie) {
ctx.cookie = cookie;
}
}
}
pending(void 0, response);
// if successful, delete the callback... // prevent further iteration
delete ctx.pending[txid]; return true;
})) {
// the message was handled, so stop here
return; return;
} }
// HACK to hide messages from the anon rpc console.error("UNHANDLED RPC MESSAGE");
if (parsed.length !== 4 && parsed[1] !== 'ERROR') {
console.log(parsed);
console.error("received message [%s] for txid[%s] with no callback", msg, txid);
}
}; };
var create = function (network, edPrivateKey, edPublicKey, cb) { var networks = [];
var signKey; var contexts = [];
try { var initNetworkContext = function (network) {
signKey = Nacl.util.decodeBase64(edPrivateKey); var ctx = {
if (signKey.length !== 64) { network: network,
throw new Error('private key did not match expected length of 64'); connected: true,
} anon: undefined,
} catch (err) { authenticated: [],
return void cb(err); };
} networks.push(network);
contexts.push(ctx);
var pubBuffer; // add listeners...
try { network.on('message', function (msg, sender) {
pubBuffer = Nacl.util.decodeBase64(edPublicKey); if (sender !== network.historyKeeper) { return; }
if (pubBuffer.length !== 32) { onMsg(ctx, msg);
return void cb('expected public key to be 32 uint'); });
}
} catch (err) { network.on('disconnect', function () {
return void cb(err); ctx.connected = false;
} if (ctx.anon) { ctx.anon.connected = false; }
ctx.authenticated.forEach(function (ctx) {
ctx.connected = false;
});
});
network.on('reconnect', function () {
if (ctx.anon) { ctx.anon.connected = true; }
ctx.authenticated.forEach(function (ctx) {
ctx.connected = true;
});
});
return ctx;
};
var getNetworkContext = function (network) {
var i;
networks.some(function (current, j) {
if (network !== current) { return false; }
i = j;
return true;
});
if (contexts[i]) { return contexts[i]; }
return initNetworkContext(network);
};
var initAuthenticatedRpc = function (networkContext, keys) {
var ctx = { var ctx = {
network: network, network: networkContext.network,
timeouts: {}, // timeouts publicKey: keys.publicKeyString,
pending: {}, // callbacks timeouts: {},
pending: {},
cookie: null, cookie: null,
connected: true, connected: true,
}; };
var send = ctx.send = function (type, msg, cb) { var send = ctx.send = function (type, msg, cb) {
if (!ctx.connected && type !== 'COOKIE') { if (!ctx.connected && type !== 'COOKIE') {
return void setTimeout(function () { return void Util.mkAsync(cb)("DISCONNECTED");
cb('DISCONNECTED');
});
} }
// construct a signed message... // construct a signed message...
@ -150,9 +204,9 @@ types of messages:
data.unshift(ctx.cookie); data.unshift(ctx.cookie);
} }
var sig = signMsg(data, signKey); var sig = signMsg(data, keys.signKey);
data.unshift(edPublicKey); data.unshift(keys.publicKeyString);
data.unshift(sig); data.unshift(sig);
// [sig, edPublicKey, cookie, type, msg] // [sig, edPublicKey, cookie, type, msg]
@ -169,7 +223,7 @@ types of messages:
// update the cookie and signature... // update the cookie and signature...
pending.data[2] = ctx.cookie; pending.data[2] = ctx.cookie;
pending.data[0] = signMsg(pending.data.slice(2), signKey); pending.data[0] = signMsg(pending.data.slice(2), keys.signKey);
try { try {
return ctx.network.sendto(ctx.network.historyKeeper, return ctx.network.sendto(ctx.network.historyKeeper,
JSON.stringify([txid, pending.data])); JSON.stringify([txid, pending.data]));
@ -187,7 +241,7 @@ types of messages:
} }
// construct an unsigned message // construct an unsigned message
var data = [null, edPublicKey, null, type, msg]; var data = [null, keys.publicKeyString, null, type, msg];
if (ctx.cookie && ctx.cookie.join) { if (ctx.cookie && ctx.cookie.join) {
data[2] = ctx.cookie.join('|'); data[2] = ctx.cookie.join('|');
} else { } else {
@ -197,103 +251,101 @@ types of messages:
return sendMsg(ctx, data, cb); return sendMsg(ctx, data, cb);
}; };
network.on('message', function (msg, sender) { ctx.destroy = function () {
if (sender !== network.historyKeeper) { return; } // clear all pending timeouts
onMsg(ctx, msg); Object.keys(ctx.timeouts).forEach(function (to) {
clearTimeout(to);
}); });
network.on('disconnect', function () { // remove the ctx from the network's stack
ctx.connected = false; var idx = networkContext.authenticated.indexOf(ctx);
}); if (idx === -1) { return; }
networkContext.authenticated.splice(idx, 1);
};
network.on('reconnect', function () { networkContext.authenticated.push(ctx);
send('COOKIE', "", function (e) { return ctx;
if (e) { return void cb(e); } };
ctx.connected = true;
});
});
// network.onHistoryKeeperChange is defined in chainpad-netflux.js var getAuthenticatedContext = function (networkContext, keys) {
// The function we pass will be called when the drive reconnects and if (!networkContext) { throw new Error('expected network context'); }
// chainpad-netflux detects a new history keeper id
if (network.onHistoryKeeperChange) {
network.onHistoryKeeperChange(function () {
send('COOKIE', "", function (e) {
if (e) { return void cb(e); }
ctx.connected = true;
});
});
}
send('COOKIE', "", function (e) { var publicKey = keys.publicKeyString;
if (e) { return void cb(e); }
// callback to provide 'send' method to whatever needs it var i;
cb(void 0, { send: send, }); networkContext.authenticated.some(function (ctx, j) {
if (ctx.publicKey !== publicKey) { return false; }
i = j;
return true;
}); });
};
var onAnonMsg = function (ctx, msg) { if (networkContext.authenticated[i]) { return networkContext.authenticated[i]; }
var parsed = parse(msg);
if (!parsed) { return initAuthenticatedRpc(networkContext, keys);
return void console.error(new Error('could not parse message: %s', msg)); };
}
// RPC messages are always arrays.
if (!Array.isArray(parsed)) { return; }
var txid = parsed[0];
// txid must be a string, or this message is not meant for us var create = function (network, edPrivateKey, edPublicKey, cb) {
if (typeof(txid) !== 'string') { return; } if (typeof(cb) !== 'function') { throw new Error("expected callback"); }
var pending = ctx.pending[txid]; var signKey;
if (!(parsed && parsed.slice)) { try {
// RPC responses are arrays. this message isn't meant for us. signKey = Nacl.util.decodeBase64(edPrivateKey);
return; if (signKey.length !== 64) {
throw new Error('private key did not match expected length of 64');
} }
if (/FULL_HISTORY/.test(parsed[0])) { return; } } catch (err) {
var response = parsed.slice(2); return void cb(err);
if (typeof(pending) === 'function') {
if (parsed[1] === 'ERROR') {
pending(parsed[2]);
delete ctx.pending[txid];
return;
} }
pending(void 0, response);
// if successful, delete the callback... try {
delete ctx.pending[txid]; if (Nacl.util.decodeBase64(edPublicKey).length !== 32) {
return; return void cb('expected public key to be 32 uint');
}
// HACK: filter out ugly messages we don't care about
if (typeof(msg) !== 'string') {
console.error("received message [%s] for txid[%s] with no callback", msg, txid);
} }
} catch (err) { return void cb(err); }
if (!network) { return void cb('NO_NETWORK'); }
// get or create a context for the provided network
var net_ctx = getNetworkContext(network);
var rpc_ctx = getAuthenticatedContext(net_ctx, {
publicKeyString: edPublicKey,
signKey: signKey,
});
rpc_ctx.send('COOKIE', "", function (e) {
if (e) { return void cb(e); }
// callback to provide 'send' method to whatever needs it
cb(void 0, {
send: rpc_ctx.send,
destroy: rpc_ctx.destroy,
});
});
}; };
var createAnonymous = function (network, cb) { var initAnonRpc = function (networkContext) {
var ctx = { var ctx = {
network: network, network: networkContext.network,
timeouts: {}, // timeouts timeouts: {},
pending: {}, // callbacks pending: {},
cookie: null,
connected: true, connected: true,
}; };
var send = ctx.send = function (type, msg, cb) { // any particular network will only ever need one anonymous rpc
networkContext.anon = ctx;
ctx.send = function (type, msg, cb) {
if (!ctx.connected) { if (!ctx.connected) {
return void setTimeout(function () { return void setTimeout(function () {
cb('DISCONNECTED'); cb('DISCONNECTED');
}); });
} }
// construct an unsigned message... // construct an unsigned message...
var data = [type, msg]; var data = [type, msg];
// [sig, edPublicKey, cookie, type, msg] // [type, msg]
return sendMsg(ctx, data, cb); return sendMsg(ctx, data, cb);
}; };
@ -314,21 +366,32 @@ types of messages:
} }
}; };
network.on('message', function (msg, sender) { ctx.destroy = function () {
if (sender !== network.historyKeeper) { return; } // clear all pending timeouts
onAnonMsg(ctx, msg); Object.keys(ctx.timeouts).forEach(function (to) {
clearTimeout(to);
}); });
network.on('disconnect', function () { networkContext.anon = undefined;
ctx.connected = false; };
});
network.on('reconnect', function () { return ctx;
ctx.connected = true; };
});
var getAnonContext = function (networkContext) {
return networkContext.anon || initAnonRpc(networkContext);
};
var createAnonymous = function (network, cb) {
if (typeof(cb) !== 'function') { throw new Error("expected callback"); }
if (!network) { return void cb('NO_NETWORK'); }
// get or create a context for the provided network
var ctx = getAnonContext(getNetworkContext(network));
cb(void 0, { cb(void 0, {
send: send send: ctx.send,
destroy: ctx.destroy,
}); });
}; };

Loading…
Cancel
Save