From e1850088461490437b46d100d7d4baa035d632b8 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 18 Nov 2020 18:26:10 +0530 Subject: [PATCH] complete uploads in child processes also fix a nasty race condition for unowned file uploads --- lib/commands/upload.js | 22 ++++------------------ lib/storage/blob.js | 39 +++++++++++++++++++-------------------- lib/workers/db-worker.js | 33 +++++++++++++++++++++++++++++++++ lib/workers/index.js | 9 +++++++++ 4 files changed, 65 insertions(+), 38 deletions(-) diff --git a/lib/commands/upload.js b/lib/commands/upload.js index 7286caa93..6dc0aa911 100644 --- a/lib/commands/upload.js +++ b/lib/commands/upload.js @@ -75,21 +75,9 @@ Upload.upload = function (Env, safeKey, chunk, cb) { Env.blobStore.upload(safeKey, chunk, cb); }; -var reportStatus = function (Env, label, safeKey, err, id) { - var data = { - safeKey: safeKey, - err: err && err.message || err, - id: id, - }; - var method = err? 'error': 'info'; - Env.Log[method](label, data); -}; - Upload.complete = function (Env, safeKey, arg, cb) { - Env.blobStore.complete(safeKey, arg, function (err, id) { - reportStatus(Env, 'UPLOAD_COMPLETE', safeKey, err, id); - cb(err, id); - }); + Env.blobStore.closeBlobstage(safeKey); + Env.completeUpload(safeKey, arg, false, cb); }; Upload.cancel = function (Env, safeKey, arg, cb) { @@ -97,9 +85,7 @@ Upload.cancel = function (Env, safeKey, arg, cb) { }; Upload.complete_owned = function (Env, safeKey, arg, cb) { - Env.blobStore.completeOwned(safeKey, arg, function (err, id) { - reportStatus(Env, 'UPLOAD_COMPLETE_OWNED', safeKey, err, id); - cb(err, id); - }); + Env.blobStore.closeBlobstage(safeKey); + Env.completeUpload(safeKey, arg, true, cb); }; diff --git a/lib/storage/blob.js b/lib/storage/blob.js index dfbc802b4..044eeaeaa 100644 --- a/lib/storage/blob.js +++ b/lib/storage/blob.js @@ -139,6 +139,15 @@ var upload = function (Env, safeKey, content, cb) { } }; +var closeBlobstage = function (Env, safeKey) { + var session = Env.getSession(safeKey); + if (!(session && session.blobstage && typeof(session.blobstage.close) === 'function')) { + return; + } + session.blobstage.close(); + delete session.blobstage; +}; + // upload_cancel var upload_cancel = function (Env, safeKey, fileSize, cb) { var session = Env.getSession(safeKey); @@ -159,27 +168,22 @@ var upload_cancel = function (Env, safeKey, fileSize, cb) { // upload_complete var upload_complete = function (Env, safeKey, id, cb) { - var session = Env.getSession(safeKey); - - if (session.blobstage && session.blobstage.close) { - session.blobstage.close(); - delete session.blobstage; - } + closeBlobstage(Env, safeKey); var oldPath = makeStagePath(Env, safeKey); var newPath = makeBlobPath(Env, id); nThen(function (w) { // make sure the path to your final location exists - Fse.mkdirp(Path.dirname(newPath), function (e) { + Fse.mkdirp(Path.dirname(newPath), w(function (e) { if (e) { w.abort(); return void cb('RENAME_ERR'); } - }); + })); }).nThen(function (w) { // make sure there's not already something in that exact location - isFile(newPath, function (e, yes) { + isFile(newPath, w(function (e, yes) { if (e) { w.abort(); return void cb(e); @@ -188,8 +192,8 @@ var upload_complete = function (Env, safeKey, id, cb) { w.abort(); return void cb('RENAME_ERR'); } - cb(void 0, newPath, id); - }); + cb(void 0, id); + })); }).nThen(function () { // finally, move the old file to the new path // FIXME we could just move and handle the EEXISTS instead of the above block @@ -217,15 +221,7 @@ var tryId = function (path, cb) { // owned_upload_complete var owned_upload_complete = function (Env, safeKey, id, cb) { - var session = Env.getSession(safeKey); - - // the file has already been uploaded to the staging area - // close the pending writestream - if (session.blobstage && session.blobstage.close) { - session.blobstage.close(); - delete session.blobstage; - } - + closeBlobstage(Env, safeKey); if (!isValidId(id)) { return void cb('EINVAL_ID'); } @@ -582,6 +578,9 @@ BlobStore.create = function (config, _cb) { }, }, + closeBlobstage: function (safeKey) { + closeBlobstage(Env, safeKey); + }, complete: function (safeKey, id, _cb) { var cb = Util.once(Util.mkAsync(_cb)); if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); } diff --git a/lib/workers/db-worker.js b/lib/workers/db-worker.js index 0b4c4d03d..9d5abf386 100644 --- a/lib/workers/db-worker.js +++ b/lib/workers/db-worker.js @@ -457,6 +457,38 @@ const evictInactive = function (data, cb) { Eviction(Env, cb); }; +var reportStatus = function (Env, label, safeKey, err, id) { + var data = { + safeKey: safeKey, + err: err && err.message || err, + id: id, + }; + var method = err? 'error': 'info'; + Env.Log[method](label, data); +}; + +const completeUpload = function (data, cb) { + if (!data) { return void cb('INVALID_ARGS'); } + var owned = data.owned; + var safeKey = data.safeKey; + var arg = data.arg; + + var method; + var label; + if (owned) { + method = 'completeOwned'; + label = 'UPLOAD_COMPLETE_OWNED'; + } else { + method = 'complete'; + label = 'UPLOAD_COMPLETE'; + } + + Env.blobStore[method](safeKey, arg, function (err, id) { + reportStatus(Env, label, safeKey, err, id); + cb(err, id); + }); +}; + const COMMANDS = { COMPUTE_INDEX: computeIndex, COMPUTE_METADATA: computeMetadata, @@ -471,6 +503,7 @@ const COMMANDS = { RUN_TASKS: runTasks, WRITE_TASK: writeTask, EVICT_INACTIVE: evictInactive, + COMPLETE_UPLOAD: completeUpload, }; COMMANDS.INLINE = function (data, cb) { diff --git a/lib/workers/index.js b/lib/workers/index.js index 522339812..b5704c68b 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -424,6 +424,15 @@ Workers.initialize = function (Env, config, _cb) { }, cb); }; + Env.completeUpload = function (safeKey, arg, owned, cb) { + sendCommand({ + command: "COMPLETE_UPLOAD", + owned: owned, + safeKey: safeKey, + arg: arg, + }, cb); + }; + cb(void 0); }); };