implement blob archival and WIP blob iteration methods

pull/1/head
ansuz 5 years ago
parent d1e1a11e38
commit f2db93834b

@ -820,7 +820,21 @@ var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) {
})); }));
}).nThen(function (w) { }).nThen(function (w) {
// remove the blob // remove the blob
Env.blobStore.remove(blobId, w(function (err) {
if (Env.retainData) {
return void Env.blobStore.archive.blob(blobId, w(function (err) {
Log.info('ARCHIVAL_OWNED_FILE_BY_OWNER_RPC', {
safeKey: safeKey,
blobId: blobId,
status: err? String(err): 'SUCCESS',
});
if (err) {
w.abort();
return void cb(err);
}
}));
}
Env.blobStore.remove.blob(blobId, w(function (err) {
Log.info('DELETION_OWNED_FILE_BY_OWNER_RPC', { Log.info('DELETION_OWNED_FILE_BY_OWNER_RPC', {
safeKey: safeKey, safeKey: safeKey,
blobId: blobId, blobId: blobId,
@ -833,7 +847,20 @@ var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) {
})); }));
}).nThen(function () { }).nThen(function () {
// remove the proof // remove the proof
Env.blobStore.removeProof(safeKey, blobId, function (err) { if (Env.retainData) {
return void Env.blobStore.archive.proof(safeKey, blobId, function (err) {
Log.info("ARCHIVAL_PROOF_REMOVAL_BY_OWNER_RPC", {
safeKey: safeKey,
blobId: blobId,
status: err? String(err): 'SUCCESS',
});
if (err) {
return void cb("E_PROOF_REMOVAL");
}
});
}
Env.blobStore.remove.proof(safeKey, blobId, function (err) {
Log.info("DELETION_PROOF_REMOVAL_BY_OWNER_RPC", { Log.info("DELETION_PROOF_REMOVAL_BY_OWNER_RPC", {
safeKey: safeKey, safeKey: safeKey,
blobId: blobId, blobId: blobId,
@ -1692,6 +1719,7 @@ RPC.create = function (
BlobStore.create({ BlobStore.create({
blobPath: config.blobPath, blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath, blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
getSession: function (safeKey) { getSession: function (safeKey) {
return getSession(Sessions, safeKey); return getSession(Sessions, safeKey);
}, },

@ -5,6 +5,7 @@ var Path = require("path");
var BlobStore = module.exports; var BlobStore = module.exports;
var nThen = require("nthen"); var nThen = require("nthen");
var Semaphore = require("saferphore");
var Util = require("../lib/common-util"); var Util = require("../lib/common-util");
var isValidSafeKey = function (safeKey) { var isValidSafeKey = function (safeKey) {
@ -17,6 +18,10 @@ var isValidId = function (id) {
// helpers // helpers
var prependArchive = function (Env, path) {
return Path.join(Env.archivePath, path);
};
// /blob/<safeKeyPrefix>/<safeKey>/<blobPrefix>/<blobId> // /blob/<safeKeyPrefix>/<safeKey>/<blobPrefix>/<blobId>
var makeBlobPath = function (Env, blobId) { var makeBlobPath = function (Env, blobId) {
return Path.join(Env.blobPath, blobId.slice(0, 2), blobId); return Path.join(Env.blobPath, blobId.slice(0, 2), blobId);
@ -61,10 +66,9 @@ var isFile = function (filePath, cb) {
}); });
}; };
var makeFileStream = function (dir, full, cb) { var makeFileStream = function (full, cb) {
Fse.mkdirp(dir, function (e) { Fse.mkdirp(Path.dirname(full), function (e) {
if (e || !full) { // !full for pleasing flow, it's already checked if (e || !full) { // !full for pleasing flow, it's already checked
//WARN('makeFileStream', e);
return void cb(e ? e.message : 'INTERNAL_ERROR'); return void cb(e ? e.message : 'INTERNAL_ERROR');
} }
@ -113,7 +117,7 @@ var upload = function (Env, safeKey, content, cb) {
var stagePath = makeStagePath(Env, safeKey); var stagePath = makeStagePath(Env, safeKey);
if (!session.blobstage) { if (!session.blobstage) {
makeFileStream(Path.dirname(stagePath), stagePath, function (e, stream) { makeFileStream(stagePath, function (e, stream) {
if (!stream) { return void cb(e); } if (!stream) { return void cb(e); }
var blobstage = session.blobstage = stream; var blobstage = session.blobstage = stream;
@ -303,8 +307,130 @@ var isOwnedBy = function (Env, safeKey, blobId, cb) {
isFile(proofPath, cb); isFile(proofPath, cb);
}; };
var listFiles = function (Env, handler, cb) {
cb("NOT_IMPLEMENTED"); // archiveBlob
var archiveBlob = function (Env, blobId, cb) {
var blobPath = makeBlobPath(Env, blobId);
var archivePath = prependArchive(Env, blobPath);
Fse.move(blobPath, archivePath, { overwrite: true }, cb);
};
var removeArchivedBlob = function (Env, blobId, cb) {
var archivePath = prependArchive(Env, makeBlobPath(Env, blobId));
Fs.unlink(archivePath, cb);
};
// restoreBlob
var restoreBlob = function (Env, blobId, cb) {
var blobPath = makeBlobPath(Env, blobId);
var archivePath = prependArchive(Env, blobPath);
Fse.move(archivePath, blobPath, cb);
};
// archiveProof
var archiveProof = function (Env, safeKey, blobId, cb) {
var proofPath = makeProofPath(Env, safeKey, blobId);
var archivePath = prependArchive(Env, proofPath);
Fse.move(proofPath, archivePath, { overwrite: true }, cb);
};
var removeArchivedProof = function (Env, safeKey, blobId, cb) {
var archivedPath = prependArchive(Env, makeProofPath(Env, safeKey, blobId));
Fs.unlink(archivedPath, cb);
};
// restoreProof
var restoreProof = function (Env, safeKey, blobId, cb) {
var proofPath = makeProofPath(Env, safeKey, blobId);
var archivePath = prependArchive(Env, proofPath);
Fse.move(archivePath, proofPath, cb);
};
var makeWalker = function (n, handleChild, cb) {
if (!n || typeof(n) !== 'number' || n < 2) { n = 2; }
var W;
var nt = nThen(function (w) {
// this asynchronous bit defers the completion of this block until
// synchronous execution has completed. This means you must create
// the walker and start using it synchronously or else it will call back
// prematurely
setTimeout(w());
W = w;
}).nThen(function () {
cb();
});
// do no more than 20 jobs at a time
var tasks = Semaphore.create(n);
var recurse = function (path) {
tasks.take(function (give) {
var done = give(W());
Fs.readdir(path, function (err, dir) {
if (err) {
if (err.code === 'ENOTDIR') {
return void handleChild(path, done);
}
// XXX handle other error
return done();
}
// everything is fine and it's a directory...
if (dir.length === 0) { return done(); }
dir.forEach(function (d) {
recurse(Path.join(path, d));
});
done();
});
});
};
return recurse;
};
var listProofs = function (root, handler, cb) {
Fs.readdir(root, function (err, dir) {
if (err) { return void cb(err); }
var walk = makeWalker(20, function (path, next) {
// path is the path to a child node on the filesystem
// next handles the next job in a queue
// iterate over proofs
// check for presence of corresponding files
handler(path, next);
//console.log(path);
//next();
}, function () {
// called when there are no more directories or children to process
cb();
});
dir.forEach(function (d) {
// ignore directories that aren't 3 characters long...
if (d.length !== 3) { return; }
walk(Path.join(root, d));
});
});
};
var listBlobs = function (root, handler, cb) {
// iterate over files
Fs.readdir(root, function (err, dir) {
if (err) { return void cb(err); }
var walk = makeWalker(20, function (path, next) {
handler(path, next);
}, function () {
cb();
});
dir.forEach(function (d) {
if (d.length !== 2) { return; }
walk(Path.join(root, d));
});
});
}; };
BlobStore.create = function (config, _cb) { BlobStore.create = function (config, _cb) {
@ -316,6 +442,7 @@ BlobStore.create = function (config, _cb) {
var Env = { var Env = {
blobPath: config.blobPath || './blob', blobPath: config.blobPath || './blob',
blobStagingPath: config.blobStagingPath || './blobstage', blobStagingPath: config.blobStagingPath || './blobstage',
archivePath: config.archivePath || './data/archive',
getSession: config.getSession, getSession: config.getSession,
}; };
@ -327,8 +454,12 @@ BlobStore.create = function (config, _cb) {
Fse.mkdirp(Env.blobStagingPath, w(function (e) { Fse.mkdirp(Env.blobStagingPath, w(function (e) {
if (e) { CB(e); } if (e) { CB(e); }
})); }));
Fse.mkdirp(Path.join(Env.archivePath, Env.blobPath), w(function (e) {
if (e) { CB(e); }
}));
}).nThen(function () { }).nThen(function () {
cb(void 0, { var methods = {
isFileId: isValidId, isFileId: isValidId,
status: function (safeKey, _cb) { status: function (safeKey, _cb) {
// TODO check if the final destination is a file // TODO check if the final destination is a file
@ -357,17 +488,59 @@ BlobStore.create = function (config, _cb) {
isOwnedBy(Env, safeKey, blobId, cb); isOwnedBy(Env, safeKey, blobId, cb);
}, },
remove: function (blobId, _cb) { remove: {
var cb = Util.once(Util.mkAsync(_cb)); blob: function (blobId, _cb) {
if (!isValidId(blobId)) { return void cb("INVALID_ID"); } var cb = Util.once(Util.mkAsync(_cb));
remove(Env, blobId, cb); if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
remove(Env, blobId, cb);
},
proof: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
removeProof(Env, safeKey, blobId, cb);
},
archived: {
blob: function (blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
removeArchivedBlob(Env, blobId, cb);
},
proof: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
removeArchivedProof(Env, safeKey, blobId, cb);
},
},
}, },
removeProof: function (safeKey, blobId, _cb) { archive: {
var cb = Util.once(Util.mkAsync(_cb)); blob: function (blobId, _cb) {
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); } var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); } if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
removeProof(Env, safeKey, blobId, cb); archiveBlob(Env, blobId, cb);
},
proof: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
archiveProof(Env, safeKey, blobId, cb);
},
},
restore: {
blob: function (blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
restoreBlob(Env, blobId, cb);
},
proof: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
restoreProof(Env, safeKey, blobId, cb);
},
}, },
complete: function (safeKey, id, _cb) { complete: function (safeKey, id, _cb) {
@ -388,11 +561,29 @@ BlobStore.create = function (config, _cb) {
getUploadSize(Env, id, cb); getUploadSize(Env, id, cb);
}, },
list: function (handler, _cb) { list: {
var cb = Util.once(Util.mkAsync(_cb)); blobs: function (handler, _cb) {
listFiles(Env, handler, cb); var cb = Util.once(Util.mkAsync(_cb));
listBlobs(Env.blobPath, handler, cb);
},
proofs: function (handler, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
listProofs(Env.blobPath, handler, cb);
},
archived: {
proofs: function (handler, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
listProofs(prependArchive(Env, Env.blobPath), handler, cb);
},
blobs: function (handler, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
listBlobs(prependArchive(Env, Env.blobPath), handler, cb);
},
}
}, },
}); };
cb(void 0, methods);
}); });
}; };

Loading…
Cancel
Save