From b2df2ba341ceffdd876679188c1e355fcb1c014a Mon Sep 17 00:00:00 2001 From: ansuz Date: Mon, 12 Mar 2018 17:50:47 +0100 Subject: [PATCH] prototype owned file upload --- package.json | 1 + rpc.js | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/package.json b/package.json index 3ab228851..e94c51a14 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "dependencies": { "chainpad-server": "^2.0.0", "express": "~4.16.0", + "mkdirp": "^0.5.1", "nthen": "~0.1.0", "pull-stream": "^3.6.1", "replify": "^1.2.0", diff --git a/rpc.js b/rpc.js index 465413fd1..3ad1a9a42 100644 --- a/rpc.js +++ b/rpc.js @@ -13,6 +13,7 @@ const Package = require('./package.json'); const Pinned = require('./pinned'); const Saferphore = require("saferphore"); const nThen = require("nthen"); +const Mkdirp = require("mkdirp"); var RPC = module.exports; @@ -980,6 +981,93 @@ var upload_complete = function (Env, publicKey, cb) { tryRandomLocation(handleMove); }; +var owned_upload_complete = function (Env, safeKey, cb) { + var session = getSession(Env.Sessions, safeKey); + + // the file has already been uploaded to the staging area + // close the pending writestream + if (session.blobstage && session.blobstage.close) { + session.blobstage.close(); + delete session.blobstage; + } + + var oldPath = makeFilePath(Env.paths.staging, safeKey); + + // construct relevant paths + var root = Env.paths.staging; + + //var safeKey = escapeKeyCharacters(safeKey); + var safeKeyPrefix = safeKey.slice(0, 2); + + var blobId = createFileId(); + var blobIdPrefix = blobId.slice(0, 2); + + var plannedPath = Path.join(root, safeKeyPrefix, safeKey, blobIdPrefix); + + var tries = 0; + + var chooseSafeId = function (cb) { + if (tries >= 3) { + // you've already failed three times in a row + // give up and return an error + cb('E_REPEATED_FAILURE'); + } + + var path = Path.join(plannedPath, blobId); + Fs.access(path, Fs.constants.R_OK | Fs.constants.W_OK, function (e) { + if (!e) { + // generate a new id (with the same prefix) and recurse + blobId = blobIdPrefix + createFileId().slice(2); + return void chooseSafeId(cb); + } else if (e.code === 'ENOENT') { + // no entry, so it's safe for us to proceed + return void cb(void 0, path); + } else { + // it failed in an unexpected way. log it + // try again, but no more than a fixed number of times... + tries++; + chooseSafeId(cb); + } + }); + }; + + // the user wants to move it into their own space + // /blob/safeKeyPrefix/safeKey/blobPrefix/blobID + + var finalPath; + nThen(function (w) { + // make the requisite directory structure using Mkdirp + Mkdirp(plannedPath, w(function (e /*, path */) { + if (e) { // does not throw error if the directory already existed + w.abort(); + return void cb(e); // XXX do we export Errors or strings? + } + })); + }).nThen(function (w) { + // produce an id which confirmably does not collide with another + chooseSafeId(w(function (e, path) { + if (e) { + w.abort(); + return void cb(e); + } + finalPath = path; // this is where you'll put the new file + })); + }).nThen(function (w) { + // move the existing file to its new path + Fs.rename(oldPath /* XXX */, finalPath, w(function (e) { + if (e) { + w.abort(); + return void cb(e.code); + } + // otherwise it worked... + })); + }).nThen(function () { + // clean up their session when you're done + // call back with the blob id... + cb(void 0, blobId); + }); +}; + var upload_status = function (Env, publicKey, filesize, cb) { var paths = Env.paths; @@ -1054,6 +1142,7 @@ var isAuthenticatedCall = function (call) { 'GET_LIMIT', 'UPLOAD_STATUS', 'UPLOAD_COMPLETE', + 'OWNED_UPLOAD_COMPLETE', 'UPLOAD_CANCEL', 'EXPIRE_SESSION', 'CLEAR_OWNED_CHANNEL', @@ -1128,6 +1217,7 @@ RPC.create = function ( var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins'); var blobPath = paths.blob = keyOrDefaultString('blobPath', './blob'); var blobStagingPath = paths.staging = keyOrDefaultString('blobStagingPath', './blobstage'); + console.log(blobStagingPath); var isUnauthenticateMessage = function (msg) { return msg && msg.length === 2 && isUnauthenticatedCall(msg[0]); @@ -1375,6 +1465,12 @@ RPC.create = function ( WARN(e, hash); Respond(e, hash); }); + case 'OWNED_UPLOAD_COMPLETE': + if (!privileged) { return deny(); } + return void owned_upload_complete(Env, safeKey, function (e, blobId) { + WARN(e, blobId); + Respond(e, blobId); + }); case 'UPLOAD_CANCEL': if (!privileged) { return deny(); } return void upload_cancel(Env, safeKey, function (e) {