diff --git a/rpc.js b/rpc.js index c382a275a..54faf0f2c 100644 --- a/rpc.js +++ b/rpc.js @@ -21,9 +21,15 @@ const Meta = require("./lib/metadata"); const WriteQueue = require("./lib/write-queue"); const BatchRead = require("./lib/batch-read"); +const Util = require("./lib/common-util"); +const escapeKeyCharacters = Util.escapeKeyCharacters; +const unescapeKeyCharacters = Util.unescapeKeyCharacters; +const mkEvent = Util.mkEvent; + var RPC = module.exports; var Store = require("./storage/file"); +var BlobStore = require("./storage/blob"); var DEFAULT_LIMIT = 50 * 1024 * 1024; var SESSION_EXPIRATION_TIME = 60 * 1000; @@ -45,42 +51,6 @@ var isValidId = function (chan) { [32, 48].indexOf(chan.length) > -1; }; -/* -var uint8ArrayToHex = function (a) { - // call slice so Uint8Arrays work as expected - return Array.prototype.slice.call(a).map(function (e) { - var n = Number(e & 0xff).toString(16); - if (n === 'NaN') { - throw new Error('invalid input resulted in NaN'); - } - - switch (n.length) { - case 0: return '00'; // just being careful, shouldn't happen - case 1: return '0' + n; - case 2: return n; - default: throw new Error('unexpected value'); - } - }).join(''); -}; -*/ - -var testFileId = function (id) { - if (id.length !== 48 || /[^a-f0-9]/.test(id)) { - return false; - } - return true; -}; - -/* -var createFileId = function () { - var id = uint8ArrayToHex(Nacl.randomBytes(24)); - if (!testFileId(id)) { - throw new Error('file ids must consist of 48 hex characters'); - } - return id; -}; -*/ - var makeToken = function () { return Number(Math.floor(Math.random() * Number.MAX_SAFE_INTEGER)) .toString(16); @@ -110,14 +80,6 @@ var parseCookie = function (cookie) { return c; }; -var escapeKeyCharacters = function (key) { - return key && key.replace && key.replace(/\//g, '-'); -}; - -var unescapeKeyCharacters = function (key) { - return key.replace(/\-/g, '/'); -}; - var getSession = function (Sessions, key) { var safeKey = escapeKeyCharacters(key); if (Sessions[safeKey]) { @@ -272,28 +234,6 @@ var getChannelList = function (Env, publicKey, cb) { }); }; -var makeFilePath = function (root, id) { // FIXME FILES - if (typeof(id) !== 'string' || id.length <= 2) { return null; } - return Path.join(root, id.slice(0, 2), id); -}; - -var getUploadSize = function (Env, channel, cb) { // FIXME FILES - var paths = Env.paths; - var path = makeFilePath(paths.blob, channel); - if (!path) { - return cb('INVALID_UPLOAD_ID'); - } - - Fs.stat(path, function (err, stats) { - if (err) { - // if a file was deleted, its size is 0 bytes - if (err.code === 'ENOENT') { return cb(void 0, 0); } - return void cb(err.code); - } - cb(void 0, stats.size); - }); -}; - var getFileSize = function (Env, channel, cb) { if (!isValidId(channel)) { return void cb('INVALID_CHAN'); } if (channel.length === 32) { @@ -311,7 +251,7 @@ var getFileSize = function (Env, channel, cb) { } // 'channel' refers to a file, so you need another API - getUploadSize(Env, channel, function (e, size) { + Env.blobStore.size(channel, function (e, size) { if (typeof(size) === 'undefined') { return void cb(e); } cb(void 0, size); }); @@ -481,15 +421,14 @@ var getTotalSize = function (Env, publicKey, cb) { return void getChannelList(Env, publicKey, function (channels) { if (!channels) { return done('INVALID_PIN_LIST'); } // unexpected - var count = channels.length; - if (!count) { return void done(void 0, 0); } - - channels.forEach(function (channel) { // FIXME this might as well be nThen - getFileSize(Env, channel, function (e, size) { - count--; - if (!e) { bytes += size; } - if (count === 0) { return done(void 0, bytes); } + nThen(function (w) { + channels.forEach(function (channel) { // TODO semaphore? + getFileSize(Env, channel, w(function (e, size) { + if (!e) { bytes += size; } + })); }); + }).nThen(function () { + done(void 0, bytes); }); }); }); @@ -838,48 +777,6 @@ var resetUserPins = function (Env, publicKey, channelList, cb) { }); }; -var makeFileStream = function (root, id, cb) { // FIXME FILES - var stub = id.slice(0, 2); - var full = makeFilePath(root, id); - if (!full) { - WARN('makeFileStream', 'invalid id ' + id); - return void cb('BAD_ID'); - } - Fse.mkdirp(Path.join(root, stub), function (e) { - if (e || !full) { // !full for pleasing flow, it's already checked - WARN('makeFileStream', e); - return void cb(e ? e.message : 'INTERNAL_ERROR'); - } - - try { - var stream = Fs.createWriteStream(full, { - flags: 'a', - encoding: 'binary', - highWaterMark: Math.pow(2, 16), - }); - stream.on('open', function () { - cb(void 0, stream); - }); - stream.on('error', function (e) { - WARN('stream error', e); - }); - } catch (err) { - cb('BAD_STREAM'); - } - }); -}; - -var isFile = function (filePath, cb) { // FIXME FILES - /*:: if (typeof(filePath) !== 'string') { throw new Error('should never happen'); } */ - Fs.stat(filePath, function (e, stats) { - if (e) { - if (e.code === 'ENOENT') { return void cb(void 0, false); } - return void cb(e.message); - } - return void cb(void 0, stats.isFile()); - }); -}; - var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) { if (typeof(channelId) !== 'string' || channelId.length !== 32) { return cb('INVALID_ARGUMENTS'); @@ -899,74 +796,49 @@ var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) { }); }; -var removeOwnedBlob = function (Env, blobId, unsafeKey, cb) { // FIXME FILES // FIXME METADATA - var safeKey = escapeKeyCharacters(unsafeKey); - var safeKeyPrefix = safeKey.slice(0,3); - var blobPrefix = blobId.slice(0,2); - - var blobPath = makeFilePath(Env.paths.blob, blobId); - var ownPath = Path.join(Env.paths.blob, safeKeyPrefix, safeKey, blobPrefix, blobId); - - nThen(function (w) { - // Check if the blob exists - isFile(blobPath, w(function (e, isFile) { - if (e) { - w.abort(); - return void cb(e); - } - if (!isFile) { - WARN('removeOwnedBlob', 'The provided blob ID is not a file!'); - w.abort(); - return void cb('EINVAL_BLOBID'); - } - })); - }).nThen(function (w) { - // Check if you're the owner - isFile(ownPath, w(function (e, isFile) { - if (e) { - w.abort(); - return void cb(e); - } - if (!isFile) { - WARN('removeOwnedBlob', 'Incorrect owner'); - w.abort(); - return void cb('INSUFFICIENT_PERMISSIONS'); - } - })); - }).nThen(function (w) { - // Delete the blob - /*:: if (typeof(blobPath) !== 'string') { throw new Error('should never happen'); } */ - Fs.unlink(blobPath, w(function (e) { // TODO move to cold storage - Log.info('DELETION_OWNED_FILE_BY_OWNER_RPC', { - safeKey: safeKey, - blobPath: blobPath, - status: e? String(e): 'SUCCESS', - }); - if (e) { - w.abort(); - return void cb(e.code); - } - })); - }).nThen(function () { - // Delete the proof of ownership - Fs.unlink(ownPath, function (e) { - Log.info('DELETION_OWNED_FILE_PROOF_BY_OWNER_RPC', { - safeKey: safeKey, - proofPath: ownPath, - status: e? String(e): 'SUCCESS', - }); - cb(e && e.code); - }); - }); -}; - var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) { if (typeof(channelId) !== 'string' || !isValidId(channelId)) { return cb('INVALID_ARGUMENTS'); } - if (testFileId(channelId)) { - return void removeOwnedBlob(Env, channelId, unsafeKey, cb); + if (Env.blobStore.isFileId(channelId)) { + var safeKey = escapeKeyCharacters(unsafeKey); + var blobId = channelId; + + return void nThen(function (w) { + // check if you have permissions + Env.blobStore.isOwnedBy(safeKey, blobId, w(function (err, owned) { + if (err || !owned) { + w.abort(); + return void cb("INSUFFICIENT_PERMISSIONS"); + } + })); + }).nThen(function (w) { + // remove the blob + Env.blobStore.remove(blobId, w(function (err) { + Log.info('DELETION_OWNED_FILE_BY_OWNER_RPC', { + safeKey: safeKey, + blobId: blobId, + status: err? String(err): 'SUCCESS', + }); + if (err) { + w.abort(); + return void cb(err); + } + })); + }).nThen(function () { + // remove the proof + Env.blobStore.removeProof(safeKey, blobId, function (err) { + Log.info("DELETION_PROOF_REMOVAL_BY_OWNER_RPC", { + safeKey: safeKey, + blobId: blobId, + status: err? String(err): 'SUCCESS', + }); + if (err) { + return void cb("E_PROOF_REMOVAL"); + } + }); + }); } getMetadata(Env, channelId, function (err, metadata) { @@ -1015,362 +887,6 @@ var removePins = function (Env, safeKey, cb) { }); }; -var upload = function (Env, publicKey, content, cb) { // FIXME FILES - var paths = Env.paths; - var dec; - try { dec = Buffer.from(content, 'base64'); } - catch (e) { return void cb('DECODE_BUFFER'); } - var len = dec.length; - - var session = getSession(Env.Sessions, publicKey); - - if (typeof(session.currentUploadSize) !== 'number' || - typeof(session.pendingUploadSize) !== 'number') { - // improperly initialized... maybe they didn't check before uploading? - // reject it, just in case - return cb('NOT_READY'); - } - - if (session.currentUploadSize > session.pendingUploadSize) { - return cb('E_OVER_LIMIT'); - } - - if (!session.blobstage) { - makeFileStream(paths.staging, publicKey, function (e, stream) { - if (!stream) { return void cb(e); } - - var blobstage = session.blobstage = stream; - blobstage.write(dec); - session.currentUploadSize += len; - cb(void 0, dec.length); - }); - } else { - session.blobstage.write(dec); - session.currentUploadSize += len; - cb(void 0, dec.length); - } -}; - -var upload_cancel = function (Env, publicKey, fileSize, cb) { // FIXME FILES - var paths = Env.paths; - - var session = getSession(Env.Sessions, publicKey); - session.pendingUploadSize = fileSize; - session.currentUploadSize = 0; - if (session.blobstage) { session.blobstage.close(); } - - var path = makeFilePath(paths.staging, publicKey); - if (!path) { - Log.error('UPLOAD_CANCEL_INVALID_PATH', { - staging: paths.staging, - key: publicKey, - path: path, - }); - return void cb('NO_FILE'); - } - - Fs.unlink(path, function (e) { - if (e) { return void cb('E_UNLINK'); } - cb(void 0); - }); -}; - -var upload_complete = function (Env, publicKey, id, cb) { // FIXME FILES - var paths = Env.paths; - var session = getSession(Env.Sessions, publicKey); - - if (session.blobstage && session.blobstage.close) { - session.blobstage.close(); - delete session.blobstage; - } - - if (!testFileId(id)) { - WARN('uploadComplete', "id is invalid"); - return void cb('EINVAL_ID'); - } - - var oldPath = makeFilePath(paths.staging, publicKey); - if (!oldPath) { - WARN('safeMkdir', "oldPath is null"); - return void cb('RENAME_ERR'); - } - - var tryLocation = function (cb) { - var prefix = id.slice(0, 2); - var newPath = makeFilePath(paths.blob, id); - if (typeof(newPath) !== 'string') { - WARN('safeMkdir', "newPath is null"); - return void cb('RENAME_ERR'); - } - - Fse.mkdirp(Path.join(paths.blob, prefix), function (e) { - if (e || !newPath) { - WARN('safeMkdir', e); - return void cb('RENAME_ERR'); - } - isFile(newPath, function (e, yes) { - if (e) { - WARN('isFile', e); - return void cb(e); - } - if (yes) { - WARN('isFile', 'FILE EXISTS!'); - return void cb('RENAME_ERR'); - } - - cb(void 0, newPath, id); - }); - }); - }; - - var handleMove = function (e, newPath, id) { - if (e || !oldPath || !newPath) { - return void cb(e || 'PATH_ERR'); - } - - Fse.move(oldPath, newPath, function (e) { - if (e) { - WARN('rename', e); - return void cb('RENAME_ERR'); - } - cb(void 0, id); - }); - }; - - tryLocation(handleMove); -}; - -/* FIXME FILES -var owned_upload_complete = function (Env, safeKey, cb) { - var session = getSession(Env.Sessions, safeKey); - - // the file has already been uploaded to the staging area - // close the pending writestream - if (session.blobstage && session.blobstage.close) { - session.blobstage.close(); - delete session.blobstage; - } - - var oldPath = makeFilePath(Env.paths.staging, safeKey); - if (typeof(oldPath) !== 'string') { - return void cb('EINVAL_CONFIG'); - } - - // construct relevant paths - var root = Env.paths.staging; - - //var safeKey = escapeKeyCharacters(safeKey); - var safeKeyPrefix = safeKey.slice(0, 2); - - var blobId = createFileId(); - var blobIdPrefix = blobId.slice(0, 2); - - var plannedPath = Path.join(root, safeKeyPrefix, safeKey, blobIdPrefix); - - var tries = 0; - - var chooseSafeId = function (cb) { - if (tries >= 3) { - // you've already failed three times in a row - // give up and return an error - cb('E_REPEATED_FAILURE'); - } - - var path = Path.join(plannedPath, blobId); - Fs.access(path, Fs.constants.R_OK | Fs.constants.W_OK, function (e) { - if (!e) { - // generate a new id (with the same prefix) and recurse - blobId = blobIdPrefix + createFileId().slice(2); - return void chooseSafeId(cb); - } else if (e.code === 'ENOENT') { - // no entry, so it's safe for us to proceed - return void cb(void 0, path); - } else { - // it failed in an unexpected way. log it - // try again, but no more than a fixed number of times... - tries++; - chooseSafeId(cb); - } - }); - }; - - // the user wants to move it into their own space - // /blob/safeKeyPrefix/safeKey/blobPrefix/blobID - - var finalPath; - nThen(function (w) { - // make the requisite directory structure using Mkdirp - Mkdirp(plannedPath, w(function (e) { - if (e) { // does not throw error if the directory already existed - w.abort(); - return void cb(e); - } - })); - }).nThen(function (w) { - // produce an id which confirmably does not collide with another - chooseSafeId(w(function (e, path) { - if (e) { - w.abort(); - return void cb(e); - } - finalPath = path; // this is where you'll put the new file - })); - }).nThen(function (w) { - // move the existing file to its new path - - // flow is dumb and I need to guard against this which will never happen - // / *:: if (typeof(oldPath) === 'object') { throw new Error('should never happen'); } * / - Fs.move(oldPath, finalPath, w(function (e) { - if (e) { - w.abort(); - return void cb(e.code); - } - // otherwise it worked... - })); - }).nThen(function () { - // clean up their session when you're done - // call back with the blob id... - cb(void 0, blobId); - }); -}; -*/ - -var owned_upload_complete = function (Env, safeKey, id, cb) { // FIXME FILES - var session = getSession(Env.Sessions, safeKey); - - // the file has already been uploaded to the staging area - // close the pending writestream - if (session.blobstage && session.blobstage.close) { - session.blobstage.close(); - delete session.blobstage; - } - - if (!testFileId(id)) { - WARN('ownedUploadComplete', "id is invalid"); - return void cb('EINVAL_ID'); - } - - var oldPath = makeFilePath(Env.paths.staging, safeKey); - if (typeof(oldPath) !== 'string') { - return void cb('EINVAL_CONFIG'); - } - - // construct relevant paths - var root = Env.paths.blob; - - //var safeKey = escapeKeyCharacters(safeKey); - var safeKeyPrefix = safeKey.slice(0, 3); - - //var blobId = createFileId(); - var blobIdPrefix = id.slice(0, 2); - - var ownPath = Path.join(root, safeKeyPrefix, safeKey, blobIdPrefix); - var filePath = Path.join(root, blobIdPrefix); - - var tryId = function (path, cb) { - Fs.access(path, Fs.constants.R_OK | Fs.constants.W_OK, function (e) { - if (!e) { - // generate a new id (with the same prefix) and recurse - WARN('ownedUploadComplete', 'id is already used '+ id); - return void cb('EEXISTS'); - } else if (e.code === 'ENOENT') { - // no entry, so it's safe for us to proceed - return void cb(); - } else { - // it failed in an unexpected way. log it - WARN('ownedUploadComplete', e); - return void cb(e.code); - } - }); - }; - - // the user wants to move it into blob and create a empty file with the same id - // in their own space: - // /blob/safeKeyPrefix/safeKey/blobPrefix/blobID - - var finalPath; - var finalOwnPath; - nThen(function (w) { - // make the requisite directory structure using Mkdirp - Fse.mkdirp(filePath, w(function (e /*, path */) { - if (e) { // does not throw error if the directory already existed - w.abort(); - return void cb(e.code); - } - })); - Fse.mkdirp(ownPath, w(function (e /*, path */) { - if (e) { // does not throw error if the directory already existed - w.abort(); - return void cb(e.code); - } - })); - }).nThen(function (w) { - // make sure the id does not collide with another - finalPath = Path.join(filePath, id); - finalOwnPath = Path.join(ownPath, id); - tryId(finalPath, w(function (e) { - if (e) { - w.abort(); - return void cb(e); - } - })); - }).nThen(function (w) { - // Create the empty file proving ownership - Fs.writeFile(finalOwnPath, '', w(function (e) { - if (e) { - w.abort(); - return void cb(e.code); - } - // otherwise it worked... - })); - }).nThen(function (w) { - // move the existing file to its new path - - // flow is dumb and I need to guard against this which will never happen - /*:: if (typeof(oldPath) === 'object') { throw new Error('should never happen'); } */ - Fse.move(oldPath, finalPath, w(function (e) { - if (e) { - // Remove the ownership file - Fs.unlink(finalOwnPath, function (e) { - WARN('E_UNLINK_OWN_FILE', e); - }); - w.abort(); - return void cb(e.code); - } - // otherwise it worked... - })); - }).nThen(function () { - // clean up their session when you're done - // call back with the blob id... - cb(void 0, id); - }); -}; - -var upload_status = function (Env, publicKey, filesize, cb) { // FIXME FILES - var paths = Env.paths; - - // validate that the provided size is actually a positive number - if (typeof(filesize) !== 'number' && - filesize >= 0) { return void cb('E_INVALID_SIZE'); } - - if (filesize >= Env.maxUploadSize) { return cb('TOO_LARGE'); } - // validate that the provided path is not junk - var filePath = makeFilePath(paths.staging, publicKey); - if (!filePath) { return void cb('E_INVALID_PATH'); } - - getFreeSpace(Env, publicKey, function (e, free) { - if (e || !filePath) { return void cb(e); } // !filePath for pleasing flow - if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); } - isFile(filePath, function (e, yes) { - if (e) { - WARN('upload', e); - return cb('UNNOWN_ERROR'); - } - cb(e, yes); - }); - }); -}; - /* We assume that the server is secured against MitM attacks via HTTPS, and that malicious actors do not have code execution @@ -1745,25 +1261,34 @@ var isAuthenticatedCall = function (call) { ].indexOf(call) !== -1; }; -const mkEvent = function (once) { - var handlers = []; - var fired = false; - return { - reg: function (cb) { - if (once && fired) { return void setTimeout(cb); } - handlers.push(cb); - }, - unreg: function (cb) { - if (handlers.indexOf(cb) === -1) { throw new Error("Not registered"); } - handlers.splice(handlers.indexOf(cb), 1); - }, - fire: function () { - if (once && fired) { return; } - fired = true; - var args = Array.prototype.slice.call(arguments); - handlers.forEach(function (h) { h.apply(null, args); }); - } - }; +// upload_status +var upload_status = function (Env, safeKey, filesize, _cb) { // FIXME FILES + var cb = Util.once(Util.mkAsync(_cb)); + + // validate that the provided size is actually a positive number + if (typeof(filesize) !== 'number' && + filesize >= 0) { return void cb('E_INVALID_SIZE'); } + + if (filesize >= Env.maxUploadSize) { return cb('TOO_LARGE'); } + + nThen(function (w) { + var abortAndCB = Util.both(w.abort, cb); + Env.blobStore.status(safeKey, w(function (err, inProgress) { + // if there's an error something is weird + if (err) { return void abortAndCB(err); } + + // we cannot upload two things at once + if (inProgress) { return void abortAndCB(void 0, true); } + })); + }).nThen(function () { + // if yuo're here then there are no pending uploads + // check if you have space in your quota to upload something of this size + getFreeSpace(Env, safeKey, function (e, free) { + if (e) { return void cb(e); } + if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); } + cb(void 0, false); + }); + }); }; /*:: @@ -1823,8 +1348,6 @@ RPC.create = function ( var Sessions = Env.Sessions; var paths = Env.paths; var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins'); - var blobPath = paths.blob = keyOrDefaultString('blobPath', './blob'); - var blobStagingPath = paths.staging = keyOrDefaultString('blobStagingPath', './blobstage'); paths.block = keyOrDefaultString('blockPath', './block'); paths.data = keyOrDefaultString('filePath', './datastore'); @@ -2057,13 +1580,13 @@ RPC.create = function ( Respond(void 0, "OK"); }); case 'UPLOAD': - return void upload(Env, safeKey, msg[1], function (e, len) { + return void Env.blobStore.upload(safeKey, msg[1], function (e, len) { WARN(e, len); Respond(e, len); }); case 'UPLOAD_STATUS': var filesize = msg[1]; - return void upload_status(Env, safeKey, msg[1], function (e, yes) { + return void upload_status(Env, safeKey, filesize, function (e, yes) { if (!e && !yes) { // no pending uploads, set the new size var user = getSession(Sessions, safeKey); @@ -2073,12 +1596,12 @@ RPC.create = function ( Respond(e, yes); }); case 'UPLOAD_COMPLETE': - return void upload_complete(Env, safeKey, msg[1], function (e, hash) { + return void Env.blobStore.complete(safeKey, msg[1], function (e, hash) { WARN(e, hash); Respond(e, hash); }); case 'OWNED_UPLOAD_COMPLETE': - return void owned_upload_complete(Env, safeKey, msg[1], function (e, blobId) { + return void Env.blobStore.completeOwned(safeKey, msg[1], function (e, blobId) { WARN(e, blobId); Respond(e, blobId); }); @@ -2086,7 +1609,7 @@ RPC.create = function ( // msg[1] is fileSize // if we pass it here, we can start an upload right away without calling // UPLOAD_STATUS again - return void upload_cancel(Env, safeKey, msg[1], function (e) { + return void Env.blobStore.cancel(safeKey, msg[1], function (e) { WARN(e, 'UPLOAD_CANCEL'); Respond(e); }); @@ -2155,21 +1678,27 @@ RPC.create = function ( loadChannelPins(Env); - Store.create({ - filePath: pinPath, - }, function (s) { - Env.pinStore = s; - - Fse.mkdirp(blobPath, function (e) { - if (e) { throw e; } - Fse.mkdirp(blobStagingPath, function (e) { - if (e) { throw e; } - cb(void 0, rpc); - // expire old sessions once per minute - setInterval(function () { - expireSessions(Sessions); - }, SESSION_EXPIRATION_TIME); - }); - }); + nThen(function (w) { + Store.create({ + filePath: pinPath, + }, w(function (s) { + Env.pinStore = s; + })); + BlobStore.create({ + blobPath: config.blobPath, + blobStagingPath: config.blobStagingPath, + getSession: function (safeKey) { + return getSession(Sessions, safeKey); + }, + }, w(function (err, blob) { + if (err) { throw new Error(err); } + Env.blobStore = blob; + })); + }).nThen(function () { + cb(void 0, rpc); + // expire old sessions once per minute + setInterval(function () { + expireSessions(Sessions); + }, SESSION_EXPIRATION_TIME); }); };