load user pins in the database worker

pull/1/head
ansuz 5 years ago
parent 50e8893b24
commit 4ba36a9173

@ -156,7 +156,6 @@ var getMultipleFileSize = function (Env, channels, cb) {
}); });
}; };
const batchUserPins = BatchRead("LOAD_USER_PINS");
var loadUserPins = function (Env, safeKey, cb) { var loadUserPins = function (Env, safeKey, cb) {
var session = Core.getSession(Env.Sessions, safeKey); var session = Core.getSession(Env.Sessions, safeKey);
@ -164,23 +163,14 @@ var loadUserPins = function (Env, safeKey, cb) {
return cb(session.channels); return cb(session.channels);
} }
batchUserPins(safeKey, cb, function (done) { Env.batchUserPins(safeKey, cb, function (done) {
var ref = {}; Env.getPinState(safeKey, function (err, value) {
var lineHandler = Pins.createLineHandler(ref, function (label, data) { if (!err) {
Env.Log.error(label, {
log: safeKey,
data: data,
});
});
// if channels aren't in memory. load them from disk
// TODO replace with readMessagesBin
Env.pinStore.getMessages(safeKey, lineHandler, function () {
// no more messages
// only put this into the cache if it completes // only put this into the cache if it completes
session.channels = ref.pins; session.channels = value;
done(ref.pins); // FIXME no error handling? }
session.channels = value;
done(value);
}); });
}); });
}; };

@ -42,6 +42,7 @@ module.exports.create = function (config, cb) {
batchMetadata: BatchRead('GET_METADATA'), batchMetadata: BatchRead('GET_METADATA'),
batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"), batchRegisteredUsers: BatchRead("GET_REGISTERED_USERS"),
batchDiskUsage: BatchRead('GET_DISK_USAGE'), batchDiskUsage: BatchRead('GET_DISK_USAGE'),
batchUserPins: BatchRead('LOAD_USER_PINS'),
//historyKeeper: config.historyKeeper, //historyKeeper: config.historyKeeper,
intervals: config.intervals || {}, intervals: config.intervals || {},
@ -242,6 +243,7 @@ module.exports.create = function (config, cb) {
})); }));
}).nThen(function (w) { }).nThen(function (w) {
HK.initializeIndexWorkers(Env, { HK.initializeIndexWorkers(Env, {
pinPath: pinPath,
filePath: config.filePath, filePath: config.filePath,
archivePath: config.archivePath, archivePath: config.archivePath,
channelExpirationMs: config.channelExpirationMs, channelExpirationMs: config.channelExpirationMs,

@ -625,6 +625,8 @@ const handleGetHistoryRange = function (Env, Server, seq, userId, parsed) {
Env.Log.error("HK_GET_OLDER_HISTORY", err); Env.Log.error("HK_GET_OLDER_HISTORY", err);
} }
if (!Array.isArray(messages)) { messages = []; }
var toSend = []; var toSend = [];
if (typeof (desiredMessages) === "number") { if (typeof (desiredMessages) === "number") {
toSend = messages.slice(-desiredMessages); toSend = messages.slice(-desiredMessages);
@ -886,6 +888,15 @@ HK.initializeIndexWorkers = function (Env, config, _cb) {
}); });
}; };
Env.getPinState = function (safeKey, cb) {
Env.pinStore.getWeakLock(safeKey, function (next) {
sendCommand({
key: safeKey,
command: 'GET_PIN_STATE',
}, Util.both(next, cb));
});
};
//console.log("index workers ready"); //console.log("index workers ready");
cb(void 0); cb(void 0);
}); });

@ -6,19 +6,37 @@ const Store = require("../storage/file");
const Util = require("../common-util"); const Util = require("../common-util");
const nThen = require("nthen"); const nThen = require("nthen");
const Meta = require("../metadata"); const Meta = require("../metadata");
const Pins = require("../pins");
const Env = {}; const Env = {};
var ready = false; var ready = false;
var store; var store;
const init = function (config, cb) { var pinStore;
const init = function (config, _cb) {
const cb = Util.once(Util.mkAsync(_cb));
if (!config) { if (!config) {
return void cb('E_INVALID_CONFIG'); return void cb('E_INVALID_CONFIG');
} }
Store.create(config, function (err, _store) { nThen(function (w) {
if (err) { return void cb(err); } Store.create(config, w(function (err, _store) {
if (err) {
w.abort();
return void cb(err);
}
store = _store; store = _store;
}));
Store.create({
filePath: config.pinPath,
}, w(function (err, _pinStore) {
if (err) {
w.abort();
return void cb(err);
}
pinStore = _pinStore;
}));
}).nThen(function () {
cb(); cb();
}); });
}; };
@ -187,10 +205,24 @@ const getOlderHistory = function (data, cb) {
}); });
}; };
const getPinState = function (data, cb, errorHandler) {
const safeKey = data.key;
var ref = {};
var lineHandler = Pins.createLineHandler(ref, errorHandler);
// if channels aren't in memory. load them from disk
// TODO replace with readMessagesBin
pinStore.getMessages(safeKey, lineHandler, function () {
cb(void 0, ref.pins); // FIXME no error handling?
});
};
const COMMANDS = { const COMMANDS = {
COMPUTE_INDEX: computeIndex, COMPUTE_INDEX: computeIndex,
COMPUTE_METADATA: computeMetadata, COMPUTE_METADATA: computeMetadata,
GET_OLDER_HISTORY: getOlderHistory, GET_OLDER_HISTORY: getOlderHistory,
GET_PIN_STATE: getPinState,
}; };
process.on('message', function (data) { process.on('message', function (data) {
@ -201,13 +233,8 @@ process.on('message', function (data) {
} }
const cb = function (err, value) { const cb = function (err, value) {
if (err) {
return void process.send({
txid: data.txid,
error: err,
});
}
process.send({ process.send({
error: err,
txid: data.txid, txid: data.txid,
value: value, value: value,
}); });

Loading…
Cancel
Save