diff --git a/lib/commands/channel.js b/lib/commands/channel.js index 10131d9d8..6a773cdb9 100644 --- a/lib/commands/channel.js +++ b/lib/commands/channel.js @@ -54,50 +54,14 @@ Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb, Server) { }); }; -Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) { +Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) { // XXX very heavy CPU usage if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) { return cb('INVALID_ARGUMENTS'); } var unsafeKey = Util.unescapeKeyCharacters(safeKey); if (Env.blobStore.isFileId(channelId)) { - var blobId = channelId; - - return void nThen(function (w) { - // check if you have permissions - Env.blobStore.isOwnedBy(safeKey, blobId, w(function (err, owned) { - if (err || !owned) { - w.abort(); - return void cb("INSUFFICIENT_PERMISSIONS"); - } - })); - }).nThen(function (w) { - // remove the blob - return void Env.blobStore.archive.blob(blobId, w(function (err) { - Env.Log.info('ARCHIVAL_OWNED_FILE_BY_OWNER_RPC', { - safeKey: safeKey, - blobId: blobId, - status: err? String(err): 'SUCCESS', - }); - if (err) { - w.abort(); - return void cb(err); - } - })); - }).nThen(function () { - // archive the proof - return void Env.blobStore.archive.proof(safeKey, blobId, function (err) { - Env.Log.info("ARCHIVAL_PROOF_REMOVAL_BY_OWNER_RPC", { - safeKey: safeKey, - blobId: blobId, - status: err? String(err): 'SUCCESS', - }); - if (err) { - return void cb("E_PROOF_REMOVAL"); - } - cb(void 0, 'OK'); - }); - }); + return void Env.removeOwnedBlob(channelId, safeKey, cb); } Metadata.getMetadata(Env, channelId, function (err, metadata) { diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js index 6fd1ce2f5..6ec43aaa3 100644 --- a/lib/historyKeeper.js +++ b/lib/historyKeeper.js @@ -10,6 +10,7 @@ const Core = require("./commands/core"); const Store = require("./storage/file"); const BlobStore = require("./storage/blob"); +const Workers = require("./workers/index"); module.exports.create = function (config, cb) { const Log = config.log; @@ -78,8 +79,6 @@ module.exports.create = function (config, cb) { domain: config.domain }; - HK.initializeValidationWorkers(Env); - (function () { var pes = config.premiumUploadSize; if (!isNaN(pes) && pes >= Env.maxUploadSize) { @@ -243,7 +242,7 @@ module.exports.create = function (config, cb) { Env.blobStore = blob; })); }).nThen(function (w) { - HK.initializeIndexWorkers(Env, { + Workers.initialize(Env, { blobPath: config.blobPath, blobStagingPath: config.blobStagingPath, pinPath: pinPath, @@ -268,6 +267,7 @@ module.exports.create = function (config, cb) { if (config.disableIntegratedTasks) { return; } config.intervals = config.intervals || {}; + // XXX config.intervals.taskExpiration = setInterval(function () { tasks.runAll(function (err) { if (err) { diff --git a/lib/hk-util.js b/lib/hk-util.js index 455fdaa22..a4abab6ba 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -6,10 +6,6 @@ const nThen = require('nthen'); const Util = require("./common-util"); const MetaRPC = require("./commands/metadata"); const Nacl = require('tweetnacl/nacl-fast'); -const { fork } = require('child_process'); -const OS = require("os"); -const numCPUs = OS.cpus().length; - const now = function () { return (new Date()).getTime(); }; const ONE_DAY = 1000 * 60 * 60 * 24; // one day in milliseconds @@ -767,268 +763,6 @@ HK.onDirectMessage = function (Env, Server, seq, userId, json) { }); }; -HK.initializeIndexWorkers = function (Env, config, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - - const workers = []; - - const response = Util.response(function (errLabel, info) { - Env.Log.error('HK_DB_WORKER__' + errLabel, info); - }); - const initWorker = function (worker, cb) { - //console.log("initializing index worker"); - const txid = Util.uid(); - response.expect(txid, function (err) { - if (err) { return void cb(err); } - //console.log("worker initialized"); - workers.push(worker); - cb(); - }, 15000); - - worker.send({ - txid: txid, - config: config, - }); - - worker.on('message', function (res) { - if (!res) { return; } - if (!res.txid) { - // !report errors... - if (res.error) { - Env.Log.error(res.error, res.value); - } - return; - } - //console.log(res); - response.handle(res.txid, [res.error, res.value]); - }); - - var substituteWorker = Util.once(function () { - Env.Log.info("SUBSTITUTE_INDEX_WORKER", ''); - var idx = workers.indexOf(worker); - if (idx !== -1) { - workers.splice(idx, 1); - } - var w = fork('lib/workers/compute-index'); - initWorker(w, function (err) { - if (err) { - throw new Error(err); - } - workers.push(w); - }); - }); - - worker.on('exit', substituteWorker); - worker.on('close', substituteWorker); - worker.on('error', function (err) { - substituteWorker(); - Env.log.error("INDEX_WORKER_ERROR", { - error: err, - }); - }); - }; - - var workerIndex = 0; - var sendCommand = function (msg, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - - workerIndex = (workerIndex + 1) % workers.length; - if (workers.length === 0 || - typeof(workers[workerIndex].send) !== 'function') { - return void cb("NO_WORKERS"); - } - const txid = Util.uid(); - msg.txid = txid; - response.expect(txid, cb, 60000); - workers[workerIndex].send(msg); - }; - - nThen(function (w) { - OS.cpus().forEach(function () { - initWorker(fork('lib/workers/compute-index'), w(function (err) { - if (!err) { return; } - w.abort(); - return void cb(err); - })); - }); - }).nThen(function () { - Env.computeIndex = function (Env, channel, cb) { - Env.store.getWeakLock(channel, function (next) { - sendCommand({ - channel: channel, - command: 'COMPUTE_INDEX', - }, function (err, index) { - next(); - cb(err, index); - }); - }); - }; - - Env.computeMetadata = function (channel, cb) { - Env.store.getWeakLock(channel, function (next) { - sendCommand({ - channel: channel, - command: 'COMPUTE_METADATA', - }, function (err, metadata) { - next(); - cb(err, metadata); - }); - }); - }; - - Env.getOlderHistory = function (channel, oldestKnownHash, cb) { - Env.store.getWeakLock(channel, function (next) { - sendCommand({ - channel: channel, - command: "GET_OLDER_HISTORY", - hash: oldestKnownHash, - }, Util.both(next, cb)); - }); - }; - - Env.getPinState = function (safeKey, cb) { - Env.pinStore.getWeakLock(safeKey, function (next) { - sendCommand({ - key: safeKey, - command: 'GET_PIN_STATE', - }, Util.both(next, cb)); - }); - }; - - Env.getFileSize = function (channel, cb) { - sendCommand({ - command: 'GET_FILE_SIZE', - channel: channel, - }, cb); - }; - - Env.getDeletedPads = function (channels, cb) { - sendCommand({ - command: "GET_DELETED_PADS", - channels: channels, - }, cb); - }; - - Env.getTotalSize = function (channels, cb) { - // we could take out locks for all of these channels, - // but it's OK if the size is slightly off - sendCommand({ - command: 'GET_TOTAL_SIZE', - channels: channels, - }, cb); - }; - - Env.getMultipleFileSize = function (channels, cb) { - sendCommand({ - command: "GET_MULTIPLE_FILE_SIZE", - channels: channels, - }, cb); - }; - - Env.getHashOffset = function (channel, hash, cb) { - Env.store.getWeakLock(channel, function (next) { - sendCommand({ - command: 'GET_HASH_OFFSET', - channel: channel, - hash: hash, - }, Util.both(next, cb)); - }); - }; - - //console.log("index workers ready"); - cb(void 0); - }); -}; - -HK.initializeValidationWorkers = function (Env) { - if (typeof(Env.validateMessage) !== 'undefined') { - return void console.error("validation workers are already initialized"); - } - - // Create our workers - const workers = []; - for (let i = 0; i < numCPUs; i++) { - workers.push(fork('lib/workers/check-signature.js')); - } - - const response = Util.response(function (errLabel, info) { - Env.Log.error('HK_VALIDATE_WORKER__' + errLabel, info); - }); - - var initWorker = function (worker) { - worker.on('message', function (res) { - if (!res || !res.txid) { return; } - //console.log(+new Date(), "Received verification response"); - response.handle(res.txid, [res.error, res.value]); - }); - - var substituteWorker = Util.once( function () { - Env.Log.info("SUBSTITUTE_VALIDATION_WORKER", ''); - var idx = workers.indexOf(worker); - if (idx !== -1) { - workers.splice(idx, 1); - } - // Spawn a new one - var w = fork('lib/workers/check-signature.js'); - workers.push(w); - initWorker(w); - }); - - // Spawn a new process in one ends - worker.on('exit', substituteWorker); - worker.on('close', substituteWorker); - worker.on('error', function (err) { - substituteWorker(); - Env.Log.error('VALIDATION_WORKER_ERROR', { - error: err, - }); - }); - }; - workers.forEach(initWorker); - - var nextWorker = 0; - const send = function (msg, _cb) { - var cb = Util.once(Util.mkAsync(_cb)); - // let's be paranoid about asynchrony and only calling back once.. - nextWorker = (nextWorker + 1) % workers.length; - if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { - return void cb("INVALID_WORKERS"); - } - - var txid = msg.txid = Util.uid(); - - // expect a response within 45s - response.expect(txid, cb, 60000); - - // Send the request - workers[nextWorker].send(msg); - }; - - Env.validateMessage = function (signedMsg, key, cb) { - send({ - msg: signedMsg, - key: key, - command: 'INLINE', - }, cb); - }; - - Env.checkSignature = function (signedMsg, signature, publicKey, cb) { - send({ - command: 'DETACHED', - sig: signature, - msg: signedMsg, - key: publicKey, - }, cb); - }; - - Env.hashChannelList = function (channels, cb) { - send({ - command: 'HASH_CHANNEL_LIST', - channels: channels, - }, cb); - }; -}; - /* onChannelMessage Determine what we should store when a message a broadcasted to a channel" diff --git a/lib/log.js b/lib/log.js index 0e0567a69..35dfad7a3 100644 --- a/lib/log.js +++ b/lib/log.js @@ -21,7 +21,7 @@ var write = function (ctx, content) { }; // various degrees of logging -const logLevels = ['silly', 'verbose', 'debug', 'feedback', 'info', 'warn', 'error']; +const logLevels = Logger.levels = ['silly', 'verbose', 'debug', 'feedback', 'info', 'warn', 'error']; var handlers = { silly: function (ctx, time, tag, info) { diff --git a/lib/workers/compute-index.js b/lib/workers/compute-index.js index 37fa9ac1a..a1fe06645 100644 --- a/lib/workers/compute-index.js +++ b/lib/workers/compute-index.js @@ -10,8 +10,22 @@ const Meta = require("../metadata"); const Pins = require("../pins"); const Core = require("../commands/core"); const Saferphore = require("saferphore"); +const Logger = require("../log"); -const Env = {}; +const Env = { + Log: {}, +}; + +// support the usual log API but pass it to the main process +Logger.levels.forEach(function (level) { + Env.Log[level] = function (label, info) { + process.send({ + log: level, + label: label, + info: info, + }); + }; +}); var ready = false; var store; @@ -57,10 +71,6 @@ const init = function (config, _cb) { }); }; -const tryParse = function (Env, str) { - try { return JSON.parse(str); } catch (err) { } -}; - /* computeIndex can call back with an error or a computed index which includes: * cpIndex: @@ -107,7 +117,7 @@ const computeIndex = function (data, cb) { // but only check for metadata on the first line if (!i && msgObj.buff.indexOf('{') === 0) { i++; // always increment the message counter - msg = tryParse(Env, msgObj.buff.toString('utf8')); + msg = HK.tryParse(Env, msgObj.buff.toString('utf8')); if (typeof msg === "undefined") { return readMore(); } // validate that the current line really is metadata before storing it as such @@ -116,7 +126,7 @@ const computeIndex = function (data, cb) { } i++; if (msgObj.buff.indexOf('cp|') > -1) { - msg = msg || tryParse(Env, msgObj.buff.toString('utf8')); + msg = msg || HK.tryParse(Env, msgObj.buff.toString('utf8')); if (typeof msg === "undefined") { return readMore(); } // cache the offsets of checkpoints if they can be parsed if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) { @@ -142,7 +152,7 @@ const computeIndex = function (data, cb) { // once indexing is complete you should have a buffer of messages since the latest checkpoint // map the 'hash' of each message to its byte offset in the log, to be used for reconnecting clients messageBuf.forEach((msgObj) => { - const msg = tryParse(Env, msgObj.buff.toString('utf8')); + const msg = HK.tryParse(Env, msgObj.buff.toString('utf8')); if (typeof msg === "undefined") { return; } if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') { // msgObj.offset is API guaranteed by our storage module @@ -166,9 +176,9 @@ const computeIndex = function (data, cb) { }); }; -const computeMetadata = function (data, cb, errorHandler) { +const computeMetadata = function (data, cb) { const ref = {}; - const lineHandler = Meta.createLineHandler(ref, errorHandler); + const lineHandler = Meta.createLineHandler(ref, Env.Log.error); return void store.readChannelMetadata(data.channel, lineHandler, function (err) { if (err) { // stream errors? @@ -199,7 +209,7 @@ const getOlderHistory = function (data, cb) { store.getMessages(channelName, function (msgStr) { if (found) { return; } - let parsed = tryParse(Env, msgStr); + let parsed = HK.tryParse(Env, msgStr); if (typeof parsed === "undefined") { return; } // identify classic metadata messages by their inclusion of a channel. @@ -221,11 +231,11 @@ const getOlderHistory = function (data, cb) { }); }; -const getPinState = function (data, cb, errorHandler) { +const getPinState = function (data, cb) { const safeKey = data.key; var ref = {}; - var lineHandler = Pins.createLineHandler(ref, errorHandler); + var lineHandler = Pins.createLineHandler(ref, Env.Log.error); // if channels aren't in memory. load them from disk // TODO replace with readMessagesBin @@ -328,7 +338,7 @@ const getHashOffset = function (data, cb) { var offset = -1; store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => { // tryParse return a parsed message or undefined - const msg = tryParse(Env, msgObj.buff.toString('utf8')); + const msg = HK.tryParse(Env, msgObj.buff.toString('utf8')); // if it was undefined then go onto the next message if (typeof msg === "undefined") { return readMore(); } if (typeof(msg[4]) !== 'string' || lastKnownHash !== HK.getHash(msg[4])) { @@ -342,6 +352,47 @@ const getHashOffset = function (data, cb) { }); }; +const removeOwnedBlob = function (data, cb) { + const blobId = data.blobId; + const safeKey = data.safeKey; + + nThen(function (w) { + // check if you have permissions + blobStore.isOwnedBy(safeKey, blobId, w(function (err, owned) { + if (err || !owned) { + w.abort(); + return void cb("INSUFFICIENT_PERMISSIONS"); + } + })); + }).nThen(function (w) { + // remove the blob + blobStore.archive.blob(blobId, w(function (err) { + Env.Log.info('ARCHIVAL_OWNED_FILE_BY_OWNER_RPC', { + safeKey: safeKey, + blobId: blobId, + status: err? String(err): 'SUCCESS', + }); + if (err) { + w.abort(); + return void cb(err); + } + })); + }).nThen(function () { + // archive the proof + blobStore.archive.proof(safeKey, blobId, function (err) { + Env.Log.info("ARCHIVAL_PROOF_REMOVAL_BY_OWNER_RPC", { + safeKey: safeKey, + blobId: blobId, + status: err? String(err): 'SUCCESS', + }); + if (err) { + return void cb("E_PROOF_REMOVAL"); + } + cb(void 0, 'OK'); + }); + }); +}; + const COMMANDS = { COMPUTE_INDEX: computeIndex, COMPUTE_METADATA: computeMetadata, @@ -352,12 +403,14 @@ const COMMANDS = { GET_DELETED_PADS: getDeletedPads, GET_MULTIPLE_FILE_SIZE: getMultipleFileSize, GET_HASH_OFFSET: getHashOffset, + REMOVE_OWNED_BLOB: removeOwnedBlob, }; process.on('message', function (data) { - if (!data || !data.txid) { + if (!data || !data.txid || !data.pid) { return void process.send({ - error:'E_INVAL' + error:'E_INVAL', + data: data, }); } @@ -365,6 +418,7 @@ process.on('message', function (data) { process.send({ error: err, txid: data.txid, + pid: data.pid, value: value, }); }; @@ -381,12 +435,12 @@ process.on('message', function (data) { if (typeof(command) !== 'function') { return void cb("E_BAD_COMMAND"); } - command(data, cb, function (label, info) { - // for streaming errors - process.send({ - error: label, - value: info, - }); - }); + command(data, cb); }); +process.on('uncaughtException', function (err) { + console.error('[%s] UNCAUGHT EXCEPTION IN DB WORKER'); + console.error(err); + console.error("TERMINATING"); + process.exit(1); +}); diff --git a/lib/workers/index.js b/lib/workers/index.js new file mode 100644 index 000000000..dc380a208 --- /dev/null +++ b/lib/workers/index.js @@ -0,0 +1,303 @@ +/* jshint esversion: 6 */ +/* global process */ +const Util = require("../common-util"); +const nThen = require('nthen'); +const OS = require("os"); +const numCPUs = OS.cpus().length; +const { fork } = require('child_process'); +const Workers = module.exports; +const PID = process.pid; + +Workers.initializeValidationWorkers = function (Env) { + if (typeof(Env.validateMessage) !== 'undefined') { + return void console.error("validation workers are already initialized"); + } + + // Create our workers + const workers = []; + for (let i = 0; i < numCPUs; i++) { + workers.push(fork('lib/workers/check-signature.js')); + } + + const response = Util.response(function (errLabel, info) { + Env.Log.error('HK_VALIDATE_WORKER__' + errLabel, info); + }); + + var initWorker = function (worker) { + worker.on('message', function (res) { + if (!res || !res.txid) { return; } + //console.log(+new Date(), "Received verification response"); + response.handle(res.txid, [res.error, res.value]); + }); + + var substituteWorker = Util.once( function () { + Env.Log.info("SUBSTITUTE_VALIDATION_WORKER", ''); + var idx = workers.indexOf(worker); + if (idx !== -1) { + workers.splice(idx, 1); + } + // Spawn a new one + var w = fork('lib/workers/check-signature.js'); + workers.push(w); + initWorker(w); + }); + + // Spawn a new process in one ends + worker.on('exit', substituteWorker); + worker.on('close', substituteWorker); + worker.on('error', function (err) { + substituteWorker(); + Env.Log.error('VALIDATION_WORKER_ERROR', { + error: err, + }); + }); + }; + workers.forEach(initWorker); + + var nextWorker = 0; + const send = function (msg, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + // let's be paranoid about asynchrony and only calling back once.. + nextWorker = (nextWorker + 1) % workers.length; + if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') { + return void cb("INVALID_WORKERS"); + } + + var txid = msg.txid = Util.uid(); + + // expect a response within 45s + response.expect(txid, cb, 60000); + + // Send the request + workers[nextWorker].send(msg); + }; + + Env.validateMessage = function (signedMsg, key, cb) { + send({ + msg: signedMsg, + key: key, + command: 'INLINE', + }, cb); + }; + + Env.checkSignature = function (signedMsg, signature, publicKey, cb) { + send({ + command: 'DETACHED', + sig: signature, + msg: signedMsg, + key: publicKey, + }, cb); + }; + + Env.hashChannelList = function (channels, cb) { + send({ + command: 'HASH_CHANNEL_LIST', + channels: channels, + }, cb); + }; +}; + +Workers.initializeIndexWorkers = function (Env, config, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + + const workers = []; + + const response = Util.response(function (errLabel, info) { + Env.Log.error('HK_DB_WORKER__' + errLabel, info); + }); + + const Log = Env.Log; + const handleLog = function (level, label, info) { + if (typeof(Log[level]) !== 'function') { return; } + Log[level](label, info); + }; + + const initWorker = function (worker, cb) { + //console.log("initializing index worker"); + const txid = Util.uid(); + response.expect(txid, function (err) { + if (err) { return void cb(err); } + //console.log("worker initialized"); + workers.push(worker); + cb(); + }, 15000); + + worker.send({ + pid: PID, + txid: txid, + config: config, + }); + + worker.on('message', function (res) { + if (!res) { return; } + // handle log messages before checking if it was addressed to your PID + // it might still be useful to know what happened inside an orphaned worker + if (res.log) { + return void handleLog(res.log, res.label, res.info); + } + // but don't bother handling things addressed to other processes + // since it's basically guaranteed not to work + if (res.pid !== PID) { + return void Log.error("WRONG_PID", res); + } + + response.handle(res.txid, [res.error, res.value]); + }); + + var substituteWorker = Util.once(function () { + // XXX reassign jobs delegated to failed workers + Env.Log.info("SUBSTITUTE_INDEX_WORKER", ''); + var idx = workers.indexOf(worker); + if (idx !== -1) { + workers.splice(idx, 1); + } + var w = fork('lib/workers/compute-index'); + initWorker(w, function (err) { + if (err) { + throw new Error(err); + } + workers.push(w); + }); + }); + + worker.on('exit', substituteWorker); + worker.on('close', substituteWorker); + worker.on('error', function (err) { + substituteWorker(); + Env.log.error("INDEX_WORKER_ERROR", { + error: err, + }); + }); + }; + + var workerIndex = 0; + var sendCommand = function (msg, _cb) { + var cb = Util.once(Util.mkAsync(_cb)); + + workerIndex = (workerIndex + 1) % workers.length; + if (workers.length === 0 || + typeof(workers[workerIndex].send) !== 'function') { + return void cb("NO_WORKERS"); + } + + // XXX insert a queue here to prevent timeouts + // XXX track which worker is doing which jobs + + const txid = Util.uid(); + msg.txid = txid; + msg.pid = PID; + response.expect(txid, cb, 60000); + workers[workerIndex].send(msg); + }; + + nThen(function (w) { + OS.cpus().forEach(function () { + initWorker(fork('lib/workers/compute-index'), w(function (err) { + if (!err) { return; } + w.abort(); + return void cb(err); + })); + }); + }).nThen(function () { + Env.computeIndex = function (Env, channel, cb) { + Env.store.getWeakLock(channel, function (next) { + sendCommand({ + channel: channel, + command: 'COMPUTE_INDEX', + }, function (err, index) { + next(); + cb(err, index); + }); + }); + }; + + Env.computeMetadata = function (channel, cb) { + Env.store.getWeakLock(channel, function (next) { + sendCommand({ + channel: channel, + command: 'COMPUTE_METADATA', + }, function (err, metadata) { + next(); + cb(err, metadata); + }); + }); + }; + + Env.getOlderHistory = function (channel, oldestKnownHash, cb) { + Env.store.getWeakLock(channel, function (next) { + sendCommand({ + channel: channel, + command: "GET_OLDER_HISTORY", + hash: oldestKnownHash, + }, Util.both(next, cb)); + }); + }; + + Env.getPinState = function (safeKey, cb) { + Env.pinStore.getWeakLock(safeKey, function (next) { + sendCommand({ + key: safeKey, + command: 'GET_PIN_STATE', + }, Util.both(next, cb)); + }); + }; + + Env.getFileSize = function (channel, cb) { + sendCommand({ + command: 'GET_FILE_SIZE', + channel: channel, + }, cb); + }; + + Env.getDeletedPads = function (channels, cb) { + sendCommand({ + command: "GET_DELETED_PADS", + channels: channels, + }, cb); + }; + + Env.getTotalSize = function (channels, cb) { + // we could take out locks for all of these channels, + // but it's OK if the size is slightly off + sendCommand({ + command: 'GET_TOTAL_SIZE', + channels: channels, + }, cb); + }; + + Env.getMultipleFileSize = function (channels, cb) { + sendCommand({ + command: "GET_MULTIPLE_FILE_SIZE", + channels: channels, + }, cb); + }; + + Env.getHashOffset = function (channel, hash, cb) { + Env.store.getWeakLock(channel, function (next) { + sendCommand({ + command: 'GET_HASH_OFFSET', + channel: channel, + hash: hash, + }, Util.both(next, cb)); + }); + }; + + Env.removeOwnedBlob = function (blobId, safeKey, cb) { + sendCommand({ + command: 'REMOVE_OWNED_BLOB', + blobId: blobId, + safeKey: safeKey, + }, cb); + }; + + //console.log("index workers ready"); + cb(void 0); + }); +}; + +// XXX task expiration... + +Workers.initialize = function (Env, config, cb) { + Workers.initializeValidationWorkers(Env); + Workers.initializeIndexWorkers(Env, config, cb); +};