replace inline blob management functionality with dedicated module

pull/1/head
ansuz 5 years ago
parent c7f70d7264
commit e5cea3c250

643
rpc.js

@ -21,9 +21,15 @@ const Meta = require("./lib/metadata");
const WriteQueue = require("./lib/write-queue");
const BatchRead = require("./lib/batch-read");
const Util = require("./lib/common-util");
const escapeKeyCharacters = Util.escapeKeyCharacters;
const unescapeKeyCharacters = Util.unescapeKeyCharacters;
const mkEvent = Util.mkEvent;
var RPC = module.exports;
var Store = require("./storage/file");
var BlobStore = require("./storage/blob");
var DEFAULT_LIMIT = 50 * 1024 * 1024;
var SESSION_EXPIRATION_TIME = 60 * 1000;
@ -45,42 +51,6 @@ var isValidId = function (chan) {
[32, 48].indexOf(chan.length) > -1;
};
/*
var uint8ArrayToHex = function (a) {
// call slice so Uint8Arrays work as expected
return Array.prototype.slice.call(a).map(function (e) {
var n = Number(e & 0xff).toString(16);
if (n === 'NaN') {
throw new Error('invalid input resulted in NaN');
}
switch (n.length) {
case 0: return '00'; // just being careful, shouldn't happen
case 1: return '0' + n;
case 2: return n;
default: throw new Error('unexpected value');
}
}).join('');
};
*/
var testFileId = function (id) {
if (id.length !== 48 || /[^a-f0-9]/.test(id)) {
return false;
}
return true;
};
/*
var createFileId = function () {
var id = uint8ArrayToHex(Nacl.randomBytes(24));
if (!testFileId(id)) {
throw new Error('file ids must consist of 48 hex characters');
}
return id;
};
*/
var makeToken = function () {
return Number(Math.floor(Math.random() * Number.MAX_SAFE_INTEGER))
.toString(16);
@ -110,14 +80,6 @@ var parseCookie = function (cookie) {
return c;
};
var escapeKeyCharacters = function (key) {
return key && key.replace && key.replace(/\//g, '-');
};
var unescapeKeyCharacters = function (key) {
return key.replace(/\-/g, '/');
};
var getSession = function (Sessions, key) {
var safeKey = escapeKeyCharacters(key);
if (Sessions[safeKey]) {
@ -272,28 +234,6 @@ var getChannelList = function (Env, publicKey, cb) {
});
};
var makeFilePath = function (root, id) { // FIXME FILES
if (typeof(id) !== 'string' || id.length <= 2) { return null; }
return Path.join(root, id.slice(0, 2), id);
};
var getUploadSize = function (Env, channel, cb) { // FIXME FILES
var paths = Env.paths;
var path = makeFilePath(paths.blob, channel);
if (!path) {
return cb('INVALID_UPLOAD_ID');
}
Fs.stat(path, function (err, stats) {
if (err) {
// if a file was deleted, its size is 0 bytes
if (err.code === 'ENOENT') { return cb(void 0, 0); }
return void cb(err.code);
}
cb(void 0, stats.size);
});
};
var getFileSize = function (Env, channel, cb) {
if (!isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length === 32) {
@ -311,7 +251,7 @@ var getFileSize = function (Env, channel, cb) {
}
// 'channel' refers to a file, so you need another API
getUploadSize(Env, channel, function (e, size) {
Env.blobStore.size(channel, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); }
cb(void 0, size);
});
@ -481,15 +421,14 @@ var getTotalSize = function (Env, publicKey, cb) {
return void getChannelList(Env, publicKey, function (channels) {
if (!channels) { return done('INVALID_PIN_LIST'); } // unexpected
var count = channels.length;
if (!count) { return void done(void 0, 0); }
channels.forEach(function (channel) { // FIXME this might as well be nThen
getFileSize(Env, channel, function (e, size) {
count--;
nThen(function (w) {
channels.forEach(function (channel) { // TODO semaphore?
getFileSize(Env, channel, w(function (e, size) {
if (!e) { bytes += size; }
if (count === 0) { return done(void 0, bytes); }
}));
});
}).nThen(function () {
done(void 0, bytes);
});
});
});
@ -838,48 +777,6 @@ var resetUserPins = function (Env, publicKey, channelList, cb) {
});
};
var makeFileStream = function (root, id, cb) { // FIXME FILES
var stub = id.slice(0, 2);
var full = makeFilePath(root, id);
if (!full) {
WARN('makeFileStream', 'invalid id ' + id);
return void cb('BAD_ID');
}
Fse.mkdirp(Path.join(root, stub), function (e) {
if (e || !full) { // !full for pleasing flow, it's already checked
WARN('makeFileStream', e);
return void cb(e ? e.message : 'INTERNAL_ERROR');
}
try {
var stream = Fs.createWriteStream(full, {
flags: 'a',
encoding: 'binary',
highWaterMark: Math.pow(2, 16),
});
stream.on('open', function () {
cb(void 0, stream);
});
stream.on('error', function (e) {
WARN('stream error', e);
});
} catch (err) {
cb('BAD_STREAM');
}
});
};
var isFile = function (filePath, cb) { // FIXME FILES
/*:: if (typeof(filePath) !== 'string') { throw new Error('should never happen'); } */
Fs.stat(filePath, function (e, stats) {
if (e) {
if (e.code === 'ENOENT') { return void cb(void 0, false); }
return void cb(e.message);
}
return void cb(void 0, stats.isFile());
});
};
var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) {
if (typeof(channelId) !== 'string' || channelId.length !== 32) {
return cb('INVALID_ARGUMENTS');
@ -899,74 +796,49 @@ var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) {
});
};
var removeOwnedBlob = function (Env, blobId, unsafeKey, cb) { // FIXME FILES // FIXME METADATA
var safeKey = escapeKeyCharacters(unsafeKey);
var safeKeyPrefix = safeKey.slice(0,3);
var blobPrefix = blobId.slice(0,2);
var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) {
if (typeof(channelId) !== 'string' || !isValidId(channelId)) {
return cb('INVALID_ARGUMENTS');
}
var blobPath = makeFilePath(Env.paths.blob, blobId);
var ownPath = Path.join(Env.paths.blob, safeKeyPrefix, safeKey, blobPrefix, blobId);
if (Env.blobStore.isFileId(channelId)) {
var safeKey = escapeKeyCharacters(unsafeKey);
var blobId = channelId;
nThen(function (w) {
// Check if the blob exists
isFile(blobPath, w(function (e, isFile) {
if (e) {
w.abort();
return void cb(e);
}
if (!isFile) {
WARN('removeOwnedBlob', 'The provided blob ID is not a file!');
w.abort();
return void cb('EINVAL_BLOBID');
}
}));
}).nThen(function (w) {
// Check if you're the owner
isFile(ownPath, w(function (e, isFile) {
if (e) {
w.abort();
return void cb(e);
}
if (!isFile) {
WARN('removeOwnedBlob', 'Incorrect owner');
return void nThen(function (w) {
// check if you have permissions
Env.blobStore.isOwnedBy(safeKey, blobId, w(function (err, owned) {
if (err || !owned) {
w.abort();
return void cb('INSUFFICIENT_PERMISSIONS');
return void cb("INSUFFICIENT_PERMISSIONS");
}
}));
}).nThen(function (w) {
// Delete the blob
/*:: if (typeof(blobPath) !== 'string') { throw new Error('should never happen'); } */
Fs.unlink(blobPath, w(function (e) { // TODO move to cold storage
// remove the blob
Env.blobStore.remove(blobId, w(function (err) {
Log.info('DELETION_OWNED_FILE_BY_OWNER_RPC', {
safeKey: safeKey,
blobPath: blobPath,
status: e? String(e): 'SUCCESS',
blobId: blobId,
status: err? String(err): 'SUCCESS',
});
if (e) {
if (err) {
w.abort();
return void cb(e.code);
return void cb(err);
}
}));
}).nThen(function () {
// Delete the proof of ownership
Fs.unlink(ownPath, function (e) {
Log.info('DELETION_OWNED_FILE_PROOF_BY_OWNER_RPC', {
// remove the proof
Env.blobStore.removeProof(safeKey, blobId, function (err) {
Log.info("DELETION_PROOF_REMOVAL_BY_OWNER_RPC", {
safeKey: safeKey,
proofPath: ownPath,
status: e? String(e): 'SUCCESS',
blobId: blobId,
status: err? String(err): 'SUCCESS',
});
cb(e && e.code);
if (err) {
return void cb("E_PROOF_REMOVAL");
}
});
});
};
var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) {
if (typeof(channelId) !== 'string' || !isValidId(channelId)) {
return cb('INVALID_ARGUMENTS');
}
if (testFileId(channelId)) {
return void removeOwnedBlob(Env, channelId, unsafeKey, cb);
}
getMetadata(Env, channelId, function (err, metadata) {
@ -1015,362 +887,6 @@ var removePins = function (Env, safeKey, cb) {
});
};
var upload = function (Env, publicKey, content, cb) { // FIXME FILES
var paths = Env.paths;
var dec;
try { dec = Buffer.from(content, 'base64'); }
catch (e) { return void cb('DECODE_BUFFER'); }
var len = dec.length;
var session = getSession(Env.Sessions, publicKey);
if (typeof(session.currentUploadSize) !== 'number' ||
typeof(session.pendingUploadSize) !== 'number') {
// improperly initialized... maybe they didn't check before uploading?
// reject it, just in case
return cb('NOT_READY');
}
if (session.currentUploadSize > session.pendingUploadSize) {
return cb('E_OVER_LIMIT');
}
if (!session.blobstage) {
makeFileStream(paths.staging, publicKey, function (e, stream) {
if (!stream) { return void cb(e); }
var blobstage = session.blobstage = stream;
blobstage.write(dec);
session.currentUploadSize += len;
cb(void 0, dec.length);
});
} else {
session.blobstage.write(dec);
session.currentUploadSize += len;
cb(void 0, dec.length);
}
};
var upload_cancel = function (Env, publicKey, fileSize, cb) { // FIXME FILES
var paths = Env.paths;
var session = getSession(Env.Sessions, publicKey);
session.pendingUploadSize = fileSize;
session.currentUploadSize = 0;
if (session.blobstage) { session.blobstage.close(); }
var path = makeFilePath(paths.staging, publicKey);
if (!path) {
Log.error('UPLOAD_CANCEL_INVALID_PATH', {
staging: paths.staging,
key: publicKey,
path: path,
});
return void cb('NO_FILE');
}
Fs.unlink(path, function (e) {
if (e) { return void cb('E_UNLINK'); }
cb(void 0);
});
};
var upload_complete = function (Env, publicKey, id, cb) { // FIXME FILES
var paths = Env.paths;
var session = getSession(Env.Sessions, publicKey);
if (session.blobstage && session.blobstage.close) {
session.blobstage.close();
delete session.blobstage;
}
if (!testFileId(id)) {
WARN('uploadComplete', "id is invalid");
return void cb('EINVAL_ID');
}
var oldPath = makeFilePath(paths.staging, publicKey);
if (!oldPath) {
WARN('safeMkdir', "oldPath is null");
return void cb('RENAME_ERR');
}
var tryLocation = function (cb) {
var prefix = id.slice(0, 2);
var newPath = makeFilePath(paths.blob, id);
if (typeof(newPath) !== 'string') {
WARN('safeMkdir', "newPath is null");
return void cb('RENAME_ERR');
}
Fse.mkdirp(Path.join(paths.blob, prefix), function (e) {
if (e || !newPath) {
WARN('safeMkdir', e);
return void cb('RENAME_ERR');
}
isFile(newPath, function (e, yes) {
if (e) {
WARN('isFile', e);
return void cb(e);
}
if (yes) {
WARN('isFile', 'FILE EXISTS!');
return void cb('RENAME_ERR');
}
cb(void 0, newPath, id);
});
});
};
var handleMove = function (e, newPath, id) {
if (e || !oldPath || !newPath) {
return void cb(e || 'PATH_ERR');
}
Fse.move(oldPath, newPath, function (e) {
if (e) {
WARN('rename', e);
return void cb('RENAME_ERR');
}
cb(void 0, id);
});
};
tryLocation(handleMove);
};
/* FIXME FILES
var owned_upload_complete = function (Env, safeKey, cb) {
var session = getSession(Env.Sessions, safeKey);
// the file has already been uploaded to the staging area
// close the pending writestream
if (session.blobstage && session.blobstage.close) {
session.blobstage.close();
delete session.blobstage;
}
var oldPath = makeFilePath(Env.paths.staging, safeKey);
if (typeof(oldPath) !== 'string') {
return void cb('EINVAL_CONFIG');
}
// construct relevant paths
var root = Env.paths.staging;
//var safeKey = escapeKeyCharacters(safeKey);
var safeKeyPrefix = safeKey.slice(0, 2);
var blobId = createFileId();
var blobIdPrefix = blobId.slice(0, 2);
var plannedPath = Path.join(root, safeKeyPrefix, safeKey, blobIdPrefix);
var tries = 0;
var chooseSafeId = function (cb) {
if (tries >= 3) {
// you've already failed three times in a row
// give up and return an error
cb('E_REPEATED_FAILURE');
}
var path = Path.join(plannedPath, blobId);
Fs.access(path, Fs.constants.R_OK | Fs.constants.W_OK, function (e) {
if (!e) {
// generate a new id (with the same prefix) and recurse
blobId = blobIdPrefix + createFileId().slice(2);
return void chooseSafeId(cb);
} else if (e.code === 'ENOENT') {
// no entry, so it's safe for us to proceed
return void cb(void 0, path);
} else {
// it failed in an unexpected way. log it
// try again, but no more than a fixed number of times...
tries++;
chooseSafeId(cb);
}
});
};
// the user wants to move it into their own space
// /blob/safeKeyPrefix/safeKey/blobPrefix/blobID
var finalPath;
nThen(function (w) {
// make the requisite directory structure using Mkdirp
Mkdirp(plannedPath, w(function (e) {
if (e) { // does not throw error if the directory already existed
w.abort();
return void cb(e);
}
}));
}).nThen(function (w) {
// produce an id which confirmably does not collide with another
chooseSafeId(w(function (e, path) {
if (e) {
w.abort();
return void cb(e);
}
finalPath = path; // this is where you'll put the new file
}));
}).nThen(function (w) {
// move the existing file to its new path
// flow is dumb and I need to guard against this which will never happen
// / *:: if (typeof(oldPath) === 'object') { throw new Error('should never happen'); } * /
Fs.move(oldPath, finalPath, w(function (e) {
if (e) {
w.abort();
return void cb(e.code);
}
// otherwise it worked...
}));
}).nThen(function () {
// clean up their session when you're done
// call back with the blob id...
cb(void 0, blobId);
});
};
*/
var owned_upload_complete = function (Env, safeKey, id, cb) { // FIXME FILES
var session = getSession(Env.Sessions, safeKey);
// the file has already been uploaded to the staging area
// close the pending writestream
if (session.blobstage && session.blobstage.close) {
session.blobstage.close();
delete session.blobstage;
}
if (!testFileId(id)) {
WARN('ownedUploadComplete', "id is invalid");
return void cb('EINVAL_ID');
}
var oldPath = makeFilePath(Env.paths.staging, safeKey);
if (typeof(oldPath) !== 'string') {
return void cb('EINVAL_CONFIG');
}
// construct relevant paths
var root = Env.paths.blob;
//var safeKey = escapeKeyCharacters(safeKey);
var safeKeyPrefix = safeKey.slice(0, 3);
//var blobId = createFileId();
var blobIdPrefix = id.slice(0, 2);
var ownPath = Path.join(root, safeKeyPrefix, safeKey, blobIdPrefix);
var filePath = Path.join(root, blobIdPrefix);
var tryId = function (path, cb) {
Fs.access(path, Fs.constants.R_OK | Fs.constants.W_OK, function (e) {
if (!e) {
// generate a new id (with the same prefix) and recurse
WARN('ownedUploadComplete', 'id is already used '+ id);
return void cb('EEXISTS');
} else if (e.code === 'ENOENT') {
// no entry, so it's safe for us to proceed
return void cb();
} else {
// it failed in an unexpected way. log it
WARN('ownedUploadComplete', e);
return void cb(e.code);
}
});
};
// the user wants to move it into blob and create a empty file with the same id
// in their own space:
// /blob/safeKeyPrefix/safeKey/blobPrefix/blobID
var finalPath;
var finalOwnPath;
nThen(function (w) {
// make the requisite directory structure using Mkdirp
Fse.mkdirp(filePath, w(function (e /*, path */) {
if (e) { // does not throw error if the directory already existed
w.abort();
return void cb(e.code);
}
}));
Fse.mkdirp(ownPath, w(function (e /*, path */) {
if (e) { // does not throw error if the directory already existed
w.abort();
return void cb(e.code);
}
}));
}).nThen(function (w) {
// make sure the id does not collide with another
finalPath = Path.join(filePath, id);
finalOwnPath = Path.join(ownPath, id);
tryId(finalPath, w(function (e) {
if (e) {
w.abort();
return void cb(e);
}
}));
}).nThen(function (w) {
// Create the empty file proving ownership
Fs.writeFile(finalOwnPath, '', w(function (e) {
if (e) {
w.abort();
return void cb(e.code);
}
// otherwise it worked...
}));
}).nThen(function (w) {
// move the existing file to its new path
// flow is dumb and I need to guard against this which will never happen
/*:: if (typeof(oldPath) === 'object') { throw new Error('should never happen'); } */
Fse.move(oldPath, finalPath, w(function (e) {
if (e) {
// Remove the ownership file
Fs.unlink(finalOwnPath, function (e) {
WARN('E_UNLINK_OWN_FILE', e);
});
w.abort();
return void cb(e.code);
}
// otherwise it worked...
}));
}).nThen(function () {
// clean up their session when you're done
// call back with the blob id...
cb(void 0, id);
});
};
var upload_status = function (Env, publicKey, filesize, cb) { // FIXME FILES
var paths = Env.paths;
// validate that the provided size is actually a positive number
if (typeof(filesize) !== 'number' &&
filesize >= 0) { return void cb('E_INVALID_SIZE'); }
if (filesize >= Env.maxUploadSize) { return cb('TOO_LARGE'); }
// validate that the provided path is not junk
var filePath = makeFilePath(paths.staging, publicKey);
if (!filePath) { return void cb('E_INVALID_PATH'); }
getFreeSpace(Env, publicKey, function (e, free) {
if (e || !filePath) { return void cb(e); } // !filePath for pleasing flow
if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); }
isFile(filePath, function (e, yes) {
if (e) {
WARN('upload', e);
return cb('UNNOWN_ERROR');
}
cb(e, yes);
});
});
};
/*
We assume that the server is secured against MitM attacks
via HTTPS, and that malicious actors do not have code execution
@ -1745,25 +1261,34 @@ var isAuthenticatedCall = function (call) {
].indexOf(call) !== -1;
};
const mkEvent = function (once) {
var handlers = [];
var fired = false;
return {
reg: function (cb) {
if (once && fired) { return void setTimeout(cb); }
handlers.push(cb);
},
unreg: function (cb) {
if (handlers.indexOf(cb) === -1) { throw new Error("Not registered"); }
handlers.splice(handlers.indexOf(cb), 1);
},
fire: function () {
if (once && fired) { return; }
fired = true;
var args = Array.prototype.slice.call(arguments);
handlers.forEach(function (h) { h.apply(null, args); });
}
};
// upload_status
var upload_status = function (Env, safeKey, filesize, _cb) { // FIXME FILES
var cb = Util.once(Util.mkAsync(_cb));
// validate that the provided size is actually a positive number
if (typeof(filesize) !== 'number' &&
filesize >= 0) { return void cb('E_INVALID_SIZE'); }
if (filesize >= Env.maxUploadSize) { return cb('TOO_LARGE'); }
nThen(function (w) {
var abortAndCB = Util.both(w.abort, cb);
Env.blobStore.status(safeKey, w(function (err, inProgress) {
// if there's an error something is weird
if (err) { return void abortAndCB(err); }
// we cannot upload two things at once
if (inProgress) { return void abortAndCB(void 0, true); }
}));
}).nThen(function () {
// if yuo're here then there are no pending uploads
// check if you have space in your quota to upload something of this size
getFreeSpace(Env, safeKey, function (e, free) {
if (e) { return void cb(e); }
if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); }
cb(void 0, false);
});
});
};
/*::
@ -1823,8 +1348,6 @@ RPC.create = function (
var Sessions = Env.Sessions;
var paths = Env.paths;
var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins');
var blobPath = paths.blob = keyOrDefaultString('blobPath', './blob');
var blobStagingPath = paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
paths.block = keyOrDefaultString('blockPath', './block');
paths.data = keyOrDefaultString('filePath', './datastore');
@ -2057,13 +1580,13 @@ RPC.create = function (
Respond(void 0, "OK");
});
case 'UPLOAD':
return void upload(Env, safeKey, msg[1], function (e, len) {
return void Env.blobStore.upload(safeKey, msg[1], function (e, len) {
WARN(e, len);
Respond(e, len);
});
case 'UPLOAD_STATUS':
var filesize = msg[1];
return void upload_status(Env, safeKey, msg[1], function (e, yes) {
return void upload_status(Env, safeKey, filesize, function (e, yes) {
if (!e && !yes) {
// no pending uploads, set the new size
var user = getSession(Sessions, safeKey);
@ -2073,12 +1596,12 @@ RPC.create = function (
Respond(e, yes);
});
case 'UPLOAD_COMPLETE':
return void upload_complete(Env, safeKey, msg[1], function (e, hash) {
return void Env.blobStore.complete(safeKey, msg[1], function (e, hash) {
WARN(e, hash);
Respond(e, hash);
});
case 'OWNED_UPLOAD_COMPLETE':
return void owned_upload_complete(Env, safeKey, msg[1], function (e, blobId) {
return void Env.blobStore.completeOwned(safeKey, msg[1], function (e, blobId) {
WARN(e, blobId);
Respond(e, blobId);
});
@ -2086,7 +1609,7 @@ RPC.create = function (
// msg[1] is fileSize
// if we pass it here, we can start an upload right away without calling
// UPLOAD_STATUS again
return void upload_cancel(Env, safeKey, msg[1], function (e) {
return void Env.blobStore.cancel(safeKey, msg[1], function (e) {
WARN(e, 'UPLOAD_CANCEL');
Respond(e);
});
@ -2155,21 +1678,27 @@ RPC.create = function (
loadChannelPins(Env);
nThen(function (w) {
Store.create({
filePath: pinPath,
}, function (s) {
}, w(function (s) {
Env.pinStore = s;
Fse.mkdirp(blobPath, function (e) {
if (e) { throw e; }
Fse.mkdirp(blobStagingPath, function (e) {
if (e) { throw e; }
}));
BlobStore.create({
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
getSession: function (safeKey) {
return getSession(Sessions, safeKey);
},
}, w(function (err, blob) {
if (err) { throw new Error(err); }
Env.blobStore = blob;
}));
}).nThen(function () {
cb(void 0, rpc);
// expire old sessions once per minute
setInterval(function () {
expireSessions(Sessions);
}, SESSION_EXPIRATION_TIME);
});
});
});
};

Loading…
Cancel
Save