diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js
index 09f5fdfea..c3ed67cb7 100644
--- a/lib/historyKeeper.js
+++ b/lib/historyKeeper.js
@@ -10,29 +10,13 @@ const Meta = require("./metadata");
 const WriteQueue = require("./write-queue");
 const BatchRead = require("./batch-read");
+const Extras = require("./hk-util.js");
+
 let Log;
 
 const now = function () { return (new Date()).getTime(); };
 const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds
 
-/*  getHash
-    * this function slices off the leading portion of a message which is
-      most likely unique
-    * these "hashes" are used to identify particular messages in a channel's history
-    * clients store "hashes" either in memory or in their drive to query for new messages:
-      * when reconnecting to a pad
-      * when connecting to chat or a mailbox
-    * thus, we can't change this function without invalidating client data which:
-      * is encrypted clientside
-      * can't be easily migrated
-    * don't break it!
-*/
-const getHash = function (msg) {
-    if (typeof(msg) !== 'string') {
-        Log.warn('HK_GET_HASH', 'getHash() called on ' + typeof(msg) + ': ' + msg);
-        return '';
-    }
-    return msg.slice(0,64);
-};
+const getHash = Extras.getHash;
 
 const tryParse = function (str) {
     try {
@@ -185,7 +169,7 @@ module.exports.create = function (cfg) {
             if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
                 // msgObj.offset is API guaranteed by our storage module
                 // it should always be a valid positive integer
-                offsetByHash[getHash(msg[4])] = msgObj.offset;
+                offsetByHash[getHash(msg[4], Log)] = msgObj.offset;
             }
             // There is a trailing \n at the end of the file
             size = msgObj.offset + msgObj.buff.length + 1;
@@ -502,7 +486,7 @@ module.exports.create = function (cfg) {
         msgStruct.push(now());
 
         // storeMessage
-        storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
+        storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4], Log));
     });
 };
 
@@ -601,7 +585,7 @@ module.exports.create = function (cfg) {
             const msg = tryParse(msgObj.buff.toString('utf8'));
             // if it was undefined then go onto the next message
             if (typeof msg === "undefined") { return readMore(); }
-            if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4])) {
+            if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4], Log)) {
                 return void readMore();
             }
             offset = msgObj.offset;
@@ -672,7 +656,7 @@ module.exports.create = function (cfg) {
             var content = parsed[4];
             if (typeof(content) !== 'string') { return; }
 
-            var hash = getHash(content);
+            var hash = getHash(content, Log);
             if (hash === oldestKnownHash) {
                 found = true;
             }
diff --git a/lib/hk-util.js b/lib/hk-util.js
new file mode 100644
index 000000000..cea39c4e1
--- /dev/null
+++ b/lib/hk-util.js
@@ -0,0 +1,24 @@
+var HK = module.exports;
+
+/*  getHash
+    * this function slices off the leading portion of a message which is
+      most likely unique
+    * these "hashes" are used to identify particular messages in a channel's history
+    * clients store "hashes" either in memory or in their drive to query for new messages:
+      * when reconnecting to a pad
+      * when connecting to chat or a mailbox
+    * thus, we can't change this function without invalidating client data which:
+      * is encrypted clientside
+      * can't be easily migrated
+    * don't break it!
+*/
+HK.getHash = function (msg, Log) {
+    if (typeof(msg) !== 'string') {
+        if (Log) {
+            Log.warn('HK_GET_HASH', 'getHash() called on ' + typeof(msg) + ': ' + msg);
+        }
+        return '';
+    }
+    return msg.slice(0,64);
+};
+
diff --git a/storage/file.js b/storage/file.js
index a3d6f521f..08ec98f85 100644
--- a/storage/file.js
+++ b/storage/file.js
@@ -7,6 +7,8 @@
 var Path = require("path");
 var nThen = require("nthen");
 var Semaphore = require("saferphore");
 var Util = require("../lib/common-util");
+var Meta = require("../lib/metadata");
+var Extras = require("../lib/hk-util");
 const Schedule = require("../lib/schedule");
 const Readline = require("readline");
@@ -39,6 +41,10 @@ var mkArchiveMetadataPath = function (env, channelId) {
     return Path.join(env.archiveRoot, 'datastore', channelId.slice(0, 2), channelId) + '.metadata.ndjson';
 };
 
+var mkTempPath = function (env, channelId) {
+    return mkPath(env, channelId) + '.temp';
+};
+
 // pass in the path so we can reuse the same function for archived files
 var channelExists = function (filepath, cb) {
     Fs.stat(filepath, function (err, stat) {
@@ -868,6 +874,178 @@ var getMessages = function (env, chanName, handler, cb) {
     });
 };
 
+var trimChannel = function (env, channelName, hash, _cb) {
+    var cb = Util.once(Util.mkAsync(_cb));
+    // this function is queued as a blocking action for the relevant channel
+
+    // derive temporary file paths for metadata and log buffers
+    var tempChannelPath = mkTempPath(env, channelName);
+
+    // derive production db paths
+    var channelPath = mkPath(env, channelName);
+    var metadataPath = mkMetadataPath(env, channelName);
+
+    // derive archive paths
+    var archiveChannelPath = mkArchivePath(env, channelName);
+    var archiveMetadataPath = mkArchiveMetadataPath(env, channelName);
+
+    var metadataReference = {};
+
+    var tempStream;
+    var ABORT;
+
+    var cleanUp = function (cb) {
+        if (tempStream && !tempStream.closed) {
+            try {
+                tempStream.close();
+            } catch (err) { }
+        }
+
+        Fse.unlink(tempChannelPath, function (err) {
+            // proceed if deleted or if there was nothing to delete
+            if (!err || err.code === 'ENOENT') { return cb(); }
+            // else abort and call back with the error
+            cb(err);
+        });
+    };
+
+    nThen(function (w) {
+        // close the file descriptor if it is open
+        closeChannel(env, channelName, w(function (err) {
+            if (err) {
+                w.abort();
+                return void cb(err);
+            }
+        }));
+    }).nThen(function (w) {
+        cleanUp(w(function (err) {
+            if (err) {
+                w.abort();
+                cb(err);
+            }
+        }));
+    }).nThen(function (w) {
+        // eat errors since loading the logger here would create a cyclical dependency
+        var lineHandler = Meta.createLineHandler(metadataReference, Util.noop);
+
+        readMetadata(env, channelName, lineHandler, w(function (err) {
+            if (err) {
+                w.abort();
+                return void cb(err);
+            }
+            // if there were no errors just fall through to the next block
+        }));
+    }).nThen(function (w) {
+        // create temp buffer writeStream
+        tempStream = Fs.createWriteStream(tempChannelPath, {
+            flags: 'a',
+        });
+        tempStream.on('open', w());
+        tempStream.on('error', function (err) {
+            w.abort();
+            ABORT = true;
+            cleanUp(function () {
+                cb(err);
+            });
+        });
+    }).nThen(function (w) {
+        var i = 0;
+        var retain = false;
+
+        var handler = function (msgObj, readMore, abort) {
+            if (ABORT) { return void abort(); }
+            // the first message might be metadata... ignore it if so and keep reading
+            if (i++ === 0 && msgObj.buff.indexOf('{') === 0) { return void readMore(); }
+
+            if (retain) {
+                // if this flag is set then you've already found
+                // the message you were looking for.
+                // write it to your temp buffer and keep going
+                return void tempStream.write(msgObj.buff, function () {
+                    readMore();
+                });
+            }
+
+            var msg = Util.tryParse(msgObj.buff.toString('utf8'));
+            // if the line does not parse, skip it and keep reading
+            if (!msg) { return void readMore(); }
+
+            var msgHash = Extras.getHash(msg[4]);
+
+            if (msgHash === hash) {
+                // everything from this point on should be retained,
+                // starting with this message
+                retain = true;
+                return void tempStream.write(msgObj.buff, function () {
+                    readMore();
+                });
+            }
+            // not the message you're looking for; drop it and keep reading
+            readMore();
+        };
+
+        readMessagesBin(env, channelName, 0, handler, w(function (err) {
+            if (err) {
+                w.abort();
+                return void cleanUp(function () {
+                    // intentionally call back with main error
+                    // not the cleanup error
+                    cb(err);
+                });
+            }
+
+            if (!retain) {
+                // you never found the message you were looking for
+                // this whole operation is invalid...
+                // clean up, abort, and call back with an error
+
+                w.abort();
+                cleanUp(function () {
+                    // intentionally call back with main error
+                    // not the cleanup error
+                    cb('HASH_NOT_FOUND');
+                });
+            }
+        }));
+    }).nThen(function (w) {
+        // copy existing channel to the archive
+        Fse.copy(channelPath, archiveChannelPath, w(function (err) {
+            if (!err || err.code === 'ENOENT') { return; }
+            w.abort();
+            cleanUp(function () {
+                cb(err);
+            });
+        }));
+
+        // copy existing metadata to the archive
+        Fse.copy(metadataPath, archiveMetadataPath, w(function (err) {
+            if (!err || err.code === 'ENOENT') { return; }
+            w.abort();
+            cleanUp(function () {
+                cb(err);
+            });
+        }));
+    }).nThen(function (w) {
+        // overwrite the existing metadata log with the current metadata state
+        Fs.writeFile(metadataPath, JSON.stringify(metadataReference.meta) + '\n', w(function (err) {
+            // this shouldn't happen, but if it does your channel might be messed up :(
+            if (err) {
+                w.abort();
+                cb(err);
+            }
+        }));
+
+        // overwrite the existing channel with the temp log
+        Fse.move(tempChannelPath, channelPath, {
+            overwrite: true,
+        }, w(function (err) {
+            // this shouldn't happen, but if it does your channel might be messed up :(
+            if (err) {
+                w.abort();
+                cb(err);
+            }
+        }));
+    }).nThen(function () {
+        // clean up and call back with no error
+        // triggering a historyKeeper index cache eviction...
+        cleanUp(function () {
+            cb();
+        });
+    });
+};
+
 module.exports.create = function (conf, cb) {
     var env = {
         root: conf.filePath || './datastore',
@@ -987,25 +1165,9 @@ module.exports.create = function (conf, cb) {
             });
         },
         trimChannel: function (channelName, hash, cb) {
-            // XXX ansuz
-            // XXX queue lock
-            /*  block any reads from the metadata and log files
-                until this whole process has finished
-                close the file descriptor if it is open
-                derive temporary file paths for metadata and log buffers
-                compute metadata state and write to metadata buffer
-                scan through log file and begin copying lines to the log buffer
-                once you recognize the first line by the hash the user provided
-                archive the file and current metadata once both buffers are copied
-                move the metadata and log buffers into place
-                return the lock on reads
-                call back
-
-                in case of an error, remove the buffer files
-            */
+            if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
             schedule.blocking(channelName, function (next) {
-                cb("E_NOT_IMPLEMENTED");
-                next();
+                trimChannel(env, channelName, hash, Util.both(cb, next));
             });
         },
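
A note on the `getHash` extraction: the `Log` parameter is now optional, so call sites without access to the logger (such as the one in `trimChannel`, which eats errors to avoid a cyclical dependency) degrade to failing quietly. A minimal sketch of the new behavior; the stub logger below is invented for illustration:

```js
var HK = require("./lib/hk-util");

// strings are sliced to their leading 64 characters, exactly as before
HK.getHash(new Array(129).join('x')); // => 'xxx…x' (64 chars)

// non-strings return the empty string, warning only if a Log is provided
HK.getHash(undefined); // => '' (silent)
HK.getHash(undefined, { warn: console.log }); // => '' (logs 'HK_GET_HASH ...')
```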
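
And a sketch of how the newly implemented `trimChannel` might be exercised through the storage API, assuming `create` calls back with the store object as it does for the other methods; the channel id and hash below are placeholders, not real values:

```js
var Store = require("./storage/file");

Store.create({ filePath: './datastore' }, function (store) {
    var channelId = '0123456789abcdef0123456789abcdef'; // placeholder 32-char channel id
    var knownHash = new Array(65).join('f'); // placeholder: first 64 chars of the oldest message to keep

    store.trimChannel(channelId, knownHash, function (err) {
        if (err) { return void console.error(err); } // e.g. 'HASH_NOT_FOUND' or Error('EINVAL')
        // everything before the matched message is archived; the rest is kept
        console.log('trimmed', channelId);
    });
});
```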