|
|
@ -395,7 +395,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
|
|
|
|
* -1 if you didn't find it
|
|
|
|
* -1 if you didn't find it
|
|
|
|
|
|
|
|
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX
|
|
|
|
const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => {
|
|
|
|
const cb = Util.once(Util.mkAsync(_cb));
|
|
|
|
const cb = Util.once(Util.mkAsync(_cb));
|
|
|
|
|
|
|
|
|
|
|
|
// lastKnownhash === -1 means we want the complete history
|
|
|
|
// lastKnownhash === -1 means we want the complete history
|
|
|
@ -407,20 +407,9 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX
|
|
|
|
|
|
|
|
|
|
|
|
// check if the "hash" the client is requesting exists in the index
|
|
|
|
// check if the "hash" the client is requesting exists in the index
|
|
|
|
const lkh = index.offsetByHash[lastKnownHash];
|
|
|
|
const lkh = index.offsetByHash[lastKnownHash];
|
|
|
|
// we evict old hashes from the index as new checkpoints are discovered.
|
|
|
|
|
|
|
|
// if someone connects and asks for a hash that is no longer relevant,
|
|
|
|
// fall through to the next block if the offset of the hash in question is not in memory
|
|
|
|
// we tell them it's an invalid request. This is because of the semantics of "GET_HISTORY"
|
|
|
|
if (lastKnownHash && typeof(lkh) !== "number") { return; }
|
|
|
|
// which is only ever used when connecting or reconnecting in typical uses of history...
|
|
|
|
|
|
|
|
// this assumption should hold for uses by chainpad, but perhaps not for other uses cases.
|
|
|
|
|
|
|
|
// EXCEPT: other cases don't use checkpoints!
|
|
|
|
|
|
|
|
// clients that are told that their request is invalid should just make another request
|
|
|
|
|
|
|
|
// without specifying the hash, and just trust the server to give them the relevant data.
|
|
|
|
|
|
|
|
// QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory?
|
|
|
|
|
|
|
|
if (lastKnownHash && typeof(lkh) !== "number") {
|
|
|
|
|
|
|
|
return; // XXX fall through to find the offset since it isn't cached
|
|
|
|
|
|
|
|
//waitFor.abort();
|
|
|
|
|
|
|
|
//return void cb(new Error('EINVAL'));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Since last 2 checkpoints
|
|
|
|
// Since last 2 checkpoints
|
|
|
|
if (!lastKnownHash) {
|
|
|
|
if (!lastKnownHash) {
|
|
|
@ -442,13 +431,13 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX
|
|
|
|
offset = lkh;
|
|
|
|
offset = lkh;
|
|
|
|
}));
|
|
|
|
}));
|
|
|
|
}).nThen((w) => {
|
|
|
|
}).nThen((w) => {
|
|
|
|
// if offset is less than zero then presumably the channel has no messages
|
|
|
|
// skip past this block if the offset is anything other than -1
|
|
|
|
// returning falls through to the next block and therefore returns -1
|
|
|
|
// this basically makes these first two nThen blocks behave like if-else
|
|
|
|
if (offset !== -1) { return; }
|
|
|
|
if (offset !== -1) { return; }
|
|
|
|
|
|
|
|
|
|
|
|
// do a lookup from the index
|
|
|
|
// either the message exists in history but is not in the cached index
|
|
|
|
// FIXME maybe we don't need this anymore?
|
|
|
|
// or it does not exist at all. In either case 'getHashOffset' is expected
|
|
|
|
// otherwise we have a non-negative offset and we can start to read from there
|
|
|
|
// to return a number: -1 if not present, positive interger otherwise
|
|
|
|
Env.getHashOffset(channelName, lastKnownHash, w(function (err, _offset) {
|
|
|
|
Env.getHashOffset(channelName, lastKnownHash, w(function (err, _offset) {
|
|
|
|
if (err) {
|
|
|
|
if (err) {
|
|
|
|
w.abort();
|
|
|
|
w.abort();
|
|
|
@ -475,7 +464,7 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
|
|
|
|
|
|
|
|
|
|
|
|
let offset = -1;
|
|
|
|
let offset = -1;
|
|
|
|
nThen((waitFor) => {
|
|
|
|
nThen((waitFor) => {
|
|
|
|
getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => { // XXX
|
|
|
|
getHistoryOffset(Env, channelName, lastKnownHash, waitFor((err, os) => {
|
|
|
|
if (err) {
|
|
|
|
if (err) {
|
|
|
|
waitFor.abort();
|
|
|
|
waitFor.abort();
|
|
|
|
return void cb(err);
|
|
|
|
return void cb(err);
|
|
|
@ -485,8 +474,6 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
|
|
|
|
}).nThen((waitFor) => {
|
|
|
|
}).nThen((waitFor) => {
|
|
|
|
if (offset === -1) {
|
|
|
|
if (offset === -1) {
|
|
|
|
return void cb(new Error('EUNKNOWN'));
|
|
|
|
return void cb(new Error('EUNKNOWN'));
|
|
|
|
console.log(lastKnownHash);
|
|
|
|
|
|
|
|
return void cb(new Error("could not find offset")); // XXX EUNKNOWN
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
const start = (beforeHash) ? 0 : offset;
|
|
|
|
const start = (beforeHash) ? 0 : offset;
|
|
|
|
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
|
|
|
|
store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
|
|
|
@ -635,7 +622,7 @@ const handleGetHistory = function (Env, Server, seq, userId, parsed) {
|
|
|
|
}, (err) => {
|
|
|
|
}, (err) => {
|
|
|
|
if (err && err.code !== 'ENOENT') {
|
|
|
|
if (err && err.code !== 'ENOENT') {
|
|
|
|
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
|
|
|
|
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
|
|
|
|
const parsedMsg = {error:err.message, channel: channelName, txid: txid}; // XXX
|
|
|
|
const parsedMsg = {error:err.message, channel: channelName, txid: txid};
|
|
|
|
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
|
|
|
|
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify(parsedMsg)]);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|