@ -27,7 +27,7 @@ const CHANNEL_WRITE_WINDOW = 300000;
them . The tradeoff with this timeout is that some functions , the stream , and
and the timeout itself are stored in memory . A longer timeout uses more memory
and running out of memory will also kill the server . * /
const STREAM _CLOSE _TIMEOUT = 30 0000;
const STREAM _CLOSE _TIMEOUT = 12 0000;
/ * T h e a b o v e t i m e o u t c l o s e s t h e s t r e a m , b u t a p p a r e n t l y t h a t d o e s n ' t a l w a y s w o r k .
We set yet another timeout to allow the runtime to gracefully close the stream
@ -83,15 +83,31 @@ var channelExists = function (filepath, cb) {
const destroyStream = function ( stream ) {
if ( ! stream ) { return ; }
try { stream . close ( ) ; } catch ( err ) { console . error ( err ) ; }
try {
stream . close ( ) ;
if ( stream . closed && stream . fd === null ) { return ; }
} catch ( err ) {
console . error ( err ) ;
}
setTimeout ( function ( ) {
try { stream . destroy ( ) ; } catch ( err ) { console . error ( err ) ; }
} , STREAM _DESTROY _TIMEOUT ) ;
} ;
/ *
accept a stream , an id ( used as a label ) and an optional number of milliseconds
return a function which ignores all arguments
and first tries to gracefully close a stream
then destroys it after a period if the close was not successful
if the function is not called within the specified number of milliseconds
then it will be called implicitly with an error to indicate
that it was run because it timed out
* /
const ensureStreamCloses = function ( stream , id , ms ) {
return Util . bake ( Util . mkTimeout ( Util . once ( function ( err ) {
destroyStream ( stream , id ) ;
destroyStream ( stream );
if ( err ) {
// this can only be a timeout error...
console . log ( "stream close error:" , err , id ) ;
@ -106,7 +122,7 @@ const ensureStreamCloses = function (stream, id, ms) {
// it also allows the handler to abort reading at any time
const readMessagesBin = ( env , id , start , msgHandler , cb ) => {
const stream = Fs . createReadStream ( mkPath ( env , id ) , { start : start } ) ;
const finish = ensureStreamCloses ( stream , id ) ;
const finish = ensureStreamCloses ( stream , '[readMessagesBin:' + id + ']' ) ;
return void readFileBin ( stream , msgHandler , function ( err ) {
cb ( err ) ;
finish ( ) ;
@ -117,7 +133,7 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
// returns undefined if the first message was not an object (not an array)
var getMetadataAtPath = function ( Env , path , _cb ) {
const stream = Fs . createReadStream ( path , { start : 0 } ) ;
const finish = ensureStreamCloses ( stream , path ) ;
const finish = ensureStreamCloses ( stream , '[getMetadataAtPath:' + path + ']' ) ;
var cb = Util . once ( Util . mkAsync ( Util . both ( _cb , finish ) ) , function ( ) {
throw new Error ( "Multiple Callbacks" ) ;
} ) ;
@ -203,7 +219,7 @@ var clearChannel = function (env, channelId, _cb) {
* /
var readMessages = function ( path , msgHandler , _cb ) {
var stream = Fs . createReadStream ( path , { start : 0 } ) ;
const finish = ensureStreamCloses ( stream , path ) ;
const finish = ensureStreamCloses ( stream , '[readMessages:' + path + ']' ) ;
var cb = Util . once ( Util . mkAsync ( Util . both ( finish , _cb ) ) ) ;
return readFileBin ( stream , function ( msgObj , readMore ) {
@ -231,7 +247,7 @@ var getDedicatedMetadata = function (env, channelId, handler, _cb) {
var metadataPath = mkMetadataPath ( env , channelId ) ;
var stream = Fs . createReadStream ( metadataPath , { start : 0 } ) ;
const finish = ensureStreamCloses ( stream , metadataPath ) ;
const finish = ensureStreamCloses ( stream , '[getDedicatedMetadata:' + metadataPath + ']' ) ;
var cb = Util . both ( finish , _cb ) ;
readFileBin ( stream , function ( msgObj , readMore ) {