From ab868237a0b94492dce011eba686e23a68dd32be Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 11 Sep 2019 11:00:01 +0200 Subject: [PATCH] replace ad hoc line stream with readline module --- lib/metadata.js | 1 + storage/file.js | 99 ++++++++++++++++++++++++++++--------------------- 2 files changed, 58 insertions(+), 42 deletions(-) diff --git a/lib/metadata.js b/lib/metadata.js index 73f82b6ea..6231ea224 100644 --- a/lib/metadata.js +++ b/lib/metadata.js @@ -205,6 +205,7 @@ Meta.createLineHandler = function (ref, errorHandler) { line: JSON.stringify(line), }); } + if (typeof(line) === 'undefined') { return; } if (Array.isArray(line)) { try { diff --git a/storage/file.js b/storage/file.js index 52b1d38e9..b67132268 100644 --- a/storage/file.js +++ b/storage/file.js @@ -6,7 +6,8 @@ var Fse = require("fs-extra"); var Path = require("path"); var nThen = require("nthen"); var Semaphore = require("saferphore"); -var Once = require("../lib/once"); +var Util = require("../lib/common-util"); +const Readline = require("readline"); const ToPull = require('stream-to-pull-stream'); const Pull = require('pull-stream'); @@ -52,37 +53,54 @@ var channelExists = function (filepath, cb) { }; // reads classic metadata from a channel log and aborts -var getMetadataAtPath = function (Env, path, cb) { - var remainder = ''; - var stream = Fs.createReadStream(path, { encoding: 'utf8' }); - var complete = function (err, data) { - var _cb = cb; - cb = undefined; - if (_cb) { _cb(err, data); } - }; - stream.on('data', function (chunk) { - if (!/\n/.test(chunk)) { - remainder += chunk; - return; - } - stream.close(); - var metadata = chunk.split('\n')[0]; +// returns undefined if the first message was not an object (not an array) +var getMetadataAtPath = function (Env, path, _cb) { + var stream; - var parsed = null; + // cb implicitly destroys the stream, if it exists + // and calls back asynchronously no more than once + var cb = Util.once(Util.both(function () { try { - parsed = JSON.parse(metadata); - complete(undefined, parsed); - } - catch (e) { - console.log("getMetadataAtPath"); - console.error(e); - complete('INVALID_METADATA', metadata); + stream.destroy(); + } catch (err) { + return err; } + }, Util.mkAsync(_cb))); + + try { + // stream creation can throw... probably ENOENT + stream = Fs.createReadStream(path, { encoding: 'utf8' }); + } catch (err) { + return void cb(err); + } + + // stream lines + const rl = Readline.createInterface({ + input: stream, }); - stream.on('end', function () { - complete(); - }); - stream.on('error', function (e) { complete(e); }); + + var i = 0; + rl + .on('line', function (line) { + // metadata should always be on the first line or not exist in the channel at all + if (i++ > 0) { return void cb(); } + var metadata; + try { + metadata = JSON.parse(line); + // if it parses, is a truthy object, and is not an array + // then it's what you were looking for + if (metadata && typeof(metadata) === 'object' && !Array.isArray(metadata)) { + return void cb(void 0, metadata); + } else { // it parsed, but isn't metadata + return void cb(); // call back without an error or metadata + } + } catch (err) { + // if you can't parse, that's bad + return void cb("INVALID_METADATA"); + } + }) + .on('end', cb) + .on('error', cb); }; var closeChannel = function (env, channelName, cb) { @@ -98,18 +116,14 @@ var closeChannel = function (env, channelName, cb) { }; // truncates a file to the end of its metadata line -var clearChannel = function (env, channelId, cb) { +// TODO write the metadata in a dedicated file +var clearChannel = function (env, channelId, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + var path = mkPath(env, channelId); getMetadataAtPath(env, path, function (e, metadata) { if (e) { return cb(new Error(e)); } - if (!metadata) { - return void Fs.truncate(path, 0, function (err) { - if (err) { - return cb(err); - } - cb(void 0); - }); - } + if (!metadata) { return void Fs.truncate(path, 0, cb); } var len = JSON.stringify(metadata).length + 1; @@ -214,7 +228,7 @@ How to proceed // 'INVALID_METADATA' if it can't parse // stream errors if anything goes wrong at a lower level // ENOENT (no channel here) - return void handler(err); + return void handler(err, data); } // disregard anything that isn't a map if (!data || typeof(data) !== 'object' || Array.isArray(data)) { return; } @@ -347,7 +361,7 @@ var removeChannel = function (env, channelName, cb) { var channelPath = mkPath(env, channelName); var metadataPath = mkMetadataPath(env, channelName); - var CB = Once(cb); + var CB = Util.once(cb); var errors = 0; nThen(function (w) { @@ -387,7 +401,7 @@ var removeArchivedChannel = function (env, channelName, cb) { var channelPath = mkArchivePath(env, channelName); var metadataPath = mkArchiveMetadataPath(env, channelName); - var CB = Once(cb); + var CB = Util.once(cb); nThen(function (w) { Fs.unlink(channelPath, w(function (err) { @@ -602,7 +616,7 @@ var unarchiveChannel = function (env, channelName, cb) { var metadataPath = mkMetadataPath(env, channelName); // don't call the callback multiple times - var CB = Once(cb); + var CB = Util.once(cb); // if a file exists in the unarchived path, you probably don't want to clobber its data // so unlike 'archiveChannel' we won't overwrite. @@ -690,7 +704,7 @@ var channelBytes = function (env, chanName, cb) { var channelPath = mkPath(env, chanName); var dataPath = mkMetadataPath(env, chanName); - var CB = Once(cb); + var CB = Util.once(cb); var channelSize = 0; var dataSize = 0; @@ -820,6 +834,7 @@ const messageBin = (env, chanName, msgBin, cb) => { chan.onError.push(complete); chan.writeStream.write(msgBin, function () { /*::if (!chan) { throw new Error("Flow unreachable"); }*/ + // TODO replace ad hoc queuing with WriteQueue chan.onError.splice(chan.onError.indexOf(complete), 1); chan.atime = +new Date(); if (!cb) { return; }