|
|
@ -47,9 +47,49 @@ var checkPath = function (path, callback) {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var removeChannel = function (env, channelName, cb) {
|
|
|
|
|
|
|
|
var filename = Path.join(env.root, channelName.slice(0, 2), channelName + '.ndjson');
|
|
|
|
|
|
|
|
Fs.unlink(filename, cb);
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var closeChannel = function (env, channelName, cb) {
|
|
|
|
|
|
|
|
if (!env.channels[channelName]) { return; }
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
env.channels[channelName].writeStream.close();
|
|
|
|
|
|
|
|
delete env.channels[channelName];
|
|
|
|
|
|
|
|
env.openFiles--;
|
|
|
|
|
|
|
|
cb();
|
|
|
|
|
|
|
|
} catch (err) {
|
|
|
|
|
|
|
|
cb(err);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var flushUnusedChannels = function (env, cb, frame) {
|
|
|
|
|
|
|
|
var currentTime = +new Date();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var expiration = typeof(frame) === 'undefined'? env.channelExpirationMs: frame;
|
|
|
|
|
|
|
|
Object.keys(env.channels).forEach(function (chanId) {
|
|
|
|
|
|
|
|
var chan = env.channels[chanId];
|
|
|
|
|
|
|
|
if (typeof(chan.atime) !== 'number') { return; }
|
|
|
|
|
|
|
|
if (currentTime >= expiration + chan.atime) {
|
|
|
|
|
|
|
|
closeChannel(env, chanId, function (err) {
|
|
|
|
|
|
|
|
if (err) {
|
|
|
|
|
|
|
|
console.error(err);
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if (env.verbose) {
|
|
|
|
|
|
|
|
console.log("Closed channel [%s]", chanId);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
cb();
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
var getChannel = function (env, id, callback) {
|
|
|
|
var getChannel = function (env, id, callback) {
|
|
|
|
if (env.channels[id]) {
|
|
|
|
if (env.channels[id]) {
|
|
|
|
var chan = env.channels[id];
|
|
|
|
var chan = env.channels[id];
|
|
|
|
|
|
|
|
chan.atime = +new Date();
|
|
|
|
if (chan.whenLoaded) {
|
|
|
|
if (chan.whenLoaded) {
|
|
|
|
chan.whenLoaded.push(callback);
|
|
|
|
chan.whenLoaded.push(callback);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
@ -57,6 +97,19 @@ var getChannel = function (env, id, callback) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (env.openFiles >= env.openFileLimit) {
|
|
|
|
|
|
|
|
// if you're running out of open files, asynchronously clean up expired files
|
|
|
|
|
|
|
|
// do it on a shorter timeframe, though (half of normal)
|
|
|
|
|
|
|
|
setTimeout(function () {
|
|
|
|
|
|
|
|
flushUnusedChannels(env, function () {
|
|
|
|
|
|
|
|
if (env.verbose) {
|
|
|
|
|
|
|
|
console.log("Approaching open file descriptor limit. Cleaning up");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}, env.channelExpirationMs / 2);
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
var channel = env.channels[id] = {
|
|
|
|
var channel = env.channels[id] = {
|
|
|
|
atime: +new Date(),
|
|
|
|
atime: +new Date(),
|
|
|
|
messages: [],
|
|
|
|
messages: [],
|
|
|
@ -100,8 +153,10 @@ var getChannel = function (env, id, callback) {
|
|
|
|
}).nThen(function (waitFor) {
|
|
|
|
}).nThen(function (waitFor) {
|
|
|
|
if (errorState) { return; }
|
|
|
|
if (errorState) { return; }
|
|
|
|
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
|
|
|
|
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
|
|
|
|
|
|
|
|
env.openFiles++;
|
|
|
|
stream.on('open', waitFor());
|
|
|
|
stream.on('open', waitFor());
|
|
|
|
stream.on('error', function (err) {
|
|
|
|
stream.on('error', function (err) {
|
|
|
|
|
|
|
|
env.openFiles--;
|
|
|
|
// this might be called after this nThen block closes.
|
|
|
|
// this might be called after this nThen block closes.
|
|
|
|
if (channel.whenLoaded) {
|
|
|
|
if (channel.whenLoaded) {
|
|
|
|
complete(err);
|
|
|
|
complete(err);
|
|
|
@ -161,15 +216,14 @@ var getMessages = function (env, chanName, handler, cb) {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
var removeChannel = function (env, channelName, cb) {
|
|
|
|
|
|
|
|
var filename = Path.join(env.root, channelName.slice(0, 2), channelName + '.ndjson');
|
|
|
|
|
|
|
|
Fs.unlink(filename, cb);
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
module.exports.create = function (conf, cb) {
|
|
|
|
module.exports.create = function (conf, cb) {
|
|
|
|
var env = {
|
|
|
|
var env = {
|
|
|
|
root: conf.filePath || './datastore',
|
|
|
|
root: conf.filePath || './datastore',
|
|
|
|
channels: { },
|
|
|
|
channels: { },
|
|
|
|
|
|
|
|
channelExpirationMs: conf.channelExpirationMs || 30000,
|
|
|
|
|
|
|
|
verbose: conf.verbose,
|
|
|
|
|
|
|
|
openFiles: 0,
|
|
|
|
|
|
|
|
openFileLimit: conf.openFileLimit || 2048,
|
|
|
|
};
|
|
|
|
};
|
|
|
|
Fs.mkdir(env.root, function (err) {
|
|
|
|
Fs.mkdir(env.root, function (err) {
|
|
|
|
if (err && err.code !== 'EEXIST') {
|
|
|
|
if (err && err.code !== 'EEXIST') {
|
|
|
@ -188,6 +242,15 @@ module.exports.create = function (conf, cb) {
|
|
|
|
cb(err);
|
|
|
|
cb(err);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
},
|
|
|
|
},
|
|
|
|
|
|
|
|
closeChannel: function (channelName, cb) {
|
|
|
|
|
|
|
|
closeChannel(env, channelName, cb);
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
flushUnusedChannels: function (cb) {
|
|
|
|
|
|
|
|
flushUnusedChannels(env, cb);
|
|
|
|
|
|
|
|
},
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
setInterval(function () {
|
|
|
|
|
|
|
|
flushUnusedChannels(env, function () { });
|
|
|
|
|
|
|
|
}, 5000);
|
|
|
|
};
|
|
|
|
};
|
|
|
|