@ -9,6 +9,8 @@ const Meta = require("./metadata");
const WriteQueue = require ( "./write-queue" ) ;
const BatchRead = require ( "./batch-read" ) ;
const RPC = require ( "./rpc" ) ;
const Extras = require ( "./hk-util.js" ) ;
const STANDARD _CHANNEL _LENGTH = Extras . STANDARD _CHANNEL _LENGTH ;
const EPHEMERAL _CHANNEL _LENGTH = Extras . EPHEMERAL _CHANNEL _LENGTH ;
@ -66,8 +68,8 @@ const isValidValidateKeyString = function (key) {
var CHECKPOINT _PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/ ;
module . exports . create = function ( cfg ) {
const rpc = cfg . rpc ;
module . exports . create = function ( cfg , cb ) {
var rpc ;
const tasks = cfg . tasks ;
const store = cfg . store ;
Log = cfg . log ;
@ -75,6 +77,7 @@ module.exports.create = function (cfg) {
Log . silly ( 'HK_LOADING' , 'LOADING HISTORY_KEEPER MODULE' ) ;
const metadata _cache = { } ;
const channel _cache = { } ;
const HISTORY _KEEPER _ID = Crypto . randomBytes ( 8 ) . toString ( 'hex' ) ;
Log . verbose ( 'HK_ID' , 'History keeper ID: ' + HISTORY _KEEPER _ID ) ;
@ -211,8 +214,9 @@ module.exports.create = function (cfg) {
if the channel exists but its index does not then it caches the index
* /
const batchIndexReads = BatchRead ( "HK_GET_INDEX" ) ;
const getIndex = ( ctx , channelName , cb ) => {
const chan = ctx . channels [ channelName ] ;
const getIndex = ( channelName , cb ) => {
const chan = channel _cache [ channelName ] ;
// if there is a channel in memory and it has an index cached, return it
if ( chan && chan . index ) {
// enforce async behaviour
@ -233,15 +237,7 @@ module.exports.create = function (cfg) {
} ) ;
} ;
/ * : :
type cp _index _item = {
offset : number ,
line : number
}
* /
/ * s t o r e M e s s a g e
* ctx
* channel id
* the message to store
* whether the message is a checkpoint
@ -260,7 +256,7 @@ module.exports.create = function (cfg) {
* /
const queueStorage = WriteQueue ( ) ;
const storeMessage = function ( c tx, c hannel, msg , isCp , optionalMessageHash ) {
const storeMessage = function ( c hannel, msg , isCp , optionalMessageHash ) {
const id = channel . id ;
queueStorage ( id , function ( next ) {
@ -284,7 +280,7 @@ module.exports.create = function (cfg) {
}
} ) ) ;
} ) . nThen ( ( waitFor ) => {
getIndex ( ctx, id, waitFor ( ( err , index ) => {
getIndex ( id, waitFor ( ( err , index ) => {
if ( err ) {
Log . warn ( "HK_STORE_MESSAGE_INDEX" , err . stack ) ;
// non-critical, we'll be able to get the channel index later
@ -298,10 +294,10 @@ module.exports.create = function (cfg) {
delete index . offsetByHash [ k ] ;
}
}
index . cpIndex . push ( ( {
index . cpIndex . push ( {
offset : index . size ,
line : ( ( index . line || 0 ) + 1 )
} /*:cp_index_item*/ ) ) ;
} ) ;
}
if ( optionalMessageHash ) { index . offsetByHash [ optionalMessageHash ] = index . size ; }
index . size += msgBin . length ;
@ -313,21 +309,10 @@ module.exports.create = function (cfg) {
} ) ;
} ;
/ * h i s t o r y K e e p e r B r o a d c a s t
* uses API from the netflux server to send messages to every member of a channel
* sendMsg runs in a try - catch and drops users if sending a message fails
* /
const historyKeeperBroadcast = function ( ctx , channel , msg ) {
let chan = ctx . channels [ channel ] || ( ( [ ] /*:any*/ ) /*:Chan_t*/ ) ;
chan . forEach ( function ( user ) {
ctx . sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( msg ) ] ) ;
} ) ;
} ;
/ * e x p i r e C h a n n e l i s h e r e t o c l e a n u p c h a n n e l s t h a t s h o u l d h a v e b e e n r e m o v e d
but for some reason are still present
* /
const expireChannel = function ( c tx, c hannel) {
const expireChannel = function ( channel ) {
return void store . archiveChannel ( channel , function ( err ) {
Log . info ( "ARCHIVAL_CHANNEL_BY_HISTORY_KEEPER_EXPIRATION" , {
channelId : channel ,
@ -336,6 +321,14 @@ module.exports.create = function (cfg) {
} ) ;
} ;
/ * d r o p C h a n n e l
* cleans up memory structures which are managed entirely by the historyKeeper
* /
const dropChannel = function ( chanName ) {
delete metadata _cache [ chanName ] ;
delete channel _cache [ chanName ] ;
} ;
/ * c h e c k E x p i r e d
* synchronously returns true or undefined to indicate whether the channel is expired
* according to its metadata
@ -347,7 +340,7 @@ module.exports.create = function (cfg) {
FIXME the boolean nature of this API should be separated from its side effects
* /
const checkExpired = function ( ctx , channel ) {
const checkExpired = function ( Server , channel ) {
if ( ! ( channel && channel . length === STANDARD _CHANNEL _LENGTH ) ) { return false ; }
let metadata = metadata _cache [ channel ] ;
if ( ! ( metadata && typeof ( metadata . expire ) === 'number' ) ) { return false ; }
@ -362,18 +355,16 @@ module.exports.create = function (cfg) {
// there may have been a problem with scheduling tasks
// or the scheduled tasks may not be running
// so trigger a removal from here
if ( pastDue >= ONE _DAY ) { expireChannel ( c tx, c hannel) ; }
if ( pastDue >= ONE _DAY ) { expireChannel ( c hannel) ; }
// close the channel
store . closeChannel ( channel , function ( ) {
historyKeeperBroadcast ( ctx , channel , {
// XXX make sure that clients actually disconnect when we broadcast an error
Server . channelBroadcast ( channel , {
error : 'EEXPIRED' ,
channel : channel
} ) ;
// remove it from any caches after you've told anyone in the channel
// that it has expired
delete ctx . channels [ channel ] ;
delete metadata _cache [ channel ] ;
} , HISTORY _KEEPER _ID ) ;
dropChannel ( channel ) ;
} ) ;
// return true to indicate that it has expired
@ -391,7 +382,7 @@ module.exports.create = function (cfg) {
* adds timestamps to incoming messages
* writes messages to the store
* /
const onChannelMessage = function ( ctx , channel , msgStruct ) {
const onChannelMessage = function ( Server , channel , msgStruct ) {
// TODO our usage of 'channel' here looks prone to errors
// we only use it for its 'id', but it can contain other stuff
// also, we're using this RPC from both the RPC and Netflux-server
@ -414,7 +405,7 @@ module.exports.create = function (cfg) {
let metadata ;
nThen ( function ( w ) {
// getIndex (and therefore the latest metadata)
getIndex ( c tx, c hannel. id , w ( function ( err , index ) {
getIndex ( c hannel. id , w ( function ( err , index ) {
if ( err ) {
w . abort ( ) ;
return void Log . error ( 'CHANNEL_MESSAGE_ERROR' , err ) ;
@ -429,7 +420,7 @@ module.exports.create = function (cfg) {
metadata = index . metadata ;
// don't write messages to expired channels
if ( checkExpired ( ctx , channel ) ) { return void w . abort ( ) ; }
if ( checkExpired ( Server , channel ) ) { return void w . abort ( ) ; }
// if there's no validateKey present skip to the next block
if ( ! metadata . validateKey ) { return ; }
@ -479,20 +470,10 @@ module.exports.create = function (cfg) {
msgStruct . push ( now ( ) ) ;
// storeMessage
storeMessage ( c tx, c hannel, JSON . stringify ( msgStruct ) , isCp , getHash ( msgStruct [ 4 ] , Log ) ) ;
storeMessage ( c hannel, JSON . stringify ( msgStruct ) , isCp , getHash ( msgStruct [ 4 ] , Log ) ) ;
} ) ;
} ;
/ * d r o p C h a n n e l
* exported as API
* used by chainpad - server / NetfluxWebsocketSrv . js
* cleans up memory structures which are managed entirely by the historyKeeper
* the netflux server manages other memory in ctx . channels
* /
const dropChannel = function ( chanName ) {
delete metadata _cache [ chanName ] ;
} ;
/ * g e t H i s t o r y O f f s e t
returns a number representing the byte offset from the start of the log
for whatever history you ' re seeking .
@ -522,12 +503,12 @@ module.exports.create = function (cfg) {
* - 1 if you didn ' t find it
* /
const getHistoryOffset = ( c tx, c hannelName, lastKnownHash , cb /*:(e:?Error, os:?number)=>void*/ ) => {
const getHistoryOffset = ( c hannelName, lastKnownHash , cb ) => {
// lastKnownhash === -1 means we want the complete history
if ( lastKnownHash === - 1 ) { return void cb ( null , 0 ) ; }
let offset = - 1 ;
nThen ( ( waitFor ) => {
getIndex ( c tx, c hannelName, waitFor ( ( err , index ) => {
getIndex ( c hannelName, waitFor ( ( err , index ) => {
if ( err ) { waitFor . abort ( ) ; return void cb ( err ) ; }
// check if the "hash" the client is requesting exists in the index
@ -600,10 +581,10 @@ module.exports.create = function (cfg) {
* GET _HISTORY
* /
const getHistoryAsync = ( c tx, c hannelName, lastKnownHash , beforeHash , handler , cb ) => {
const getHistoryAsync = ( c hannelName, lastKnownHash , beforeHash , handler , cb ) => {
let offset = - 1 ;
nThen ( ( waitFor ) => {
getHistoryOffset ( c tx, c hannelName, lastKnownHash , waitFor ( ( err , os ) => {
getHistoryOffset ( c hannelName, lastKnownHash , waitFor ( ( err , os ) => {
if ( err ) {
waitFor . abort ( ) ;
return void cb ( err ) ;
@ -666,42 +647,50 @@ module.exports.create = function (cfg) {
/ * o n C h a n n e l C l e a r e d
* broadcasts to all clients in a channel if that channel is deleted
* /
const onChannelCleared = function ( ctx , channel ) {
historyKeeperBroadcast( ctx , channel , {
const onChannelCleared = function ( Server , channel ) {
Server. channelBroadcast ( channel , {
error : 'ECLEARED' ,
channel : channel
} );
} , HISTORY _KEEPER _ID );
} ;
// When a channel is removed from datastore, broadcast a message to all its connected users
const onChannelDeleted = function ( ctx , channel ) {
const onChannelDeleted = function ( Server , channel ) {
store . closeChannel ( channel , function ( ) {
historyKeeperBroadcast( ctx , channel , {
Server. channelBroadcast ( channel , {
error : 'EDELETED' ,
channel : channel
} );
} , HISTORY _KEEPER _ID );
} ) ;
delete ctx . channels [ channel ] ;
delete channel _cache [ channel ] ;
Server . clearChannel ( channel ) ;
delete metadata _cache [ channel ] ;
} ;
// Check if the selected channel is expired
// If it is, remove it from memory and broadcast a message to its members
const onChannelMetadataChanged = function ( ctx , channel , metadata ) {
if ( channel && metadata _cache [ channel ] && typeof ( metadata ) === "object" ) {
Log . silly ( 'SET_METADATA_CACHE' , 'Channel ' + channel + ', metadata: ' + JSON . stringify ( metadata ) ) ;
metadata _cache [ channel ] = metadata ;
if ( ctx . channels [ channel ] && ctx . channels [ channel ] . index ) {
ctx . channels [ channel ] . index . metadata = metadata ;
}
historyKeeperBroadcast ( ctx , channel , metadata ) ;
const onChannelMetadataChanged = function ( Server , channel , metadata ) {
if ( ! ( channel && metadata _cache [ channel ] && typeof ( metadata ) === "object" ) ) { return ; }
Log . silly ( 'SET_METADATA_CACHE' , {
channel : channel ,
metadata : JSON . stringify ( metadata ) ,
} ) ;
metadata _cache [ channel ] = metadata ;
if ( channel _cache [ channel ] && channel _cache [ channel ] . index ) {
channel _cache [ channel ] . index . metadata = metadata ;
}
Server . channelBroadcast ( channel , metadata , HISTORY _KEEPER _ID ) ;
} ;
const handleGetHistory = function ( ctx , seq , user , parsed ) {
const handleGetHistory = function ( Server , seq , user , parsed ) {
// parsed[1] is the channel id
// parsed[2] is a validation key or an object containing metadata (optionnal)
// parsed[3] is the last known hash (optionnal)
ctx . sendMsg ( ctx , user , [ seq , 'ACK' ] ) ;
Server . send ( user . id , [ seq , 'ACK' ] ) ;
var channelName = parsed [ 1 ] ;
var config = parsed [ 2 ] ;
var metadata = { } ;
@ -736,7 +725,7 @@ module.exports.create = function (cfg) {
unfortunately , we can ' t just serve it blindly , since then young channels will
send the metadata twice , so let 's do a quick check of what we' re going to serve ...
* /
getIndex ( c tx, c hannelName, waitFor ( ( err , index ) => {
getIndex ( c hannelName, waitFor ( ( err , index ) => {
/ * i f t h e r e ' s a n e r r o r h e r e , i t s h o u l d b e e n c o u n t e r e d
and handled by the next nThen block .
so , let ' s just fall through ...
@ -750,29 +739,29 @@ module.exports.create = function (cfg) {
if ( ! index || ! index . metadata ) { return void w ( ) ; }
// And then check if the channel is expired. If it is, send the error and abort
// FIXME this is hard to read because 'checkExpired' has side effects
if ( checkExpired ( ctx , channelName ) ) { return void waitFor . abort ( ) ; }
if ( checkExpired ( Server , channelName ) ) { return void waitFor . abort ( ) ; }
// always send metadata with GET_HISTORY requests
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( index . metadata ) ] , w ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( index . metadata ) ] , w ) ;
} ) ) ;
} ) . nThen ( ( ) => {
let msgCount = 0 ;
// TODO compute lastKnownHash in a manner such that it will always skip past the metadata line?
getHistoryAsync ( c tx, c hannelName, lastKnownHash , false , ( msg , readMore ) => {
getHistoryAsync ( c hannelName, lastKnownHash , false , ( msg , readMore ) => {
if ( ! msg ) { return ; }
msgCount ++ ;
// avoid sending the metadata message a second time
if ( isMetadataMessage ( msg ) && metadata _cache [ channelName ] ) { return readMore ( ) ; }
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( msg ) ] , readMore ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( msg ) ] , readMore ) ;
} , ( err ) => {
if ( err && err . code !== 'ENOENT' ) {
if ( err . message !== 'EINVAL' ) { Log . error ( "HK_GET_HISTORY" , err ) ; }
const parsedMsg = { error : err . message , channel : channelName } ;
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( parsedMsg ) ] ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( parsedMsg ) ] ) ;
return ;
}
const chan = c tx. channels [ channelName ] ;
const chan = c hannel_cache [ channelName ] ;
if ( msgCount === 0 && ! metadata _cache [ channelName ] && chan && chan . indexOf ( user ) > - 1 ) {
metadata _cache [ channelName ] = metadata ;
@ -811,21 +800,22 @@ module.exports.create = function (cfg) {
}
} ) ;
}
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( metadata ) ] ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( metadata ) ] ) ;
}
// End of history message:
let parsedMsg = { state : 1 , channel : channelName } ;
ctx . sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( parsedMsg ) ] ) ;
Server . send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( parsedMsg ) ] ) ;
} ) ;
} ) ;
} ;
const handleGetHistoryRange = function ( ctx , seq , user , parsed ) {
const handleGetHistoryRange = function ( Server , seq , user , parsed ) {
var channelName = parsed [ 1 ] ;
var map = parsed [ 2 ] ;
if ( ! ( map && typeof ( map ) === 'object' ) ) {
return void ctx. sendMsg ( ctx , user , [ seq , 'ERROR' , 'INVALID_ARGS' , HISTORY _KEEPER _ID ] ) ;
return void Server. send ( user . id , [ seq , 'ERROR' , 'INVALID_ARGS' , HISTORY _KEEPER _ID ] ) ;
}
var oldestKnownHash = map . from ;
@ -833,14 +823,14 @@ module.exports.create = function (cfg) {
var desiredCheckpoint = map . cpCount ;
var txid = map . txid ;
if ( typeof ( desiredMessages ) !== 'number' && typeof ( desiredCheckpoint ) !== 'number' ) {
return void ctx. sendMsg ( ctx , user , [ seq , 'ERROR' , 'UNSPECIFIED_COUNT' , HISTORY _KEEPER _ID ] ) ;
return void Server. send ( user . id , [ seq , 'ERROR' , 'UNSPECIFIED_COUNT' , HISTORY _KEEPER _ID ] ) ;
}
if ( ! txid ) {
return void ctx. sendMsg ( ctx , user , [ seq , 'ERROR' , 'NO_TXID' , HISTORY _KEEPER _ID ] ) ;
return void Server. send ( user . id , [ seq , 'ERROR' , 'NO_TXID' , HISTORY _KEEPER _ID ] ) ;
}
ctx. sendMsg ( ctx , user , [ seq , 'ACK' ] ) ;
Server. send ( user . id , [ seq , 'ACK' ] ) ;
return void getOlderHistory ( channelName , oldestKnownHash , function ( messages ) {
var toSend = [ ] ;
if ( typeof ( desiredMessages ) === "number" ) {
@ -856,64 +846,66 @@ module.exports.create = function (cfg) {
}
}
toSend . forEach ( function ( msg ) {
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id ,
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id ,
JSON . stringify ( [ 'HISTORY_RANGE' , txid , msg ] ) ] ) ;
} ) ;
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id ,
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id ,
JSON . stringify ( [ 'HISTORY_RANGE_END' , txid , channelName ] )
] ) ;
} ) ;
} ;
const handleGetFullHistory = function ( ctx , seq , user , parsed ) {
const handleGetFullHistory = function ( Server , seq , user , parsed ) {
// parsed[1] is the channel id
// parsed[2] is a validation key (optionnal)
// parsed[3] is the last known hash (optionnal)
ctx . sendMsg ( ctx , user , [ seq , 'ACK' ] ) ;
Server . send ( user . id , [ seq , 'ACK' ] ) ;
// FIXME should we send metadata here too?
// none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
return void getHistoryAsync ( ctx, parsed[ 1 ] , - 1 , false , ( msg , readMore ) => {
return void getHistoryAsync ( parsed[ 1 ] , - 1 , false , ( msg , readMore ) => {
if ( ! msg ) { return ; }
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( [ 'FULL_HISTORY' , msg ] ) ] , readMore ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( [ 'FULL_HISTORY' , msg ] ) ] , readMore ) ;
} , ( err ) => {
let parsedMsg = [ 'FULL_HISTORY_END' , parsed [ 1 ] ] ;
if ( err ) {
Log . error ( 'HK_GET_FULL_HISTORY' , err . stack ) ;
parsedMsg = [ 'ERROR' , parsed [ 1 ] , err . message ] ;
}
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( parsedMsg ) ] ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( parsedMsg ) ] ) ;
} ) ;
} ;
const handleRPC = function ( ctx , seq , user , parsed ) {
const handleRPC = function ( Server , seq , user , parsed ) {
if ( typeof ( rpc ) !== 'function' ) { return ; }
/* RPC Calls... */
var rpc _call = parsed . slice ( 1 ) ;
ctx . sendMsg ( ctx , user , [ seq , 'ACK' ] ) ;
// XXX ensure user is guaranteed to have 'id'
Server . send ( user . id , [ seq , 'ACK' ] ) ;
try {
// slice off the sequence number and pass in the rest of the message
rpc ( ctx , rpc _call , function ( err , output ) {
rpc ( Server , rpc _call , function ( err , output ) {
if ( err ) {
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( [ parsed [ 0 ] , 'ERROR' , err ] ) ] ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( [ parsed [ 0 ] , 'ERROR' , err ] ) ] ) ;
return ;
}
var msg = rpc _call [ 0 ] . slice ( ) ;
if ( msg [ 3 ] === 'REMOVE_OWNED_CHANNEL' ) {
onChannelDeleted ( ctx , msg [ 4 ] ) ;
onChannelDeleted ( Server , msg [ 4 ] ) ;
}
if ( msg [ 3 ] === 'CLEAR_OWNED_CHANNEL' ) {
onChannelCleared ( ctx , msg [ 4 ] ) ;
onChannelCleared ( Server , msg [ 4 ] ) ;
}
if ( msg [ 3 ] === 'SET_METADATA' ) { // or whatever we call the RPC????
// make sure we update our cache of metadata
// or at least invalidate it and force other mechanisms to recompute its state
// 'output' could be the new state as computed by rpc
onChannelMetadataChanged ( ctx , msg [ 4 ] . channel , output [ 1 ] ) ;
onChannelMetadataChanged ( Server , msg [ 4 ] . channel , output [ 1 ] ) ;
}
// unauthenticated RPC calls have a different message format
@ -921,10 +913,10 @@ module.exports.create = function (cfg) {
// this is an inline reimplementation of historyKeeperBroadcast
// because if we use that directly it will bypass signature validation
// which opens up the user to malicious behaviour
let chan = c tx. channels [ output . channel ] ;
let chan = c hannel_cache [ output . channel ] ;
if ( chan && chan . length ) {
chan . forEach ( function ( user ) {
ctx. sendMsg ( ctx , user , output . message ) ;
Server. send ( user . id , output . message ) ;
//[0, null, 'MSG', user.id, JSON.stringify(output.message)]);
} ) ;
}
@ -934,10 +926,10 @@ module.exports.create = function (cfg) {
}
// finally, send a response to the client that sent the RPC
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( [ parsed [ 0 ] ] . concat ( output ) ) ] ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( [ parsed [ 0 ] ] . concat ( output ) ) ] ) ;
} ) ;
} catch ( e ) {
ctx. sendMsg ( ctx , user , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( [ parsed [ 0 ] , 'ERROR' , 'SERVER_ERROR' ] ) ] ) ;
Server. send ( user . id , [ 0 , HISTORY _KEEPER _ID , 'MSG' , user . id , JSON . stringify ( [ parsed [ 0 ] , 'ERROR' , 'SERVER_ERROR' ] ) ] ) ;
}
} ;
@ -953,7 +945,7 @@ module.exports.create = function (cfg) {
* check if it ' s expired and execute all the associated side - effects
* routes queries to the appropriate handlers
* /
const onDirectMessage = function ( ctx , seq , user , json ) {
const onDirectMessage = function ( Server , seq , user , json ) {
Log . silly ( 'HK_MESSAGE' , json ) ;
let parsed ;
@ -967,33 +959,49 @@ module.exports.create = function (cfg) {
// If the requested history is for an expired channel, abort
// Note the if we don't have the keys for that channel in metadata_cache, we'll
// have to abort later (once we know the expiration time)
if ( checkExpired ( ctx , parsed [ 1 ] ) ) { return ; }
if ( checkExpired ( Server , parsed [ 1 ] ) ) { return ; }
// look up the appropriate command in the map of commands or fall back to RPC
var command = directMessageCommands [ parsed [ 0 ] ] || handleRPC ;
// run the command with the standard function signature
command ( ctx , seq , user , parsed ) ;
command ( Server , seq , user , parsed ) ;
} ;
return {
cfg . historyKeeper = {
id : HISTORY _KEEPER _ID ,
channelMessage : function ( ctx , channel , msgStruct ) {
onChannelMessage ( ctx , channel , msgStruct ) ;
channelMessage : function ( Server , channel , msgStruct ) {
// netflux-server emits 'channelMessage' events whenever someone broadcasts to a channel
// historyKeeper stores these messages if the channel id indicates that they are
// a channel type with permanent history
onChannelMessage ( Server , channel , msgStruct ) ;
} ,
channelClose : function ( channelName ) {
// netflux-server emits 'channelClose' events whenever everyone leaves a channel
// we drop cached metadata and indexes at the same time
dropChannel ( channelName ) ;
} ,
channelOpen : function ( ctx , channelName , user ) {
ctx . sendMsg ( ctx , user , [
channelOpen : function ( Server , channelName , userId ) {
channel _cache [ channelName ] = { } ;
Server . send ( userId , [
0 ,
HISTORY _KEEPER _ID , // ctx.historyKeeper.id
HISTORY _KEEPER _ID ,
'JOIN' ,
channelName
] ) ;
} ,
directMessage : function ( ctx , seq , user , json ) {
onDirectMessage ( ctx , seq , user , json ) ;
directMessage : function ( Server , seq , user , json ) {
// netflux-server allows you to register an id with a handler
// this handler is invoked every time someone sends a message to that id
onDirectMessage ( Server , seq , user , json ) ;
} ,
} ;
RPC . create ( cfg , function ( err , _rpc ) {
if ( err ) { throw err ; }
rpc = _rpc ;
cb ( void 0 , cfg . historyKeeper ) ;
} ) ;
} ;