From f2db93834b3bf6e7081fd26c32ca5db9cc2090f3 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 20 Sep 2019 16:43:52 +0200 Subject: [PATCH] implement blob archival and WIP blob iteration methods --- rpc.js | 32 ++++++- storage/blob.js | 231 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 241 insertions(+), 22 deletions(-) diff --git a/rpc.js b/rpc.js index 62a8690a2..f7d2d2fcb 100644 --- a/rpc.js +++ b/rpc.js @@ -820,7 +820,21 @@ var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) { })); }).nThen(function (w) { // remove the blob - Env.blobStore.remove(blobId, w(function (err) { + + if (Env.retainData) { + return void Env.blobStore.archive.blob(blobId, w(function (err) { + Log.info('ARCHIVAL_OWNED_FILE_BY_OWNER_RPC', { + safeKey: safeKey, + blobId: blobId, + status: err? String(err): 'SUCCESS', + }); + if (err) { + w.abort(); + return void cb(err); + } + })); + } + Env.blobStore.remove.blob(blobId, w(function (err) { Log.info('DELETION_OWNED_FILE_BY_OWNER_RPC', { safeKey: safeKey, blobId: blobId, @@ -833,7 +847,20 @@ var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) { })); }).nThen(function () { // remove the proof - Env.blobStore.removeProof(safeKey, blobId, function (err) { + if (Env.retainData) { + return void Env.blobStore.archive.proof(safeKey, blobId, function (err) { + Log.info("ARCHIVAL_PROOF_REMOVAL_BY_OWNER_RPC", { + safeKey: safeKey, + blobId: blobId, + status: err? String(err): 'SUCCESS', + }); + if (err) { + return void cb("E_PROOF_REMOVAL"); + } + }); + } + + Env.blobStore.remove.proof(safeKey, blobId, function (err) { Log.info("DELETION_PROOF_REMOVAL_BY_OWNER_RPC", { safeKey: safeKey, blobId: blobId, @@ -1692,6 +1719,7 @@ RPC.create = function ( BlobStore.create({ blobPath: config.blobPath, blobStagingPath: config.blobStagingPath, + archivePath: config.archivePath, getSession: function (safeKey) { return getSession(Sessions, safeKey); }, diff --git a/storage/blob.js b/storage/blob.js index 317d7755f..fd39a7f67 100644 --- a/storage/blob.js +++ b/storage/blob.js @@ -5,6 +5,7 @@ var Path = require("path"); var BlobStore = module.exports; var nThen = require("nthen"); +var Semaphore = require("saferphore"); var Util = require("../lib/common-util"); var isValidSafeKey = function (safeKey) { @@ -17,6 +18,10 @@ var isValidId = function (id) { // helpers +var prependArchive = function (Env, path) { + return Path.join(Env.archivePath, path); +}; + // /blob//// var makeBlobPath = function (Env, blobId) { return Path.join(Env.blobPath, blobId.slice(0, 2), blobId); @@ -61,10 +66,9 @@ var isFile = function (filePath, cb) { }); }; -var makeFileStream = function (dir, full, cb) { - Fse.mkdirp(dir, function (e) { +var makeFileStream = function (full, cb) { + Fse.mkdirp(Path.dirname(full), function (e) { if (e || !full) { // !full for pleasing flow, it's already checked - //WARN('makeFileStream', e); return void cb(e ? e.message : 'INTERNAL_ERROR'); } @@ -113,7 +117,7 @@ var upload = function (Env, safeKey, content, cb) { var stagePath = makeStagePath(Env, safeKey); if (!session.blobstage) { - makeFileStream(Path.dirname(stagePath), stagePath, function (e, stream) { + makeFileStream(stagePath, function (e, stream) { if (!stream) { return void cb(e); } var blobstage = session.blobstage = stream; @@ -303,8 +307,130 @@ var isOwnedBy = function (Env, safeKey, blobId, cb) { isFile(proofPath, cb); }; -var listFiles = function (Env, handler, cb) { - cb("NOT_IMPLEMENTED"); + +// archiveBlob +var archiveBlob = function (Env, blobId, cb) { + var blobPath = makeBlobPath(Env, blobId); + var archivePath = prependArchive(Env, blobPath); + Fse.move(blobPath, archivePath, { overwrite: true }, cb); +}; + +var removeArchivedBlob = function (Env, blobId, cb) { + var archivePath = prependArchive(Env, makeBlobPath(Env, blobId)); + Fs.unlink(archivePath, cb); +}; + +// restoreBlob +var restoreBlob = function (Env, blobId, cb) { + var blobPath = makeBlobPath(Env, blobId); + var archivePath = prependArchive(Env, blobPath); + Fse.move(archivePath, blobPath, cb); +}; + +// archiveProof +var archiveProof = function (Env, safeKey, blobId, cb) { + var proofPath = makeProofPath(Env, safeKey, blobId); + var archivePath = prependArchive(Env, proofPath); + Fse.move(proofPath, archivePath, { overwrite: true }, cb); +}; + +var removeArchivedProof = function (Env, safeKey, blobId, cb) { + var archivedPath = prependArchive(Env, makeProofPath(Env, safeKey, blobId)); + Fs.unlink(archivedPath, cb); +}; + +// restoreProof +var restoreProof = function (Env, safeKey, blobId, cb) { + var proofPath = makeProofPath(Env, safeKey, blobId); + var archivePath = prependArchive(Env, proofPath); + Fse.move(archivePath, proofPath, cb); +}; + +var makeWalker = function (n, handleChild, cb) { + if (!n || typeof(n) !== 'number' || n < 2) { n = 2; } + + var W; + var nt = nThen(function (w) { + // this asynchronous bit defers the completion of this block until + // synchronous execution has completed. This means you must create + // the walker and start using it synchronously or else it will call back + // prematurely + setTimeout(w()); + W = w; + }).nThen(function () { + cb(); + }); + + // do no more than 20 jobs at a time + var tasks = Semaphore.create(n); + + var recurse = function (path) { + tasks.take(function (give) { + var done = give(W()); + Fs.readdir(path, function (err, dir) { + if (err) { + if (err.code === 'ENOTDIR') { + return void handleChild(path, done); + } + // XXX handle other error + return done(); + } + // everything is fine and it's a directory... + if (dir.length === 0) { return done(); } + dir.forEach(function (d) { + recurse(Path.join(path, d)); + }); + done(); + }); + }); + }; + + return recurse; +}; + +var listProofs = function (root, handler, cb) { + Fs.readdir(root, function (err, dir) { + if (err) { return void cb(err); } + + var walk = makeWalker(20, function (path, next) { + // path is the path to a child node on the filesystem + + // next handles the next job in a queue + + // iterate over proofs + // check for presence of corresponding files + + handler(path, next); + //console.log(path); + //next(); + }, function () { + // called when there are no more directories or children to process + cb(); + }); + + dir.forEach(function (d) { + // ignore directories that aren't 3 characters long... + if (d.length !== 3) { return; } + walk(Path.join(root, d)); + }); + }); +}; + +var listBlobs = function (root, handler, cb) { + // iterate over files + Fs.readdir(root, function (err, dir) { + if (err) { return void cb(err); } + var walk = makeWalker(20, function (path, next) { + handler(path, next); + }, function () { + cb(); + }); + + dir.forEach(function (d) { + if (d.length !== 2) { return; } + walk(Path.join(root, d)); + }); + }); }; BlobStore.create = function (config, _cb) { @@ -316,6 +442,7 @@ BlobStore.create = function (config, _cb) { var Env = { blobPath: config.blobPath || './blob', blobStagingPath: config.blobStagingPath || './blobstage', + archivePath: config.archivePath || './data/archive', getSession: config.getSession, }; @@ -327,8 +454,12 @@ BlobStore.create = function (config, _cb) { Fse.mkdirp(Env.blobStagingPath, w(function (e) { if (e) { CB(e); } })); + + Fse.mkdirp(Path.join(Env.archivePath, Env.blobPath), w(function (e) { + if (e) { CB(e); } + })); }).nThen(function () { - cb(void 0, { + var methods = { isFileId: isValidId, status: function (safeKey, _cb) { // TODO check if the final destination is a file @@ -357,17 +488,59 @@ BlobStore.create = function (config, _cb) { isOwnedBy(Env, safeKey, blobId, cb); }, - remove: function (blobId, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - if (!isValidId(blobId)) { return void cb("INVALID_ID"); } - remove(Env, blobId, cb); + remove: { + blob: function (blobId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidId(blobId)) { return void cb("INVALID_ID"); } + remove(Env, blobId, cb); + }, + proof: function (safeKey, blobId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); } + if (!isValidId(blobId)) { return void cb("INVALID_ID"); } + removeProof(Env, safeKey, blobId, cb); + }, + archived: { + blob: function (blobId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidId(blobId)) { return void cb("INVALID_ID"); } + removeArchivedBlob(Env, blobId, cb); + }, + proof: function (safeKey, blobId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); } + if (!isValidId(blobId)) { return void cb("INVALID_ID"); } + removeArchivedProof(Env, safeKey, blobId, cb); + }, + }, }, - removeProof: function (safeKey, blobId, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); } - if (!isValidId(blobId)) { return void cb("INVALID_ID"); } - removeProof(Env, safeKey, blobId, cb); + archive: { + blob: function (blobId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidId(blobId)) { return void cb("INVALID_ID"); } + archiveBlob(Env, blobId, cb); + }, + proof: function (safeKey, blobId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); } + if (!isValidId(blobId)) { return void cb("INVALID_ID"); } + archiveProof(Env, safeKey, blobId, cb); + }, + }, + + restore: { + blob: function (blobId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidId(blobId)) { return void cb("INVALID_ID"); } + restoreBlob(Env, blobId, cb); + }, + proof: function (safeKey, blobId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); } + if (!isValidId(blobId)) { return void cb("INVALID_ID"); } + restoreProof(Env, safeKey, blobId, cb); + }, }, complete: function (safeKey, id, _cb) { @@ -388,11 +561,29 @@ BlobStore.create = function (config, _cb) { getUploadSize(Env, id, cb); }, - list: function (handler, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - listFiles(Env, handler, cb); + list: { + blobs: function (handler, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + listBlobs(Env.blobPath, handler, cb); + }, + proofs: function (handler, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + listProofs(Env.blobPath, handler, cb); + }, + archived: { + proofs: function (handler, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + listProofs(prependArchive(Env, Env.blobPath), handler, cb); + }, + blobs: function (handler, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + listBlobs(prependArchive(Env, Env.blobPath), handler, cb); + }, + } }, - }); + }; + + cb(void 0, methods); }); };