From 4ba36a9173a3b8ff7c3ee28a8214f535b2a4d3e8 Mon Sep 17 00:00:00 2001 From: ansuz Date: Wed, 25 Mar 2020 11:39:14 -0400 Subject: [PATCH] load user pins in the database worker --- lib/commands/pin-rpc.js | 26 ++++++-------------- lib/historyKeeper.js | 2 ++ lib/hk-util.js | 11 +++++++++ lib/workers/compute-index.js | 47 ++++++++++++++++++++++++++++-------- 4 files changed, 58 insertions(+), 28 deletions(-) diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js index cda482d6b..b831dbb37 100644 --- a/lib/commands/pin-rpc.js +++ b/lib/commands/pin-rpc.js @@ -156,7 +156,6 @@ var getMultipleFileSize = function (Env, channels, cb) { }); }; -const batchUserPins = BatchRead("LOAD_USER_PINS"); var loadUserPins = function (Env, safeKey, cb) { var session = Core.getSession(Env.Sessions, safeKey); @@ -164,23 +163,14 @@ var loadUserPins = function (Env, safeKey, cb) { return cb(session.channels); } - batchUserPins(safeKey, cb, function (done) { - var ref = {}; - var lineHandler = Pins.createLineHandler(ref, function (label, data) { - Env.Log.error(label, { - log: safeKey, - data: data, - }); - }); - - // if channels aren't in memory. load them from disk - // TODO replace with readMessagesBin - Env.pinStore.getMessages(safeKey, lineHandler, function () { - // no more messages - - // only put this into the cache if it completes - session.channels = ref.pins; - done(ref.pins); // FIXME no error handling? + Env.batchUserPins(safeKey, cb, function (done) { + Env.getPinState(safeKey, function (err, value) { + if (!err) { + // only put this into the cache if it completes + session.channels = value; + } + session.channels = value; + done(value); }); }); }; diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index c8356b908..13be09bd1 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -42,6 +42,7 @@ module.exports.create = function (config, cb) { batchMetadata: BatchRead('GET_METADATA'), batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"), batchDiskUsage: BatchRead('GET_DISK_USAGE'), + batchUserPins: BatchRead('LOAD_USER_PINS'), //historyKeeper: config.historyKeeper, intervals: config.intervals || {}, @@ -242,6 +243,7 @@ module.exports.create = function (config, cb) { })); }).nThen(function (w) { HK.initializeIndexWorkers(Env, { + pinPath: pinPath, filePath: config.filePath, archivePath: config.archivePath, channelExpirationMs: config.channelExpirationMs, diff --git a/lib/hk-util.js b/lib/hk-util.js index 0fafcdb91..d17cf778b 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -625,6 +625,8 @@ const handleGetHistoryRange = function (Env, Server, seq, userId, parsed) { Env.Log.error("HK_GET_OLDER_HISTORY", err); } + if (!Array.isArray(messages)) { messages = []; } + var toSend = []; if (typeof (desiredMessages) === "number") { toSend = messages.slice(-desiredMessages); @@ -886,6 +888,15 @@ HK.initializeIndexWorkers = function (Env, config, _cb) { }); }; + Env.getPinState = function (safeKey, cb) { + Env.pinStore.getWeakLock(safeKey, function (next) { + sendCommand({ + key: safeKey, + command: 'GET_PIN_STATE', + }, Util.both(next, cb)); + }); + }; + //console.log("index workers ready"); cb(void 0); }); diff --git a/lib/workers/compute-index.js b/lib/workers/compute-index.js index e46e40bcc..dd3798aa6 100644 --- a/lib/workers/compute-index.js +++ b/lib/workers/compute-index.js @@ -6,19 +6,37 @@ const Store = require("../storage/file"); const Util = require("../common-util"); const nThen = require("nthen"); const Meta = require("../metadata"); +const Pins = require("../pins"); const Env = {}; var ready = false; var store; -const init = function (config, cb) { +var pinStore; +const init = function (config, _cb) { + const cb = Util.once(Util.mkAsync(_cb)); if (!config) { return void cb('E_INVALID_CONFIG'); } - Store.create(config, function (err, _store) { - if (err) { return void cb(err); } - store = _store; + nThen(function (w) { + Store.create(config, w(function (err, _store) { + if (err) { + w.abort(); + return void cb(err); + } + store = _store; + })); + Store.create({ + filePath: config.pinPath, + }, w(function (err, _pinStore) { + if (err) { + w.abort(); + return void cb(err); + } + pinStore = _pinStore; + })); + }).nThen(function () { cb(); }); }; @@ -187,10 +205,24 @@ const getOlderHistory = function (data, cb) { }); }; +const getPinState = function (data, cb, errorHandler) { + const safeKey = data.key; + + var ref = {}; + var lineHandler = Pins.createLineHandler(ref, errorHandler); + + // if channels aren't in memory. load them from disk + // TODO replace with readMessagesBin + pinStore.getMessages(safeKey, lineHandler, function () { + cb(void 0, ref.pins); // FIXME no error handling? + }); +}; + const COMMANDS = { COMPUTE_INDEX: computeIndex, COMPUTE_METADATA: computeMetadata, GET_OLDER_HISTORY: getOlderHistory, + GET_PIN_STATE: getPinState, }; process.on('message', function (data) { @@ -201,13 +233,8 @@ process.on('message', function (data) { } const cb = function (err, value) { - if (err) { - return void process.send({ - txid: data.txid, - error: err, - }); - } process.send({ + error: err, txid: data.txid, value: value, });