Merge branch 'staging' into team

pull/1/head
yflory 5 years ago
commit e9785c7ef6

@ -1,5 +1,5 @@
/* jshint esversion: 6 */ /* jshint esversion: 6 */
/* global Buffer, process */ /* global Buffer */
;(function () { 'use strict'; ;(function () { 'use strict';
const nThen = require('nthen'); const nThen = require('nthen');
@ -7,9 +7,11 @@ const Nacl = require('tweetnacl');
const Crypto = require('crypto'); const Crypto = require('crypto');
const Once = require("./lib/once"); const Once = require("./lib/once");
const Meta = require("./lib/metadata"); const Meta = require("./lib/metadata");
const WriteQueue = require("./lib/write-queue");
let Log; let Log;
const now = function () { return (new Date()).getTime(); }; const now = function () { return (new Date()).getTime(); };
const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds
/* getHash /* getHash
* this function slices off the leading portion of a message which is * this function slices off the leading portion of a message which is
@ -80,6 +82,7 @@ module.exports.create = function (cfg) {
const rpc = cfg.rpc; const rpc = cfg.rpc;
const tasks = cfg.tasks; const tasks = cfg.tasks;
const store = cfg.store; const store = cfg.store;
const retainData = cfg.retainData;
Log = cfg.log; Log = cfg.log;
Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE'); Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
@ -302,88 +305,135 @@ module.exports.create = function (cfg) {
* the fix is to use callbacks and implement queueing for writes * the fix is to use callbacks and implement queueing for writes
* to guarantee that offset computation is always atomic with writes * to guarantee that offset computation is always atomic with writes
*/ */
const storageQueues = {}; const queueStorage = WriteQueue();
const storeQueuedMessage = function (ctx, queue, id) { const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) {
if (queue.length === 0) { const id = channel.id;
delete storageQueues[id]; const msgBin = new Buffer(msg + '\n', 'utf8');
return;
}
const first = queue.shift();
const msgBin = first.msg;
const optionalMessageHash = first.hash;
const isCp = first.isCp;
// Store the message first, and update the index only once it's stored.
// store.messageBin can be async so updating the index first may
// result in a wrong cpIndex
nThen((waitFor) => {
store.messageBin(id, msgBin, waitFor(function (err) {
if (err) {
waitFor.abort();
Log.error("HK_STORE_MESSAGE_ERROR", err.message);
// this error is critical, but there's not much we can do at the moment
// proceed with more messages, but they'll probably fail too
// at least you won't have a memory leak
// TODO make it possible to respond to clients with errors so they know
// their message wasn't stored
storeQueuedMessage(ctx, queue, id);
return;
}
}));
}).nThen((waitFor) => {
getIndex(ctx, id, waitFor((err, index) => {
if (err) {
Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
// non-critical, we'll be able to get the channel index later
// proceed to the next message in the queue queueStorage(id, function (next) {
storeQueuedMessage(ctx, queue, id); // Store the message first, and update the index only once it's stored.
return; // store.messageBin can be async so updating the index first may
} // result in a wrong cpIndex
if (typeof (index.line) === "number") { index.line++; } nThen((waitFor) => {
if (isCp) { store.messageBin(id, msgBin, waitFor(function (err) {
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0); if (err) {
for (let k in index.offsetByHash) { waitFor.abort();
if (index.offsetByHash[k] < index.cpIndex[0]) { Log.error("HK_STORE_MESSAGE_ERROR", err.message);
delete index.offsetByHash[k];
// this error is critical, but there's not much we can do at the moment
// proceed with more messages, but they'll probably fail too
// at least you won't have a memory leak
// TODO make it possible to respond to clients with errors so they know
// their message wasn't stored
return void next();
}
}));
}).nThen((waitFor) => {
getIndex(ctx, id, waitFor((err, index) => {
if (err) {
Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
// non-critical, we'll be able to get the channel index later
return void next();
}
if (typeof (index.line) === "number") { index.line++; }
if (isCp) {
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
for (let k in index.offsetByHash) {
if (index.offsetByHash[k] < index.cpIndex[0]) {
delete index.offsetByHash[k];
}
} }
index.cpIndex.push(({
offset: index.size,
line: ((index.line || 0) + 1)
} /*:cp_index_item*/));
} }
index.cpIndex.push(({ if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
offset: index.size, index.size += msgBin.length;
line: ((index.line || 0) + 1)
} /*:cp_index_item*/));
}
if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
index.size += msgBin.length;
// handle the next element in the queue // handle the next element in the queue
storeQueuedMessage(ctx, queue, id); next();
})); }));
});
}); });
}; };
const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) { /* historyKeeperBroadcast
const id = channel.id; * uses API from the netflux server to send messages to every member of a channel
* sendMsg runs in a try-catch and drops users if sending a message fails
*/
const historyKeeperBroadcast = function (ctx, channel, msg) {
let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
chan.forEach(function (user) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
});
};
const msgBin = new Buffer(msg + '\n', 'utf8'); /* expireChannel is here to clean up channels that should have been removed
if (Array.isArray(storageQueues[id])) { but for some reason are still present
return void storageQueues[id].push({ */
msg: msgBin, const expireChannel = function (ctx, channel) {
hash: optionalMessageHash, if (retainData) {
isCp: isCp, return void store.archiveChannel(channel, function (err) {
Log.info("ARCHIVAL_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", {
channelId: channel,
status: err? String(err): "SUCCESS",
});
}); });
} }
const queue = storageQueues[id] = (storageQueues[id] || [{ store.removeChannel(channel, function (err) {
msg: msgBin, Log.info("DELETION_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION", {
hash: optionalMessageHash, channelid: channel,
}]); status: err? String(err): "SUCCESS",
storeQueuedMessage(ctx, queue, id); });
});
};
/* checkExpired
* synchronously returns true or undefined to indicate whether the channel is expired
* according to its metadata
* has some side effects:
* closes the channel via the store.closeChannel API
* and then broadcasts to all channel members that the channel has expired
* removes the channel from the netflux-server's in-memory cache
* removes the channel metadata from history keeper's in-memory cache
FIXME the boolean nature of this API should be separated from its side effects
*/
const checkExpired = function (ctx, channel) {
if (!(channel && channel.length === STANDARD_CHANNEL_LENGTH)) { return false; }
let metadata = metadata_cache[channel];
if (!(metadata && typeof(metadata.expire) === 'number')) { return false; }
// the number of milliseconds ago the channel should have expired
let pastDue = (+new Date()) - metadata.expire;
// less than zero means that it hasn't expired yet
if (pastDue < 0) { return false; }
// if it should have expired more than a day ago...
// there may have been a problem with scheduling tasks
// or the scheduled tasks may not be running
// so trigger a removal from here
if (pastDue >= ONE_DAY) { expireChannel(ctx, channel); }
// close the channel
store.closeChannel(channel, function () {
historyKeeperBroadcast(ctx, channel, {
error: 'EEXPIRED',
channel: channel
});
// remove it from any caches after you've told anyone in the channel
// that it has expired
delete ctx.channels[channel];
delete metadata_cache[channel];
});
// return true to indicate that it has expired
return true;
}; };
var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/; var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
@ -436,12 +486,8 @@ module.exports.create = function (cfg) {
metadata = index.metadata; metadata = index.metadata;
if (metadata.expire && metadata.expire < +new Date()) { // don't write messages to expired channels
// don't store message sent to expired channels if (checkExpired(ctx, channel)) { return void w.abort(); }
w.abort();
return;
// TODO if a channel expired a long time ago but it's still here, remove it
}
// if there's no validateKey present skip to the next block // if there's no validateKey present skip to the next block
if (!metadata.validateKey) { return; } if (!metadata.validateKey) { return; }
@ -674,26 +720,6 @@ module.exports.create = function (cfg) {
}); });
}; };
/*::
type Chan_t = {
indexOf: (any)=>number,
id: string,
lastSavedCp: string,
forEach: ((any)=>void)=>void,
push: (any)=>void,
};
*/
/* historyKeeperBroadcast
* uses API from the netflux server to send messages to every member of a channel
* sendMsg runs in a try-catch and drops users if sending a message fails
*/
const historyKeeperBroadcast = function (ctx, channel, msg) {
let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
chan.forEach(function (user) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
});
};
/* onChannelCleared /* onChannelCleared
* broadcasts to all clients in a channel if that channel is deleted * broadcasts to all clients in a channel if that channel is deleted
@ -729,33 +755,6 @@ module.exports.create = function (cfg) {
} }
}; };
/* checkExpired
* synchronously returns true or undefined to indicate whether the channel is expired
* according to its metadata
* has some side effects:
* closes the channel via the store.closeChannel API
* and then broadcasts to all channel members that the channel has expired
* removes the channel from the netflux-server's in-memory cache
* removes the channel metadata from history keeper's in-memory cache
FIXME the boolean nature of this API should be separated from its side effects
*/
const checkExpired = function (ctx, channel) {
if (channel && channel.length === STANDARD_CHANNEL_LENGTH && metadata_cache[channel] &&
metadata_cache[channel].expire && metadata_cache[channel].expire < +new Date()) {
store.closeChannel(channel, function () {
historyKeeperBroadcast(ctx, channel, {
error: 'EEXPIRED',
channel: channel
});
});
delete ctx.channels[channel];
delete metadata_cache[channel];
return true;
}
return;
};
/* onDirectMessage /* onDirectMessage
* exported for use by the netflux-server * exported for use by the netflux-server
* parses and handles all direct messages directed to the history keeper * parses and handles all direct messages directed to the history keeper
@ -772,7 +771,6 @@ module.exports.create = function (cfg) {
const onDirectMessage = function (ctx, seq, user, json) { const onDirectMessage = function (ctx, seq, user, json) {
let parsed; let parsed;
let channelName; let channelName;
let obj = HISTORY_KEEPER_ID;
Log.silly('HK_MESSAGE', json); Log.silly('HK_MESSAGE', json);
@ -809,6 +807,7 @@ module.exports.create = function (cfg) {
} }
} }
metadata.channel = channelName; metadata.channel = channelName;
metadata.created = +new Date();
// if the user sends us an invalid key, we won't be able to validate their messages // if the user sends us an invalid key, we won't be able to validate their messages
// so they'll never get written to the log anyway. Let's just drop their message // so they'll never get written to the log anyway. Let's just drop their message
@ -913,7 +912,7 @@ module.exports.create = function (cfg) {
channelName = parsed[1]; channelName = parsed[1];
var map = parsed[2]; var map = parsed[2];
if (!(map && typeof(map) === 'object')) { if (!(map && typeof(map) === 'object')) {
return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', obj]); return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]);
} }
var oldestKnownHash = map.from; var oldestKnownHash = map.from;
@ -921,11 +920,11 @@ module.exports.create = function (cfg) {
var desiredCheckpoint = map.cpCount; var desiredCheckpoint = map.cpCount;
var txid = map.txid; var txid = map.txid;
if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') { if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', obj]); return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]);
} }
if (!txid) { if (!txid) {
return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', obj]); return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]);
} }
sendMsg(ctx, user, [seq, 'ACK']); sendMsg(ctx, user, [seq, 'ACK']);
@ -1024,33 +1023,6 @@ module.exports.create = function (cfg) {
} }
}; };
var cciLock = false;
const checkChannelIntegrity = function (ctx) {
if (process.env['CRYPTPAD_DEBUG'] && !cciLock) {
let nt = nThen;
cciLock = true;
Object.keys(ctx.channels).forEach(function (channelName) {
const chan = ctx.channels[channelName];
if (!chan.index) { return; }
nt = nt((waitFor) => {
store.getChannelSize(channelName, waitFor((err, size) => {
if (err) {
return void Log.debug("HK_CHECK_CHANNEL_INTEGRITY",
"Couldn't get size of channel " + channelName);
}
if (size !== chan.index.size) {
return void Log.debug("HK_CHECK_CHANNEL_SIZE",
"channel size mismatch for " + channelName +
" --- cached: " + chan.index.size +
" --- fileSize: " + size);
}
}));
}).nThen;
});
nt(() => { cciLock = false; });
}
};
return { return {
id: HISTORY_KEEPER_ID, id: HISTORY_KEEPER_ID,
setConfig: setConfig, setConfig: setConfig,
@ -1058,7 +1030,6 @@ module.exports.create = function (cfg) {
dropChannel: dropChannel, dropChannel: dropChannel,
checkExpired: checkExpired, checkExpired: checkExpired,
onDirectMessage: onDirectMessage, onDirectMessage: onDirectMessage,
checkChannelIntegrity: checkChannelIntegrity
}; };
}; };

@ -0,0 +1,40 @@
/*
var q = Queue();
q(id, function (next) {
// whatever you need to do....
// when you're done
next();
});
*/
var fix1 = function (f, x) {
return function () { f(x); };
};
module.exports = function () {
var map = {};
var next = function (id) {
if (map[id] && map[id].length === 0) { return void delete map[id]; }
var task = map[id].shift();
task(fix1(next, id));
};
return function (id, task) {
// support initialization with just a function
if (typeof(id) === 'function' && typeof(task) === 'undefined') {
task = id;
id = '';
}
// ...but you really need to pass a function
if (typeof(task) !== 'function') { throw new Error("Expected function"); }
// if the intended queue already has tasks in progress, add this one to the end of the queue
if (map[id]) { return void map[id].push(task); }
// otherwise create a queue containing the given task
map[id] = [task];
next(id);
};
};

@ -18,7 +18,7 @@ const nThen = require("nthen");
const getFolderSize = require("get-folder-size"); const getFolderSize = require("get-folder-size");
const Pins = require("./lib/pins"); const Pins = require("./lib/pins");
const Meta = require("./lib/metadata"); const Meta = require("./lib/metadata");
const WriteQueue = require("./lib/write-queue");
var RPC = module.exports; var RPC = module.exports;
@ -340,8 +340,7 @@ var getMetadata = function (Env, channel, cb) {
value: value value: value
} }
*/ */
// XXX global saferphore may cause issues here, a queue "per channel" is probably better var queueMetadata = WriteQueue();
var metadataSem = Saferphore.create(1);
var setMetadata = function (Env, data, unsafeKey, cb) { var setMetadata = function (Env, data, unsafeKey, cb) {
var channel = data.channel; var channel = data.channel;
var command = data.command; var command = data.command;
@ -349,16 +348,15 @@ var setMetadata = function (Env, data, unsafeKey, cb) {
if (!command || typeof (command) !== 'string') { return void cb ('INVALID_COMMAND'); } if (!command || typeof (command) !== 'string') { return void cb ('INVALID_COMMAND'); }
if (Meta.commands.indexOf(command) === -1) { return void('UNSUPPORTED_COMMAND'); } if (Meta.commands.indexOf(command) === -1) { return void('UNSUPPORTED_COMMAND'); }
metadataSem.take(function (give) { queueMetadata(channel, function (next) {
var g = give();
getMetadata(Env, channel, function (err, metadata) { getMetadata(Env, channel, function (err, metadata) {
if (err) { if (err) {
g(); cb(err);
return void cb(err); return void next();
} }
if (!(metadata && Array.isArray(metadata.owners))) { if (!(metadata && Array.isArray(metadata.owners))) {
g(); cb('E_NO_OWNERS');
return void cb('E_NO_OWNERS'); return void next();
} }
// Confirm that the channel is owned by the user in question // Confirm that the channel is owned by the user in question
@ -372,13 +370,13 @@ var setMetadata = function (Env, data, unsafeKey, cb) {
|| !Array.isArray(data.value) || !Array.isArray(data.value)
|| data.value.length !== 1 || data.value.length !== 1
|| data.value[0] !== unsafeKey) { || data.value[0] !== unsafeKey) {
g(); cb('INSUFFICIENT_PERMISSIONS');
return void cb('INSUFFICIENT_PERMISSIONS'); return void next();
} }
} else if (metadata.owners.indexOf(unsafeKey) === -1) { } else if (metadata.owners.indexOf(unsafeKey) === -1) {
g(); cb('INSUFFICIENT_PERMISSIONS');
return void cb('INSUFFICIENT_PERMISSIONS'); return void next();
} }
// Add the new metadata line // Add the new metadata line
@ -387,22 +385,23 @@ var setMetadata = function (Env, data, unsafeKey, cb) {
try { try {
changed = Meta.handleCommand(metadata, line); changed = Meta.handleCommand(metadata, line);
} catch (e) { } catch (e) {
g(); cb(e);
return void cb(e); return void next();
} }
// if your command is valid but it didn't result in any change to the metadata, // if your command is valid but it didn't result in any change to the metadata,
// call back now and don't write any "useless" line to the log // call back now and don't write any "useless" line to the log
if (!changed) { if (!changed) {
g(); cb(void 0, metadata);
return void cb(void 0, metadata); return void next();
} }
Env.msgStore.writeMetadata(channel, JSON.stringify(line), function (e) { Env.msgStore.writeMetadata(channel, JSON.stringify(line), function (e) {
g();
if (e) { if (e) {
return void cb(e); cb(e);
return void next();
} }
cb(void 0, metadata); cb(void 0, metadata);
next();
}); });
}); });
}); });
@ -1884,8 +1883,6 @@ RPC.create = function (
}; };
var rpc0 = function (ctx, data, respond) { var rpc0 = function (ctx, data, respond) {
if (!Env.msgStore) { Env.msgStore = ctx.store; }
if (!Array.isArray(data)) { if (!Array.isArray(data)) {
Log.debug('INVALID_ARG_FORMET', data); Log.debug('INVALID_ARG_FORMET', data);
return void respond('INVALID_ARG_FORMAT'); return void respond('INVALID_ARG_FORMAT');

@ -321,7 +321,8 @@ var nt = nThen(function (w) {
tasks: config.tasks, tasks: config.tasks,
rpc: rpc, rpc: rpc,
store: config.store, store: config.store,
log: log log: log,
retainData: Boolean(config.retainData),
}; };
historyKeeper = HK.create(hkConfig); historyKeeper = HK.create(hkConfig);
}).nThen(function () { }).nThen(function () {

@ -89,7 +89,6 @@ proxy.mailboxes = {
if (!anonRpc) { return void cb({error: "anonymous rpc session not ready"}); } if (!anonRpc) { return void cb({error: "anonymous rpc session not ready"}); }
var crypto = Crypto.Mailbox.createEncryptor(keys); var crypto = Crypto.Mailbox.createEncryptor(keys);
var network = ctx.store.network;
var text = JSON.stringify({ var text = JSON.stringify({
type: type, type: type,
@ -100,7 +99,7 @@ proxy.mailboxes = {
anonRpc.send("WRITE_PRIVATE_MESSAGE", [ anonRpc.send("WRITE_PRIVATE_MESSAGE", [
user.channel, user.channel,
ciphertext ciphertext
], function (err, response) { ], function (err /*, response */) {
if (err) { if (err) {
return void cb({ return void cb({
error: err, error: err,

Loading…
Cancel
Save