History keeper is now separated from chainpad-server
parent
babb91686d
commit
f24d34c0af
@ -0,0 +1,618 @@
|
||||
/* jshint esversion: 6 */
|
||||
;(function () { 'use strict';
|
||||
|
||||
const nThen = require('nthen');
|
||||
const Nacl = require('tweetnacl');
|
||||
const Crypto = require('crypto');
|
||||
|
||||
|
||||
const now = function () { return (new Date()).getTime(); };
|
||||
|
||||
const getHash = function (msg) {
|
||||
if (typeof(msg) !== 'string') {
|
||||
console.log('getHash() called on', typeof(msg), msg);
|
||||
return '';
|
||||
}
|
||||
return msg.slice(0,64);
|
||||
};
|
||||
|
||||
const tryParse = function (str) {
|
||||
try {
|
||||
return JSON.parse(str);
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
};
|
||||
|
||||
const sliceCpIndex = function (cpIndex, line) {
|
||||
// Remove "old" checkpoints (cp sent before 100 messages ago)
|
||||
const minLine = Math.max(0, (line - 100));
|
||||
let start = cpIndex.slice(0, -2);
|
||||
const end = cpIndex.slice(-2);
|
||||
start = start.filter(function (obj) {
|
||||
return obj.line > minLine;
|
||||
});
|
||||
return start.concat(end);
|
||||
};
|
||||
|
||||
module.exports.create = function (cfg) {
|
||||
const rpc = cfg.rpc;
|
||||
const tasks = cfg.tasks;
|
||||
const store = cfg.store;
|
||||
|
||||
var Env = {};
|
||||
const historyKeeperKeys = {};
|
||||
const HISTORY_KEEPER_ID = Crypto.randomBytes(8).toString('hex');
|
||||
|
||||
let sendMsg = function () {};
|
||||
let STANDARD_CHANNEL_LENGTH, EPHEMERAL_CHANNEL_LENGTH;
|
||||
const setConfig = function (config) {
|
||||
STANDARD_CHANNEL_LENGTH = config.STANDARD_CHANNEL_LENGTH;
|
||||
EPHEMERAL_CHANNEL_LENGTH = config.EPHEMERAL_CHANNEl_LENGTH;
|
||||
sendMsg = config.sendMsg;
|
||||
};
|
||||
|
||||
const computeIndex = function (channelName, cb) {
|
||||
const cpIndex = [];
|
||||
let messageBuf = [];
|
||||
let validateKey;
|
||||
let metadata;
|
||||
let i = 0;
|
||||
store.readMessagesBin(channelName, 0, (msgObj, rmcb) => {
|
||||
let msg;
|
||||
i++;
|
||||
if (!validateKey && msgObj.buff.indexOf('validateKey') > -1) {
|
||||
metadata = msg = tryParse(msgObj.buff.toString('utf8'));
|
||||
if (typeof msg === "undefined") { return rmcb(); }
|
||||
if (msg.validateKey) {
|
||||
validateKey = historyKeeperKeys[channelName] = msg;
|
||||
return rmcb();
|
||||
}
|
||||
}
|
||||
if (msgObj.buff.indexOf('cp|') > -1) {
|
||||
msg = msg || tryParse(msgObj.buff.toString('utf8'));
|
||||
if (typeof msg === "undefined") { return rmcb(); }
|
||||
if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
|
||||
cpIndex.push({
|
||||
offset: msgObj.offset,
|
||||
line: i
|
||||
});
|
||||
messageBuf = [];
|
||||
}
|
||||
}
|
||||
messageBuf.push(msgObj);
|
||||
return rmcb();
|
||||
}, (err) => {
|
||||
if (err && err.code !== 'ENOENT') { return void cb(err); }
|
||||
const offsetByHash = {};
|
||||
let size = 0;
|
||||
messageBuf.forEach((msgObj) => {
|
||||
const msg = tryParse(msgObj.buff.toString('utf8'));
|
||||
if (typeof msg === "undefined") { return; }
|
||||
if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
|
||||
offsetByHash[getHash(msg[4])] = msgObj.offset;
|
||||
}
|
||||
// There is a trailing \n at the end of the file
|
||||
size = msgObj.offset + msgObj.buff.length + 1;
|
||||
});
|
||||
cb(null, {
|
||||
// Only keep the checkpoints included in the last 100 messages
|
||||
cpIndex: sliceCpIndex(cpIndex, i),
|
||||
offsetByHash: offsetByHash,
|
||||
size: size,
|
||||
metadata: metadata,
|
||||
line: i
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
const getIndex = (ctx, channelName, cb) => {
|
||||
const chan = ctx.channels[channelName];
|
||||
if (chan && chan.index) { return void cb(undefined, chan.index); }
|
||||
computeIndex(channelName, (err, ret) => {
|
||||
if (err) { return void cb(err); }
|
||||
if (chan) { chan.index = ret; }
|
||||
cb(undefined, ret);
|
||||
});
|
||||
};
|
||||
|
||||
/*::
|
||||
type cp_index_item = {
|
||||
offset: number,
|
||||
line: number
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) {
|
||||
const msgBin = new Buffer(msg + '\n', 'utf8');
|
||||
// Store the message first, and update the index only once it's stored.
|
||||
// store.messageBin can be async so updating the index first may
|
||||
// result in a wrong cpIndex
|
||||
nThen((waitFor) => {
|
||||
store.messageBin(channel.id, msgBin, waitFor(function (err) {
|
||||
if (err) {
|
||||
waitFor.abort();
|
||||
return void console.log("Error writing message: " + err.message);
|
||||
}
|
||||
}));
|
||||
}).nThen((waitFor) => {
|
||||
getIndex(ctx, channel.id, waitFor((err, index) => {
|
||||
if (err) {
|
||||
console.log("getIndex()");
|
||||
console.log(err.stack);
|
||||
// non-critical, we'll be able to get the channel index later
|
||||
return;
|
||||
}
|
||||
if (typeof (index.line) === "number") { index.line++; }
|
||||
if (isCp) {
|
||||
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
|
||||
for (let k in index.offsetByHash) {
|
||||
if (index.offsetByHash[k] < index.cpIndex[0]) {
|
||||
delete index.offsetByHash[k];
|
||||
}
|
||||
}
|
||||
index.cpIndex.push(({
|
||||
offset: index.size,
|
||||
line: ((index.line || 0) + 1)
|
||||
} /*:cp_index_item*/));
|
||||
}
|
||||
if (maybeMsgHash) { index.offsetByHash[maybeMsgHash] = index.size; }
|
||||
index.size += msgBin.length;
|
||||
}));
|
||||
});
|
||||
};
|
||||
|
||||
// Determine what we should store when a message a broadcasted to a channel
|
||||
const onChannelMessage = function (ctx, channel, msgStruct) {
|
||||
// don't store messages if the channel id indicates that it's an ephemeral message
|
||||
if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; }
|
||||
|
||||
const isCp = /^cp\|/.test(msgStruct[4]);
|
||||
if (historyKeeperKeys[channel.id] && historyKeeperKeys[channel.id].expire &&
|
||||
historyKeeperKeys[channel.id].expire < +new Date()) {
|
||||
return; // Don't store messages on expired channel
|
||||
}
|
||||
let id;
|
||||
if (isCp) {
|
||||
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
|
||||
id = /cp\|(([A-Za-z0-9+\/=]+)\|)?/.exec(msgStruct[4]);
|
||||
if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) {
|
||||
// Reject duplicate checkpoints
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (historyKeeperKeys[channel.id] && historyKeeperKeys[channel.id].validateKey) {
|
||||
/*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
|
||||
let signedMsg = (isCp) ? msgStruct[4].replace(/^cp\|(([A-Za-z0-9+\/=]+)\|)?/, '') : msgStruct[4];
|
||||
signedMsg = Nacl.util.decodeBase64(signedMsg);
|
||||
const validateKey = Nacl.util.decodeBase64(historyKeeperKeys[channel.id].validateKey);
|
||||
const validated = Nacl.sign.open(signedMsg, validateKey);
|
||||
if (!validated) {
|
||||
console.log("Signed message rejected"); // TODO logging
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (isCp) {
|
||||
// WARNING: the fact that we only check the most recent checkpoints
|
||||
// is a potential source of bugs if one editor has high latency and
|
||||
// pushes a duplicate of an earlier checkpoint than the latest which
|
||||
// has been pushed by editors with low latency
|
||||
if (Array.isArray(id) && id[2]) {
|
||||
// Store new checkpoint hash
|
||||
channel.lastSavedCp = id[2];
|
||||
}
|
||||
}
|
||||
msgStruct.push(now());
|
||||
storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
|
||||
};
|
||||
|
||||
const dropChannel = function (chanName) {
|
||||
delete historyKeeperKeys[chanName];
|
||||
};
|
||||
|
||||
const getHistoryOffset = (ctx, channelName, lastKnownHash, cb /*:(e:?Error, os:?number)=>void*/) => {
|
||||
// lastKnownhash === -1 means we want the complete history
|
||||
if (lastKnownHash === -1) { return void cb(null, 0); }
|
||||
let offset = -1;
|
||||
nThen((waitFor) => {
|
||||
getIndex(ctx, channelName, waitFor((err, index) => {
|
||||
if (err) { waitFor.abort(); return void cb(err); }
|
||||
|
||||
// Check last known hash
|
||||
const lkh = index.offsetByHash[lastKnownHash];
|
||||
if (lastKnownHash && typeof(lkh) !== "number") {
|
||||
waitFor.abort();
|
||||
return void cb(new Error('EINVAL'));
|
||||
}
|
||||
|
||||
// Since last 2 checkpoints
|
||||
if (!lastKnownHash) {
|
||||
waitFor.abort();
|
||||
// Less than 2 checkpoints in the history: return everything
|
||||
if (index.cpIndex.length < 2) { return void cb(null, 0); }
|
||||
// Otherwise return the second last checkpoint's index
|
||||
return void cb(null, index.cpIndex[0].offset);
|
||||
/* LATER...
|
||||
in practice, two checkpoints can be very close together
|
||||
we have measures to avoid duplicate checkpoints, but editors
|
||||
can produce nearby checkpoints which are slightly different,
|
||||
and slip past these protections. To be really careful, we can
|
||||
seek past nearby checkpoints by some number of patches so as
|
||||
to ensure that all editors have sufficient knowledge of history
|
||||
to reconcile their differences. */
|
||||
}
|
||||
|
||||
offset = lkh;
|
||||
}));
|
||||
}).nThen((waitFor) => {
|
||||
if (offset !== -1) { return; }
|
||||
store.readMessagesBin(channelName, 0, (msgObj, rmcb, abort) => {
|
||||
const msg = tryParse(msgObj.buff.toString('utf8'));
|
||||
if (typeof msg === "undefined") { return rmcb(); }
|
||||
if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4])) {
|
||||
return void rmcb();
|
||||
}
|
||||
offset = msgObj.offset;
|
||||
abort();
|
||||
}, waitFor(function (err) {
|
||||
if (err) { waitFor.abort(); return void cb(err); }
|
||||
}));
|
||||
}).nThen((waitFor) => {
|
||||
cb(null, offset);
|
||||
});
|
||||
};
|
||||
|
||||
const getHistoryAsync = (ctx, channelName, lastKnownHash, beforeHash, handler, cb) => {
|
||||
let offset = -1;
|
||||
nThen((waitFor) => {
|
||||
getHistoryOffset(ctx, channelName, lastKnownHash, waitFor((err, os) => {
|
||||
if (err) {
|
||||
waitFor.abort();
|
||||
return void cb(err);
|
||||
}
|
||||
offset = os;
|
||||
}));
|
||||
}).nThen((waitFor) => {
|
||||
if (offset === -1) { return void cb(new Error("could not find offset")); }
|
||||
const start = (beforeHash) ? 0 : offset;
|
||||
store.readMessagesBin(channelName, start, (msgObj, rmcb, abort) => {
|
||||
if (beforeHash && msgObj.offset >= offset) { return void abort(); }
|
||||
handler(tryParse(msgObj.buff.toString('utf8')), rmcb);
|
||||
}, waitFor(function (err) {
|
||||
return void cb(err);
|
||||
}));
|
||||
});
|
||||
};
|
||||
|
||||
const getOlderHistory = function (channelName, oldestKnownHash, cb) {
|
||||
var messageBuffer = [];
|
||||
var found = false;
|
||||
store.getMessages(channelName, function (msgStr) {
|
||||
if (found) { return; }
|
||||
|
||||
let parsed = tryParse(msgStr);
|
||||
if (typeof parsed === "undefined") { return; }
|
||||
|
||||
if (parsed.validateKey) {
|
||||
historyKeeperKeys[channelName] = parsed;
|
||||
return;
|
||||
}
|
||||
|
||||
var content = parsed[4];
|
||||
if (typeof(content) !== 'string') { return; }
|
||||
|
||||
var hash = getHash(content);
|
||||
if (hash === oldestKnownHash) {
|
||||
found = true;
|
||||
}
|
||||
messageBuffer.push(parsed);
|
||||
}, function (err) {
|
||||
if (err) {
|
||||
console.error("getOlderHistory", err);
|
||||
}
|
||||
cb(messageBuffer);
|
||||
});
|
||||
};
|
||||
|
||||
/*::
|
||||
type Chan_t = {
|
||||
indexOf: (any)=>number,
|
||||
id: string,
|
||||
lastSavedCp: string,
|
||||
forEach: ((any)=>void)=>void,
|
||||
push: (any)=>void,
|
||||
};
|
||||
*/
|
||||
|
||||
|
||||
const historyKeeperBroadcast = function (ctx, channel, msg) {
|
||||
let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
|
||||
chan.forEach(function (user) {
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
|
||||
});
|
||||
};
|
||||
const onChannelCleared = function (ctx, channel) {
|
||||
historyKeeperBroadcast(ctx, channel, {
|
||||
error: 'ECLEARED',
|
||||
channel: channel
|
||||
});
|
||||
};
|
||||
// When a channel is removed from datastore, broadcast a message to all its connected users
|
||||
const onChannelDeleted = function (ctx, channel) {
|
||||
store.closeChannel(channel, function () {
|
||||
historyKeeperBroadcast(ctx, channel, {
|
||||
error: 'EDELETED',
|
||||
channel: channel
|
||||
});
|
||||
});
|
||||
delete ctx.channels[channel];
|
||||
delete historyKeeperKeys[channel];
|
||||
};
|
||||
// Check if the selected channel is expired
|
||||
// If it is, remove it from memory and broadcast a message to its members
|
||||
const checkExpired = function (ctx, channel) {
|
||||
if (channel && channel.length === STANDARD_CHANNEL_LENGTH && historyKeeperKeys[channel] &&
|
||||
historyKeeperKeys[channel].expire && historyKeeperKeys[channel].expire < +new Date()) {
|
||||
store.closeChannel(channel, function () {
|
||||
historyKeeperBroadcast(ctx, channel, {
|
||||
error: 'EEXPIRED',
|
||||
channel: channel
|
||||
});
|
||||
});
|
||||
delete ctx.channels[channel];
|
||||
delete historyKeeperKeys[channel];
|
||||
return true;
|
||||
}
|
||||
return;
|
||||
};
|
||||
|
||||
const onDirectMessage = function (ctx, seq, user, json) {
|
||||
let parsed;
|
||||
let channelName;
|
||||
try {
|
||||
parsed = JSON.parse(json[2]);
|
||||
} catch (err) {
|
||||
console.error("handleMessage(JSON.parse)", err); // TODO logging
|
||||
return;
|
||||
}
|
||||
|
||||
// If the requested history is for an expired channel, abort
|
||||
// Note the if we don't have the keys for that channel in historyKeeperKeys, we'll
|
||||
// have to abort later (once we know the expiration time)
|
||||
if (checkExpired(ctx, parsed[1])) { return; }
|
||||
|
||||
if (parsed[0] === 'GET_HISTORY') {
|
||||
// parsed[1] is the channel id
|
||||
// parsed[2] is a validation key or an object containing metadata (optionnal)
|
||||
// parsed[3] is the last known hash (optionnal)
|
||||
sendMsg(ctx, user, [seq, 'ACK']);
|
||||
channelName = parsed[1];
|
||||
var validateKey = parsed[2];
|
||||
var lastKnownHash = parsed[3];
|
||||
var owners;
|
||||
var expire;
|
||||
if (parsed[2] && typeof parsed[2] === "object") {
|
||||
validateKey = parsed[2].validateKey;
|
||||
lastKnownHash = parsed[2].lastKnownHash;
|
||||
owners = parsed[2].owners;
|
||||
if (parsed[2].expire) {
|
||||
expire = +parsed[2].expire * 1000 + (+new Date());
|
||||
}
|
||||
}
|
||||
|
||||
nThen(function (waitFor) {
|
||||
if (!tasks) { return; } // tasks are not supported
|
||||
if (typeof(expire) !== 'number' || !expire) { return; }
|
||||
|
||||
// the fun part...
|
||||
// the user has said they want this pad to expire at some point
|
||||
tasks.write(expire, "EXPIRE", [ channelName ], waitFor(function (err) {
|
||||
if (err) {
|
||||
// if there is an error, we don't want to crash the whole server...
|
||||
// just log it, and if there's a problem you'll be able to fix it
|
||||
// at a later date with the provided information
|
||||
console.error('Failed to write expiration to disk:', err); // TODO logging
|
||||
console.error([expire, 'EXPIRE', channelName]); // TODO logging
|
||||
}
|
||||
}));
|
||||
}).nThen(function (waitFor) {
|
||||
var w = waitFor();
|
||||
|
||||
/* unless this is a young channel, we will serve all messages from an offset
|
||||
this will not include the channel metadata, so we need to explicitly fetch that.
|
||||
unfortunately, we can't just serve it blindly, since then young channels will
|
||||
send the metadata twice, so let's do a quick check of what we're going to serve...
|
||||
*/
|
||||
getIndex(ctx, channelName, waitFor((err, index) => {
|
||||
/* if there's an error here, it should be encountered
|
||||
and handled by the next nThen block.
|
||||
so, let's just fall through...
|
||||
*/
|
||||
if (err) { return w(); }
|
||||
if (!index || !index.metadata) { return void w(); }
|
||||
// Store the metadata if we don't have it in memory
|
||||
if (!historyKeeperKeys[channelName]) {
|
||||
historyKeeperKeys[channelName] = index.metadata;
|
||||
}
|
||||
// And then check if the channel is expired. If it is, send the error and abort
|
||||
if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
|
||||
// Send the metadata to the user
|
||||
if (!lastKnownHash && index.cpIndex.length > 1) {
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
|
||||
return;
|
||||
}
|
||||
w();
|
||||
}));
|
||||
}).nThen(() => {
|
||||
let msgCount = 0;
|
||||
let expired = false;
|
||||
getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, cb) => {
|
||||
if (!msg) { return; }
|
||||
if (msg.validateKey) {
|
||||
// If it is a young channel, this is the part where we get the metadata
|
||||
// Check if the channel is expired and abort if it is.
|
||||
if (!historyKeeperKeys[channelName]) { historyKeeperKeys[channelName] = msg; }
|
||||
expired = checkExpired(ctx, channelName);
|
||||
}
|
||||
if (expired) { return void cb(); }
|
||||
msgCount++;
|
||||
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], cb);
|
||||
}, (err) => {
|
||||
// If the pad is expired, stop here, we've already sent the error message
|
||||
if (expired) { return; }
|
||||
|
||||
if (err && err.code !== 'ENOENT') {
|
||||
if (err.message !== 'EINVAL') { console.error("GET_HISTORY", err); }
|
||||
const parsedMsg = {error:err.message, channel: channelName};
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
|
||||
return;
|
||||
}
|
||||
|
||||
// If this is a new channel, we need to store the metadata as
|
||||
// the first message in the file
|
||||
const chan = ctx.channels[channelName];
|
||||
if (msgCount === 0 && !historyKeeperKeys[channelName] && chan && chan.indexOf(user) > -1) {
|
||||
var key = {};
|
||||
key.channel = channelName;
|
||||
if (validateKey) {
|
||||
key.validateKey = validateKey;
|
||||
}
|
||||
if (owners) {
|
||||
key.owners = owners;
|
||||
}
|
||||
if (expire) {
|
||||
key.expire = expire;
|
||||
}
|
||||
historyKeeperKeys[channelName] = key;
|
||||
storeMessage(ctx, chan, JSON.stringify(key), false, undefined);
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(key)]);
|
||||
}
|
||||
|
||||
// End of history message:
|
||||
let parsedMsg = {state: 1, channel: channelName};
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
|
||||
});
|
||||
});
|
||||
} else if (parsed[0] === 'GET_HISTORY_RANGE') {
|
||||
channelName = parsed[1];
|
||||
var map = parsed[2];
|
||||
if (!(map && typeof(map) === 'object')) {
|
||||
return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', obj]);
|
||||
}
|
||||
|
||||
var oldestKnownHash = map.from;
|
||||
var desiredMessages = map.count;
|
||||
var desiredCheckpoint = map.cpCount;
|
||||
var txid = map.txid;
|
||||
if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
|
||||
return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', obj]);
|
||||
}
|
||||
|
||||
if (!txid) {
|
||||
return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', obj]);
|
||||
}
|
||||
|
||||
sendMsg(ctx, user, [seq, 'ACK']);
|
||||
return void getOlderHistory(channelName, oldestKnownHash, function (messages) {
|
||||
var toSend = [];
|
||||
if (typeof (desiredMessages) === "number") {
|
||||
toSend = messages.slice(-desiredMessages);
|
||||
} else {
|
||||
let cpCount = 0;
|
||||
for (var i = messages.length - 1; i >= 0; i--) {
|
||||
if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
|
||||
cpCount++;
|
||||
}
|
||||
toSend.unshift(messages[i]);
|
||||
if (cpCount >= desiredCheckpoint) { break; }
|
||||
}
|
||||
}
|
||||
toSend.forEach(function (msg) {
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
|
||||
JSON.stringify(['HISTORY_RANGE', txid, msg])]);
|
||||
});
|
||||
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
|
||||
JSON.stringify(['HISTORY_RANGE_END', txid, channelName])
|
||||
]);
|
||||
});
|
||||
} else if (parsed[0] === 'GET_FULL_HISTORY') {
|
||||
// parsed[1] is the channel id
|
||||
// parsed[2] is a validation key (optionnal)
|
||||
// parsed[3] is the last known hash (optionnal)
|
||||
sendMsg(ctx, user, [seq, 'ACK']);
|
||||
getHistoryAsync(ctx, parsed[1], -1, false, (msg, cb) => {
|
||||
if (!msg) { return; }
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], cb);
|
||||
}, (err) => {
|
||||
let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
|
||||
if (err) {
|
||||
console.error(err.stack);
|
||||
parsedMsg = ['ERROR', parsed[1], err.message];
|
||||
}
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
|
||||
});
|
||||
} else if (rpc) {
|
||||
/* RPC Calls... */
|
||||
var rpc_call = parsed.slice(1);
|
||||
|
||||
sendMsg(ctx, user, [seq, 'ACK']);
|
||||
try {
|
||||
// slice off the sequence number and pass in the rest of the message
|
||||
rpc(ctx, rpc_call, function (err, output) {
|
||||
if (err) {
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', err])]);
|
||||
return;
|
||||
}
|
||||
var msg = rpc_call[0].slice();
|
||||
if (msg[3] === 'REMOVE_OWNED_CHANNEL') {
|
||||
onChannelDeleted(ctx, msg[4]);
|
||||
}
|
||||
if (msg[3] === 'CLEAR_OWNED_CHANNEL') {
|
||||
onChannelCleared(ctx, msg[4]);
|
||||
}
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]);
|
||||
});
|
||||
} catch (e) {
|
||||
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// TODO logging
|
||||
var cciLock = false;
|
||||
const checkChannelIntegrity = function (ctx) {
|
||||
if (process.env['CRYPTPAD_DEBUG'] && !cciLock) {
|
||||
let nt = nThen;
|
||||
cciLock = true;
|
||||
Object.keys(ctx.channels).forEach(function (channelName) {
|
||||
const chan = ctx.channels[channelName];
|
||||
if (!chan.index) { return; }
|
||||
nt = nt((waitFor) => {
|
||||
store.getChannelSize(channelName, waitFor((err, size) => {
|
||||
if (err) { return void console.log("Couldn't get size of channel", channelName); }
|
||||
if (size !== chan.index.size) {
|
||||
console.log("channel size mismatch for", channelName,
|
||||
"cached:", chan.index.size, "fileSize:", size);
|
||||
}
|
||||
}));
|
||||
}).nThen;
|
||||
});
|
||||
nt((waitFor) => { cciLock = false; });
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
id: HISTORY_KEEPER_ID,
|
||||
setConfig: setConfig,
|
||||
onChannelMessage: onChannelMessage,
|
||||
dropChannel: dropChannel,
|
||||
checkExpired: checkExpired,
|
||||
onDirectMessage: onDirectMessage,
|
||||
checkChannelIntegrity: checkChannelIntegrity
|
||||
};
|
||||
};
|
||||
|
||||
}());
|
Loading…
Reference in New Issue