From 7647f7c68a9aabd13e357df57baf0c8e4c3d7da0 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 12 Jun 2019 16:05:18 +0200 Subject: [PATCH] implement four new storage APIs * listChannels * listArchivedChannels * archiveChannel * removeArchivedChannel --- storage/file.js | 141 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 131 insertions(+), 10 deletions(-) diff --git a/storage/file.js b/storage/file.js index ba092a14e..84d95a266 100644 --- a/storage/file.js +++ b/storage/file.js @@ -5,6 +5,7 @@ var Fs = require("fs"); var Fse = require("fs-extra"); var Path = require("path"); var nThen = require("nthen"); +var Semaphore = require("saferphore"); const ToPull = require('stream-to-pull-stream'); const Pull = require('pull-stream'); @@ -14,10 +15,18 @@ const isValidChannelId = function (id) { /^[a-zA-Z0-9=+-]*$/.test(id); }; +// 511 -> octal 777 +// read, write, execute permissions flag +const PERMISSIVE = 511; + var mkPath = function (env, channelId) { return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson'; }; +var mkArchivePath = function (env, channelId) { + return Path.join(env.archiveRoot, channelId.slice(0, 2), channelId) + '.ndjson'; +}; + var getMetadataAtPath = function (Env, path, cb) { var remainder = ''; var stream = Fs.createReadStream(path, { encoding: 'utf8' }); @@ -68,7 +77,7 @@ var closeChannel = function (env, channelName, cb) { } }; -var clearChannel = function (env, channelId, cb) { // FIXME deletion +var clearChannel = function (env, channelId, cb) { var path = mkPath(env, channelId); getMetadataAtPath(env, path, function (e, metadata) { if (e) { return cb(new Error(e)); } @@ -189,8 +198,7 @@ var checkPath = function (path, callback) { callback(err); return; } - // 511 -> octal 777 - Fse.mkdirp(Path.dirname(path), 511, function (err) { + Fse.mkdirp(Path.dirname(path), PERMISSIVE, function (err) { if (err && err.code !== 'EEXIST') { callback(err); return; @@ -200,11 +208,99 @@ var checkPath = function (path, callback) { }); }; -var removeChannel = function (env, channelName, cb) { // FIXME deletion +var removeChannel = function (env, channelName, cb) { var filename = mkPath(env, channelName); Fs.unlink(filename, cb); }; +var removeArchivedChannel = function (env, channelName, cb) { + var filename = mkArchivePath(env, channelName); + Fs.unlink(filename, cb); +}; + +var listChannels = function (root, handler, cb) { + // do twenty things at a time + var sema = Semaphore.create(20); + + var dirList = []; + + nThen(function (w) { + // the root of your datastore contains nested directories... + Fs.readdir(root, w(function (err, list) { + if (err) { + w.abort(); + // TODO check if we normally return strings or errors + return void cb(err); + } + dirList = list; + })); + }).nThen(function (w) { + // search inside the nested directories + // stream it so you don't put unnecessary data in memory + var wait = w(); + dirList.forEach(function (dir) { + sema.take(function (give) { + var nestedDirPath = Path.join(root, dir); + Fs.readdir(nestedDirPath, w(give(function (err, list) { + if (err) { return void handler(err); } // Is this correct? + + list.forEach(function (item) { + // ignore things that don't match the naming pattern + if (/^\./.test(item) || !/[0-9a-fA-F]{32,}\.ndjson$/.test(item)) { return; } + var filepath = Path.join(nestedDirPath, item); + var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, ''); + if ([32, 34].indexOf(channel.length) === -1) { return; } + + // otherwise throw it on the pile + sema.take(function (give) { + Fs.stat(filepath, w(give(function (err, stats) { + if (err) { + return void handler(err); + } + + handler(void 0, { + channel: channel, + atime: stats.atime, + mtime: stats.mtime, + ctime: stats.ctime, + size: stats.size, + }); + }))); + }); + }); + }))); + }); + }); + wait(); + }).nThen(function () { + cb(); + }); +}; + +// move a channel's log file from its current location +// to an equivalent location in the cold storage directory +var archiveChannel = function (env, channelName, cb) { + if (!env.retainData) { + return void cb("ARCHIVES_DISABLED"); + } + + // ctime is the most reliable indicator of when a file was archived + // because it is used to indicate changes to the files metadata + // and not its contents + // if we find that this is not reliable in production, we can update it manually + // https://nodejs.org/api/fs.html#fs_fs_utimes_path_atime_mtime_callback + + // check what the channel's path should be (in its current location) + var currentPath = mkPath(env, channelName); + + // construct a parallel path in the new location + var archivePath = mkArchivePath(env, channelName); + + // use Fse.move to move it, Fse makes paths to the directory when you use it. + // https://github.com/jprichardson/node-fs-extra/blob/HEAD/docs/move.md + Fse.move(currentPath, archivePath, { overwrite: true }, cb); +}; + var flushUnusedChannels = function (env, cb, frame) { var currentTime = +new Date(); @@ -413,19 +509,30 @@ module.exports.create = function ( ) { var env = { root: conf.filePath || './datastore', + archiveRoot: conf.archivePath || './data/archive', + retainData: conf.retainData, channels: { }, channelExpirationMs: conf.channelExpirationMs || 30000, verbose: conf.verbose, openFiles: 0, openFileLimit: conf.openFileLimit || 2048, }; - // 0x1ff -> 777 var it; - Fse.mkdirp(env.root, 0x1ff, function (err) { - if (err && err.code !== 'EEXIST') { - // TODO: somehow return a nice error - throw err; - } + + nThen(function (w) { + // make sure the store's directory exists + Fse.mkdirp(env.root, PERMISSIVE, w(function (err) { + if (err && err.code !== 'EEXIST') { + throw err; + } + })); + // make sure the cold storage directory exists + Fse.mkdirp(env.archiveRoot, PERMISSIVE, w(function (err) { + if (err && err.code !== 'EEXIST') { + throw err; + } + })); + }).nThen(function () { cb({ readMessagesBin: (channelName, start, asyncMsgHandler, cb) => { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } @@ -449,6 +556,10 @@ module.exports.create = function ( cb(err); }); }, + removeArchivedChannel: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + removeArchivedChannel(env, channelName, cb); + }, closeChannel: function (channelName, cb) { if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } closeChannel(env, channelName, cb); @@ -468,6 +579,16 @@ module.exports.create = function ( if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } clearChannel(env, channelName, cb); }, + listChannels: function (handler, cb) { + listChannels(env.root, handler, cb); + }, + listArchivedChannels: function (handler, cb) { + listChannels(env.archiveRoot, handler, cb); + }, + archiveChannel: function (channelName, cb) { + if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); } + archiveChannel(env, channelName, cb); + }, log: function (channelName, content, cb) { message(env, channelName, content, cb); },