|
|
@ -124,6 +124,7 @@ const mkBufferSplit = () => {
|
|
|
|
return (abort, cb) => {
|
|
|
|
return (abort, cb) => {
|
|
|
|
read(abort, function (end, data) {
|
|
|
|
read(abort, function (end, data) {
|
|
|
|
if (end) {
|
|
|
|
if (end) {
|
|
|
|
|
|
|
|
if (data) { console.log("mkBufferSplit() Data at the end"); }
|
|
|
|
cb(end, remainder ? [remainder, data] : [data]);
|
|
|
|
cb(end, remainder ? [remainder, data] : [data]);
|
|
|
|
remainder = null;
|
|
|
|
remainder = null;
|
|
|
|
return;
|
|
|
|
return;
|
|
|
@ -166,7 +167,9 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
|
|
|
|
ToPull.read(stream),
|
|
|
|
ToPull.read(stream),
|
|
|
|
mkBufferSplit(),
|
|
|
|
mkBufferSplit(),
|
|
|
|
mkOffsetCounter(),
|
|
|
|
mkOffsetCounter(),
|
|
|
|
Pull.asyncMap((data, moreCb) => { msgHandler(data, moreCb, ()=>{ keepReading = false; moreCb(); }); }),
|
|
|
|
Pull.asyncMap((data, moreCb) => {
|
|
|
|
|
|
|
|
msgHandler(data, moreCb, () => { keepReading = false; moreCb(); });
|
|
|
|
|
|
|
|
}),
|
|
|
|
Pull.drain(() => (keepReading), (err) => {
|
|
|
|
Pull.drain(() => (keepReading), (err) => {
|
|
|
|
cb((keepReading) ? err : undefined);
|
|
|
|
cb((keepReading) ? err : undefined);
|
|
|
|
})
|
|
|
|
})
|
|
|
|