|
|
|
@ -7,6 +7,12 @@ var nThen = require("nthen");
|
|
|
|
|
const ToPull = require('stream-to-pull-stream');
|
|
|
|
|
const Pull = require('pull-stream');
|
|
|
|
|
|
|
|
|
|
const isValidChannelId = function (id) {
|
|
|
|
|
return typeof(id) === 'string' &&
|
|
|
|
|
[32, 48].indexOf(id.length) > -1 &&
|
|
|
|
|
/^[a-zA-Z0-9=+-]*$/.test(id);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
var mkPath = function (env, channelId) {
|
|
|
|
|
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
|
|
|
|
|
};
|
|
|
|
@ -161,7 +167,9 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
|
|
|
|
|
mkBufferSplit(),
|
|
|
|
|
mkOffsetCounter(),
|
|
|
|
|
Pull.asyncMap((data, moreCb) => { msgHandler(data, moreCb, ()=>{ keepReading = false; moreCb(); }); }),
|
|
|
|
|
Pull.drain(()=>(keepReading), cb)
|
|
|
|
|
Pull.drain(() => (keepReading), (err) => {
|
|
|
|
|
cb((keepReading) ? err : undefined);
|
|
|
|
|
})
|
|
|
|
|
);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -414,35 +422,44 @@ module.exports.create = function (
|
|
|
|
|
}
|
|
|
|
|
cb({
|
|
|
|
|
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
|
|
|
|
|
},
|
|
|
|
|
message: function (channelName, content, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
message(env, channelName, content, cb);
|
|
|
|
|
},
|
|
|
|
|
messageBin: (channelName, content, cb) => {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
messageBin(env, channelName, content, cb);
|
|
|
|
|
},
|
|
|
|
|
getMessages: function (channelName, msgHandler, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
getMessages(env, channelName, msgHandler, cb);
|
|
|
|
|
},
|
|
|
|
|
removeChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
removeChannel(env, channelName, function (err) {
|
|
|
|
|
cb(err);
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
closeChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
closeChannel(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
flushUnusedChannels: function (cb) {
|
|
|
|
|
flushUnusedChannels(env, cb);
|
|
|
|
|
},
|
|
|
|
|
getChannelSize: function (chanName, cb) {
|
|
|
|
|
channelBytes(env, chanName, cb);
|
|
|
|
|
getChannelSize: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
channelBytes(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
getChannelMetadata: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
getChannelMetadata(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
clearChannel: function (channelName, cb) {
|
|
|
|
|
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
|
|
|
|
|
clearChannel(env, channelName, cb);
|
|
|
|
|
},
|
|
|
|
|
});
|
|
|
|
|