From 014aacc76a4af0d7fa50657fda96e1ddc1d1ce42 Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Tue, 23 Jan 2018 16:31:59 +0100 Subject: [PATCH] Added a new RPC to get file offsets of messages by hash or of last 2 checkpoints, also improved checking of valid channel names and fixed a pull-stream bug and exposed async-store to the window --- rpc.js | 42 ++++++++++++++++++++++++++++----- storage/file.js | 23 +++++++++++++++--- www/common/outer/async-store.js | 2 +- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/rpc.js b/rpc.js index b48df570a..c27c15f92 100644 --- a/rpc.js +++ b/rpc.js @@ -29,7 +29,7 @@ var WARN = function (e, output) { }; var isValidId = function (chan) { - return chan && chan.length && /^[a-fA-F0-9]/.test(chan) && + return chan && chan.length && /^[a-zA-Z0-9=+-]*$/.test(chan) && [32, 48].indexOf(chan.length) > -1; }; @@ -1006,6 +1006,7 @@ var isUnauthenticatedCall = function (call) { 'GET_MULTIPLE_FILE_SIZE', 'IS_CHANNEL_PINNED', 'IS_NEW_CHANNEL', + 'GET_HISTORY_OFFSET' ].indexOf(call) !== -1; }; @@ -1049,8 +1050,21 @@ const mkEvent = function (once) { }; }; -/*::const ConfigType = require('./config.example.js');*/ -RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function)=>void*/) { +/*:: +const flow_Config = require('./config.example.js'); +type Config_t = typeof(flow_Config); +import type { ChainPadServer_Storage_t } from './storage/file.js' +type NetfluxWebsocketSrvContext_t = { + store: ChainPadServer_Storage_t, + getHistoryOffset: ( + ctx: NetfluxWebsocketSrvContext_t, + channelName: string, + lastKnownHash: ?string, + cb: (err: ?Error, offset: ?number)=>void + )=>void +}; +*/ +RPC.create = function (config /*:Config_t*/, cb /*:(?Error, ?Function)=>void*/) { // load pin-store... console.log('loading rpc module...'); @@ -1081,8 +1095,24 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) return msg && msg.length === 2 && isUnauthenticatedCall(msg[0]); }; - var handleUnauthenticatedMessage = function (msg, respond) { + var handleUnauthenticatedMessage = function (msg, respond, nfwssCtx) { switch (msg[0]) { + case 'GET_HISTORY_OFFSET': { + if (typeof(msg[1]) !== 'object' || typeof(msg[1].channelName) !== 'string') { + return respond('INVALID_ARG_FORMAT', msg); + } + const msgHash = typeof(msg[1].msgHash) === 'string' ? msg[1].msgHash : undefined; + nfwssCtx.getHistoryOffset(nfwssCtx, msg[1].channelName, msgHash, (e, ret) => { + if (e) { + if (e.code !== 'ENOENT') { + WARN(e.stack, msg); + } + return respond(e.message); + } + respond(e, [null, ret, null]); + }); + break; + } case 'GET_FILE_SIZE': return void getFileSize(Env, msg[1], function (e, size) { if (e) { @@ -1133,7 +1163,7 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) } if (isUnauthenticateMessage(msg)) { - return handleUnauthenticatedMessage(msg, respond); + return handleUnauthenticatedMessage(msg, respond, ctx); } var signature = msg.shift(); @@ -1334,7 +1364,7 @@ RPC.create = function (config /*:typeof(ConfigType)*/, cb /*:(?Error, ?Function) }; var rpc = function ( - ctx /*:{ store: Object }*/, + ctx /*:NetfluxWebsocketSrvContext_t*/, data /*:Array>*/, respond /*:(?string, ?Array)=>void*/) { diff --git a/storage/file.js b/storage/file.js index ce6c3ae13..62ed4fa39 100644 --- a/storage/file.js +++ b/storage/file.js @@ -7,6 +7,12 @@ var nThen = require("nthen"); const ToPull = require('stream-to-pull-stream'); const Pull = require('pull-stream'); +const isValidChannelId = function (id) { + return typeof(id) === 'string' && + [32, 48].indexOf(id.length) > -1 && + /^[a-zA-Z0-9=+-]*$/.test(id); +}; + var mkPath = function (env, channelId) { return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson'; }; @@ -161,7 +167,9 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => { mkBufferSplit(), mkOffsetCounter(), Pull.asyncMap((data, moreCb) => { msgHandler(data, moreCb, ()=>{ keepReading = false; moreCb(); }); }), - Pull.drain(()=>(keepReading), cb) + Pull.drain(() => (keepReading), (err) => { + cb((keepReading) ? err : undefined); + }) ); }; @@ -414,35 +422,44 @@ module.exports.create = function ( } cb({ readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } readMessagesBin(env, channelName, start, asyncMsgHandler, cb); }, message: function (channelName, content, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } message(env, channelName, content, cb); }, messageBin: (channelName, content, cb) => { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } messageBin(env, channelName, content, cb); }, getMessages: function (channelName, msgHandler, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } getMessages(env, channelName, msgHandler, cb); }, removeChannel: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } removeChannel(env, channelName, function (err) { cb(err); }); }, closeChannel: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } closeChannel(env, channelName, cb); }, flushUnusedChannels: function (cb) { flushUnusedChannels(env, cb); }, - getChannelSize: function (chanName, cb) { - channelBytes(env, chanName, cb); + getChannelSize: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + channelBytes(env, channelName, cb); }, getChannelMetadata: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } getChannelMetadata(env, channelName, cb); }, clearChannel: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } clearChannel(env, channelName, cb); }, }); diff --git a/www/common/outer/async-store.js b/www/common/outer/async-store.js index 68c217108..e00500472 100644 --- a/www/common/outer/async-store.js +++ b/www/common/outer/async-store.js @@ -24,7 +24,7 @@ define([ var storeHash; - var store = {}; + var store = window.CryptPad_AsyncStore = {}; var onSync = function (cb) {