diff --git a/bower.json b/bower.json
index b362c0a5f..86eb284b3 100644
--- a/bower.json
+++ b/bower.json
@@ -30,7 +30,7 @@
"secure-fabric.js": "secure-v1.7.9",
"hyperjson": "~1.4.0",
"chainpad-crypto": "^0.2.0",
- "chainpad-listmap": "^0.5.0",
+ "chainpad-listmap": "^0.7.0",
"chainpad": "^5.1.0",
"file-saver": "1.3.1",
"alertifyjs": "1.0.11",
@@ -39,7 +39,7 @@
"less": "3.7.1",
"bootstrap": "^v4.0.0",
"diff-dom": "2.1.1",
- "nthen": "^0.1.5",
+ "nthen": "0.1.7",
"open-sans-fontface": "^1.4.2",
"bootstrap-tokenfield": "^0.12.1",
"localforage": "^1.5.2",
diff --git a/config/config.example.js b/config/config.example.js
index 5f4473b99..779843f44 100644
--- a/config/config.example.js
+++ b/config/config.example.js
@@ -224,6 +224,12 @@ module.exports = {
* STORAGE
* ===================== */
+    /* By default the CryptPad server will run scheduled tasks every five minutes.
+ * If you want to run scheduled tasks in a separate process (like a crontab)
+ * you can disable this behaviour by setting the following value to true
+ */
+ disableIntegratedTasks: false,
+
/* Pads that are not 'pinned' by any registered user can be set to expire
* after a configurable number of days of inactivity (default 90 days).
* The value can be changed or set to false to remove expiration.
diff --git a/customize.dist/fonts/cptools/fonts/cptools.svg b/customize.dist/fonts/cptools/fonts/cptools.svg
index f7fe0879f..93eef8d38 100644
--- a/customize.dist/fonts/cptools/fonts/cptools.svg
+++ b/customize.dist/fonts/cptools/fonts/cptools.svg
@@ -25,4 +25,5 @@
+
\ No newline at end of file
diff --git a/customize.dist/fonts/cptools/fonts/cptools.ttf b/customize.dist/fonts/cptools/fonts/cptools.ttf
index 1dac2ff87..18338a9ee 100644
Binary files a/customize.dist/fonts/cptools/fonts/cptools.ttf and b/customize.dist/fonts/cptools/fonts/cptools.ttf differ
diff --git a/customize.dist/fonts/cptools/fonts/cptools.woff b/customize.dist/fonts/cptools/fonts/cptools.woff
index 4f01d5d15..d8f56ba86 100644
Binary files a/customize.dist/fonts/cptools/fonts/cptools.woff and b/customize.dist/fonts/cptools/fonts/cptools.woff differ
diff --git a/customize.dist/fonts/cptools/style.css b/customize.dist/fonts/cptools/style.css
index 952207f15..349b62f2b 100644
--- a/customize.dist/fonts/cptools/style.css
+++ b/customize.dist/fonts/cptools/style.css
@@ -1,9 +1,9 @@
@font-face {
font-family: 'cptools';
src:
- url('fonts/cptools.ttf?yr9e7c') format('truetype'),
- url('fonts/cptools.woff?yr9e7c') format('woff'),
- url('fonts/cptools.svg?yr9e7c#cptools') format('svg');
+ url('fonts/cptools.ttf?cljhos') format('truetype'),
+ url('fonts/cptools.woff?cljhos') format('woff'),
+ url('fonts/cptools.svg?cljhos#cptools') format('svg');
font-weight: normal;
font-style: normal;
}
@@ -24,6 +24,9 @@
-moz-osx-font-smoothing: grayscale;
}
+.cptools-folder-upload:before {
+ content: "\e912";
+}
.cptools-folder-no-color:before {
content: "\e900";
}
diff --git a/customize.dist/loading.js b/customize.dist/loading.js
index e8f8a9453..11a82f770 100644
--- a/customize.dist/loading.js
+++ b/customize.dist/loading.js
@@ -169,6 +169,28 @@ define([], function () {
height: 100%;
background: #5cb85c;
}
+
+@keyframes spin {
+ from {
+ transform: rotate(0deg);
+ }
+ to {
+ transform: rotate(1800deg);
+ }
+}
+
+.cp-spinner {
+ display: inline-block;
+ box-sizing: border-box;
+ width: 80px;
+ height: 80px;
+ border: 11px solid white;
+ border-radius: 50%;
+ border-top-color: transparent;
+ animation: spin infinite 3s;
+ animation-timing-function: cubic-bezier(.6,0.15,0.4,0.85);
+}
+
*/}).toString().slice(14, -3);
var urlArgs = window.location.href.replace(/^.*\?([^\?]*)$/, function (all, x) { return x; });
var elem = document.createElement('div');
@@ -182,7 +204,7 @@ define([], function () {
'',
'
',
'
',
- '',
+ '',
'
',
'',
'
'
diff --git a/customize.dist/messages.js b/customize.dist/messages.js
index 7fa088c43..fdbe2ee7e 100755
--- a/customize.dist/messages.js
+++ b/customize.dist/messages.js
@@ -26,7 +26,8 @@ var getLanguage = messages._getLanguage = function () {
var l = getBrowserLanguage();
// Edge returns 'fr-FR' --> transform it to 'fr' and check again
return map[l] ? l :
- (map[l.split('-')[0]] ? l.split('-')[0] : 'en');
+ (map[l.split('-')[0]] ? l.split('-')[0] :
+ (map[l.split('_')[0]] ? l.split('_')[0] : 'en'));
};
var language = getLanguage();
diff --git a/customize.dist/pages.js b/customize.dist/pages.js
index 3fd5e63bb..4334541bb 100644
--- a/customize.dist/pages.js
+++ b/customize.dist/pages.js
@@ -103,7 +103,7 @@ define([
])*/
])
]),
- h('div.cp-version-footer', "CryptPad v2.25.0 (Zebra)")
+ h('div.cp-version-footer', "CryptPad v3.0.0 (Aurochs)")
]);
};
@@ -146,7 +146,7 @@ define([
//h('a.nav-item.nav-link', { href: '/what-is-cryptpad.html'}, Msg.topbar_whatIsCryptpad), // Moved the FAQ
//h('a.nav-item.nav-link', { href: '/faq.html'}, Msg.faq_link),
h('a.nav-item.nav-link', { href: 'https://blog.cryptpad.fr/'}, Msg.blog),
- h('a.nav-item.nav-link', { href: '/features.html'}, Msg.features),
+ h('a.nav-item.nav-link', { href: '/features.html'}, Msg.pricing),
h('a.nav-item.nav-link', { href: '/privacy.html'}, Msg.privacy),
//h('a.nav-item.nav-link', { href: '/contact.html'}, Msg.contact),
//h('a.nav-item.nav-link', { href: '/about.html'}, Msg.about),
diff --git a/customize.dist/pages/features.js b/customize.dist/pages/features.js
index ec6adfa6f..07aed6367 100644
--- a/customize.dist/pages/features.js
+++ b/customize.dist/pages/features.js
@@ -21,14 +21,14 @@ define([
target: '_blank',
rel: 'noopener noreferrer'
}, h('button.cp-features-register-button', Msg.features_f_subscribe));
- $(premiumButton).click(function (e) {
+ /*$(premiumButton).click(function (e) {
if (LocalStore.isLoggedIn()) { return; }
// Not logged in: go to /login with a redirect to this page
e.preventDefault();
e.stopPropagation();
sessionStorage.redirectTo = '/features.html';
window.location.href = '/login/';
- });
+ });*/
return h('div#cp-main', [
Pages.infopageTopbar(),
h('div.container-fluid.cp_cont_features',[
@@ -43,6 +43,10 @@ define([
h('div.card-body',[
h('h3.text-center',Msg.features_anon)
]),
+ h('div.card-body.cp-pricing',[
+ h('div.text-center', '0€'),
+ h('div.text-center', Msg.features_noData),
+ ]),
h('ul.list-group.list-group-flush',
['apps', 'core', 'file0', 'cryptdrive0', 'storage0'].map(function (f) {
return h('li.list-group-item', [
@@ -61,6 +65,10 @@ define([
h('div.card-body',[
h('h3.text-center',Msg.features_registered)
]),
+ h('div.card-body.cp-pricing',[
+ h('div.text-center', '0€'),
+ h('div.text-center', Msg.features_noData),
+ ]),
h('ul.list-group.list-group-flush', [
['anon', 'social', 'file1', 'cryptdrive1', 'devices', 'storage1'].map(function (f) {
return h('li.list-group-item', [
@@ -87,6 +95,13 @@ define([
h('div.card-body',[
h('h3.text-center',Msg.features_premium)
]),
+ h('div.card-body.cp-pricing',[
+ h('div.text-center', h('a', {
+ href: accounts.upgradeURL,
+ target: '_blank'
+ }, Msg._getKey('features_pricing', ['5', '10', '15']))),
+ h('div.text-center', Msg.features_emailRequired),
+ ]),
h('ul.list-group.list-group-flush', [
['reg', 'storage2', 'support', 'supporter'].map(function (f) {
return h('li.list-group-item', [
diff --git a/customize.dist/src/less2/include/contextmenu.less b/customize.dist/src/less2/include/contextmenu.less
index 8b1ef8c21..023053438 100644
--- a/customize.dist/src/less2/include/contextmenu.less
+++ b/customize.dist/src/less2/include/contextmenu.less
@@ -22,14 +22,13 @@
}
}
.dropdown-toggle {
+ transform: rotate(270deg);
margin-left: 1rem;
+ float: right;
}
.dropdown-menu {
top: -0.7rem;
left: 100%;
- &.left {
- left: -10rem;
- }
}
}
a {
diff --git a/customize.dist/src/less2/include/fileupload.less b/customize.dist/src/less2/include/fileupload.less
index e993d3030..ebe93399b 100644
--- a/customize.dist/src/less2/include/fileupload.less
+++ b/customize.dist/src/less2/include/fileupload.less
@@ -1,3 +1,4 @@
+@import (reference) "./browser.less";
@import (reference) './colortheme-all.less';
@import (reference) './modal.less';
@@ -10,24 +11,35 @@
#cp-fileupload {
.modal_base();
position: absolute;
- left: 10vw; right: 10vw;
+ right: 10vw;
bottom: 10vh;
- opacity: 0.9;
box-sizing: border-box;
z-index: 1000000; //Z file upload table container
display: none;
- #cp-fileupload-table {
- width: 80vw;
- tr:nth-child(1) {
- background-color: darken(@colortheme_modal-bg, 20%);
- td {
- font-weight: bold;
- padding: 0.25em;
- &:nth-child(4), &:nth-child(5) {
- text-align: center;
- }
+ color: darken(@colortheme_drive-bg, 10%);
+
+ @media screen and (max-width: @browser_media-medium-screen) {
+ left: 5vw; right: 5vw; bottom: 5vw;
+ }
+
+ .cp-fileupload-header {
+ display: flex;
+ background-color: darken(@colortheme_modal-bg, 10%);
+ font-weight: bold;
+ .cp-fileupload-header-title {
+ padding: 0.25em 0.5em;
+ flex-grow: 1;
+ }
+ .cp-fileupload-header-close {
+ padding: 0.25em 0.5em;
+ cursor: pointer;
+ &:hover {
+ background-color: rgba(0,0,0,0.1);
}
}
+ }
+ #cp-fileupload-table {
+ width: 100%;
@upload_pad_h: 0.25em;
@upload_pad_v: 0.5em;
@@ -35,27 +47,55 @@
padding: @upload_pad_h @upload_pad_v;
}
.cp-fileupload-table-link {
+ display: flex;
+ align-items: center;
+ white-space: nowrap;
+ max-width: 30vw;
+ margin: 0px @upload_pad_v;
.fa {
+ margin-top: 4px;
margin-right: 5px;
}
+ .cp-fileupload-table-name {
+ overflow: hidden;
+ text-overflow: ellipsis;
+ }
+ &[href]:hover {
+ text-decoration: none;
+ .cp-fileupload-table-name {
+ text-decoration: underline;
+ }
+ }
}
.cp-fileupload-table-progress {
- width: 25%;
+ min-width: 12em;
+ max-width: 16em;
position: relative;
text-align: center;
box-sizing: border-box;
}
.cp-fileupload-table-progress-container {
+ position: relative;
+ }
+ .cp-fileupload-table-progressbar {
position: absolute;
width: 0px;
- left: @upload_pad_v;
- top: @upload_pad_h; bottom: @upload_pad_h;
- background-color: rgba(0,0,255,0.3);
+ height: 100%;
+ background-color: #dddddd;
z-index: -1; //Z file upload progress container
}
- .cp-fileupload-table-cancel { text-align: center; }
- .fa.cancel {
- color: rgb(255, 0, 115);
+ .cp-fileupload-table-cancel {
+ text-align: center;
+ padding: 0px;
+ &:not(.success):not(.cancelled):hover {
+ background-color: rgba(0,0,0,0.1);
+ }
+ .fa {
+ padding: @upload_pad_h @upload_pad_v;
+ &.fa-times {
+ cursor: pointer;
+ }
+ }
}
}
}
diff --git a/customize.dist/src/less2/include/markdown.less b/customize.dist/src/less2/include/markdown.less
index c95f83645..a79717550 100644
--- a/customize.dist/src/less2/include/markdown.less
+++ b/customize.dist/src/less2/include/markdown.less
@@ -44,6 +44,13 @@
text-overflow: ellipsis;
}
}
+
+ div.plain-text-reader {
+ background: #f3f3f3;
+ padding: 10px;
+ color: black;
+ text-align: left;
+ }
}
.markdown_preformatted-code (@color: #333) {
diff --git a/customize.dist/src/less2/include/notifications.less b/customize.dist/src/less2/include/notifications.less
index b19a9c86b..f27d6ed60 100644
--- a/customize.dist/src/less2/include/notifications.less
+++ b/customize.dist/src/less2/include/notifications.less
@@ -1,4 +1,5 @@
@import (reference) "./colortheme-all.less";
+@import (reference) "./avatar.less";
.notifications_main() {
--LessLoader_require: LessLoader_currentFile();
@@ -53,6 +54,19 @@
}
}
}
+ .cp-notifications-requestedit-verified {
+ display: flex;
+ align-items: center;
+ &> span.cp-avatar {
+ .avatar_main(30px);
+ }
+ &> span {
+ margin-right: 10px;
+ }
+ &> p {
+ margin: 0;
+ }
+ }
}
diff --git a/customize.dist/src/less2/pages/page-features.less b/customize.dist/src/less2/pages/page-features.less
index e5e68e7d7..08bdbfc95 100644
--- a/customize.dist/src/less2/pages/page-features.less
+++ b/customize.dist/src/less2/pages/page-features.less
@@ -47,6 +47,18 @@
h3 {
margin: 0;
}
+ &.cp-pricing {
+ div {
+ font-size: 1.2em;
+ color: @cryptpad_color_blue;
+ &:first-child {
+ font-weight: bold;
+ }
+ &:last-child {
+ font-size: 1em;
+ }
+ }
+ }
}
}
h3 {
diff --git a/historyKeeper.js b/historyKeeper.js
index 03c767506..96ac43d96 100644
--- a/historyKeeper.js
+++ b/historyKeeper.js
@@ -5,10 +5,24 @@
const nThen = require('nthen');
const Nacl = require('tweetnacl');
const Crypto = require('crypto');
+const Once = require("./lib/once");
+const Meta = require("./lib/metadata");
let Log;
const now = function () { return (new Date()).getTime(); };
+/* getHash
+ * this function slices off the leading portion of a message which is
+ most likely unique
+ * these "hashes" are used to identify particular messages in a channel's history
+ * clients store "hashes" either in memory or in their drive to query for new messages:
+ * when reconnecting to a pad
+ * when connecting to chat or a mailbox
+ * thus, we can't change this function without invalidating client data which:
+ * is encrypted clientside
+ * can't be easily migrated
+ * don't break it!
+*/
const getHash = function (msg) {
if (typeof(msg) !== 'string') {
Log.warn('HK_GET_HASH', 'getHash() called on ' + typeof(msg) + ': ' + msg);
@@ -25,6 +39,18 @@ const tryParse = function (str) {
}
};
+/* sliceCpIndex
+ returns a list of all checkpoints which might be relevant for a client connecting to a session
+
+ * if there are two or fewer checkpoints, return everything you have
+ * if there are more than two
+ * return at least two
+ * plus any more which were received within the last 100 messages
+
+ This is important because the additional history is what prevents
+ clients from forking on checkpoints and dropping forked history.
+
+*/
const sliceCpIndex = function (cpIndex, line) {
// Remove "old" checkpoints (cp sent before 100 messages ago)
const minLine = Math.max(0, (line - 100));
@@ -36,6 +62,20 @@ const sliceCpIndex = function (cpIndex, line) {
return start.concat(end);
};
+const isMetadataMessage = function (parsed) {
+ return Boolean(parsed && parsed.channel);
+};
+
+// validateKeyStrings supplied by clients must decode to 32-byte Uint8Arrays
+const isValidValidateKeyString = function (key) {
+ try {
+ return typeof(key) === 'string' &&
+ Nacl.util.decodeBase64(key).length === Nacl.sign.publicKeyLength;
+ } catch (e) {
+ return false;
+ }
+};
+
module.exports.create = function (cfg) {
const rpc = cfg.rpc;
const tasks = cfg.tasks;
@@ -44,7 +84,7 @@ module.exports.create = function (cfg) {
Log.silly('HK_LOADING', 'LOADING HISTORY_KEEPER MODULE');
- const historyKeeperKeys = {};
+ const metadata_cache = {};
const HISTORY_KEEPER_ID = Crypto.randomBytes(8).toString('hex');
Log.verbose('HK_ID', 'History keeper ID: ' + HISTORY_KEEPER_ID);
@@ -53,54 +93,122 @@ module.exports.create = function (cfg) {
let STANDARD_CHANNEL_LENGTH, EPHEMERAL_CHANNEL_LENGTH;
const setConfig = function (config) {
STANDARD_CHANNEL_LENGTH = config.STANDARD_CHANNEL_LENGTH;
- EPHEMERAL_CHANNEL_LENGTH = config.EPHEMERAL_CHANNEl_LENGTH;
+ EPHEMERAL_CHANNEL_LENGTH = config.EPHEMERAL_CHANNEL_LENGTH;
sendMsg = config.sendMsg;
};
+ /* computeIndex
+ can call back with an error or a computed index which includes:
+ * cpIndex:
+ * array including any checkpoints pushed within the last 100 messages
+ * processed by 'sliceCpIndex(cpIndex, line)'
+ * offsetByHash:
+ * a map containing message offsets by their hash
+ * this is for every message in history, so it could be very large...
+ * except we remove offsets from the map if they occur before the oldest relevant checkpoint
+ * size: in bytes
+ * metadata:
+ * validationKey
+ * expiration time
+ * owners
+ * ??? (anything else we might add in the future)
+ * line
+ * the number of messages in history
+ * including the initial metadata line, if it exists
+
+ */
const computeIndex = function (channelName, cb) {
const cpIndex = [];
let messageBuf = [];
- let validateKey;
let metadata;
let i = 0;
- store.readMessagesBin(channelName, 0, (msgObj, rmcb) => {
- let msg;
- i++;
- if (!validateKey && msgObj.buff.indexOf('validateKey') > -1) {
- metadata = msg = tryParse(msgObj.buff.toString('utf8'));
- if (typeof msg === "undefined") { return rmcb(); }
- if (msg.validateKey) {
- validateKey = historyKeeperKeys[channelName] = msg;
- return rmcb();
+
+ const ref = {};
+
+ const CB = Once(cb);
+
+ const offsetByHash = {};
+ let size = 0;
+ nThen(function (w) {
+ // iterate over all messages in the channel log
+ // old channels can contain metadata as the first message of the log
+ // remember metadata the first time you encounter it
+ // otherwise index important messages in the log
+ store.readMessagesBin(channelName, 0, (msgObj, readMore) => {
+ let msg;
+ // keep an eye out for the metadata line if you haven't already seen it
+ // but only check for metadata on the first line
+ if (!i && !metadata && msgObj.buff.indexOf('{') === 0) {
+ i++; // always increment the message counter
+ msg = tryParse(msgObj.buff.toString('utf8'));
+ if (typeof msg === "undefined") { return readMore(); }
+
+ // validate that the current line really is metadata before storing it as such
+ if (isMetadataMessage(msg)) {
+ metadata = msg;
+ return readMore();
+ }
}
- }
- if (msgObj.buff.indexOf('cp|') > -1) {
- msg = msg || tryParse(msgObj.buff.toString('utf8'));
- if (typeof msg === "undefined") { return rmcb(); }
- if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
- cpIndex.push({
- offset: msgObj.offset,
- line: i
- });
- messageBuf = [];
+ i++;
+ if (msgObj.buff.indexOf('cp|') > -1) {
+ msg = msg || tryParse(msgObj.buff.toString('utf8'));
+ if (typeof msg === "undefined") { return readMore(); }
+ // cache the offsets of checkpoints if they can be parsed
+ if (msg[2] === 'MSG' && msg[4].indexOf('cp|') === 0) {
+ cpIndex.push({
+ offset: msgObj.offset,
+ line: i
+ });
+ // we only want to store messages since the latest checkpoint
+ // so clear the buffer every time you see a new one
+ messageBuf = [];
+ }
}
- }
- messageBuf.push(msgObj);
- return rmcb();
- }, (err) => {
- if (err && err.code !== 'ENOENT') { return void cb(err); }
- const offsetByHash = {};
- let size = 0;
- messageBuf.forEach((msgObj) => {
- const msg = tryParse(msgObj.buff.toString('utf8'));
- if (typeof msg === "undefined") { return; }
- if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
- offsetByHash[getHash(msg[4])] = msgObj.offset;
+ // if it's not metadata or a checkpoint then it should be a regular message
+ // store it in the buffer
+ messageBuf.push(msgObj);
+ return readMore();
+ }, w((err) => {
+ if (err && err.code !== 'ENOENT') {
+ w.abort();
+ return void CB(err);
}
- // There is a trailing \n at the end of the file
- size = msgObj.offset + msgObj.buff.length + 1;
- });
- cb(null, {
+
+ // once indexing is complete you should have a buffer of messages since the latest checkpoint
+ // map the 'hash' of each message to its byte offset in the log, to be used for reconnecting clients
+ messageBuf.forEach((msgObj) => {
+ const msg = tryParse(msgObj.buff.toString('utf8'));
+ if (typeof msg === "undefined") { return; }
+ if (msg[0] === 0 && msg[2] === 'MSG' && typeof(msg[4]) === 'string') {
+ // msgObj.offset is API guaranteed by our storage module
+ // it should always be a valid positive integer
+ offsetByHash[getHash(msg[4])] = msgObj.offset;
+ }
+ // There is a trailing \n at the end of the file
+ size = msgObj.offset + msgObj.buff.length + 1;
+ });
+ }));
+ }).nThen(function (w) {
+ // create a function which will iterate over amendments to the metadata
+ const handler = Meta.createLineHandler(ref, Log.error);
+
+ // initialize the accumulator in case there was a foundational metadata line in the log content
+ if (metadata) { handler(void 0, metadata); }
+
+ // iterate over the dedicated metadata log (if it exists)
+ // proceed even in the event of a stream error on the metadata log
+ store.readDedicatedMetadata(channelName, handler, w(function (err) {
+ if (err) {
+ return void Log.error("DEDICATED_METADATA_ERROR", err);
+ }
+ }));
+ }).nThen(function () {
+ // when all is done, cache the metadata in memory
+ if (ref.index) { // but don't bother if no metadata was found...
+ metadata = metadata_cache[channelName] = ref.meta;
+ }
+ // and return the computed index
+ CB(null, {
// Only keep the checkpoints included in the last 100 messages
cpIndex: sliceCpIndex(cpIndex, i),
offsetByHash: offsetByHash,
@@ -111,13 +219,61 @@ module.exports.create = function (cfg) {
});
};
+ /* getIndex
+ calls back with an error if anything goes wrong
+ or with a cached index for a channel if it exists
+ (along with metadata)
+ otherwise it calls back with the index computed by 'computeIndex'
+
+ as an added bonus:
+ if the channel exists but its index does not then it caches the index
+ */
+ const indexQueues = {};
const getIndex = (ctx, channelName, cb) => {
const chan = ctx.channels[channelName];
- if (chan && chan.index) { return void cb(undefined, chan.index); }
+ // if there is a channel in memory and it has an index cached, return it
+ if (chan && chan.index) {
+ // enforce async behaviour
+ return void setTimeout(function () {
+ cb(undefined, chan.index);
+ });
+ }
+
+ // if a call to computeIndex is already in progress for this channel
+ // then add the callback for the latest invocation to the queue
+ // and wait for it to complete
+ if (Array.isArray(indexQueues[channelName])) {
+ indexQueues[channelName].push(cb);
+ return;
+ }
+
+ // otherwise, make a queue for any 'getIndex' calls made before the following 'computeIndex' call completes
+ var queue = indexQueues[channelName] = (indexQueues[channelName] || [cb]);
+
computeIndex(channelName, (err, ret) => {
- if (err) { return void cb(err); }
+ if (!Array.isArray(queue)) {
+ // something is very wrong if there's no callback array
+ return void Log.error("E_INDEX_NO_CALLBACK", channelName);
+ }
+
+
+ // clean up the queue that you're about to handle, but keep a local copy
+ delete indexQueues[channelName];
+
+ // this is most likely an unrecoverable filesystem error
+ if (err) {
+ // call back every pending function with the error
+ return void queue.forEach(function (_cb) {
+ _cb(err);
+ });
+ }
+ // cache the computed result if possible
if (chan) { chan.index = ret; }
- cb(undefined, ret);
+
+ // call back every pending function with the result
+ queue.forEach(function (_cb) {
+ _cb(void 0, ret);
+ });
});
};
@@ -128,24 +284,65 @@ module.exports.create = function (cfg) {
}
*/
+ /* storeMessage
+ * ctx
+ * channel id
+ * the message to store
+ * whether the message is a checkpoint
+ * optionally the hash of the message
+ * it's not always used, but we guard against it
+
+
+ * async but doesn't have a callback
+ * source of a race condition whereby:
+     * two messages can be inserted
+ * two offsets can be computed using the total size of all the messages
+ * but the offsets don't correspond to the actual location of the newlines
+ * because the two actions were performed like ABba...
+ * the fix is to use callbacks and implement queueing for writes
+ * to guarantee that offset computation is always atomic with writes
+ */
+ const storageQueues = {};
+
+ const storeQueuedMessage = function (ctx, queue, id) {
+ if (queue.length === 0) {
+ delete storageQueues[id];
+ return;
+ }
+
+ const first = queue.shift();
+
+ const msgBin = first.msg;
+ const optionalMessageHash = first.hash;
+ const isCp = first.isCp;
- const storeMessage = function (ctx, channel, msg, isCp, maybeMsgHash) {
- const msgBin = new Buffer(msg + '\n', 'utf8');
// Store the message first, and update the index only once it's stored.
// store.messageBin can be async so updating the index first may
// result in a wrong cpIndex
nThen((waitFor) => {
- store.messageBin(channel.id, msgBin, waitFor(function (err) {
+ store.messageBin(id, msgBin, waitFor(function (err) {
if (err) {
waitFor.abort();
- return void Log.error("HK_STORE_MESSAGE_ERROR", err.message);
+ Log.error("HK_STORE_MESSAGE_ERROR", err.message);
+
+ // this error is critical, but there's not much we can do at the moment
+ // proceed with more messages, but they'll probably fail too
+ // at least you won't have a memory leak
+
+ // TODO make it possible to respond to clients with errors so they know
+ // their message wasn't stored
+ storeQueuedMessage(ctx, queue, id);
+ return;
}
}));
}).nThen((waitFor) => {
- getIndex(ctx, channel.id, waitFor((err, index) => {
+ getIndex(ctx, id, waitFor((err, index) => {
if (err) {
Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
// non-critical, we'll be able to get the channel index later
+
+ // proceed to the next message in the queue
+ storeQueuedMessage(ctx, queue, id);
return;
}
if (typeof (index.line) === "number") { index.line++; }
@@ -161,60 +358,177 @@ module.exports.create = function (cfg) {
line: ((index.line || 0) + 1)
} /*:cp_index_item*/));
}
- if (maybeMsgHash) { index.offsetByHash[maybeMsgHash] = index.size; }
+ if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
index.size += msgBin.length;
+
+ // handle the next element in the queue
+ storeQueuedMessage(ctx, queue, id);
}));
});
};
- // Determine what we should store when a message a broadcasted to a channel
+ const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) {
+ const id = channel.id;
+
+ const msgBin = new Buffer(msg + '\n', 'utf8');
+ if (Array.isArray(storageQueues[id])) {
+ return void storageQueues[id].push({
+ msg: msgBin,
+ hash: optionalMessageHash,
+ isCp: isCp,
+ });
+ }
+
+ const queue = storageQueues[id] = (storageQueues[id] || [{
+ msg: msgBin,
+ hash: optionalMessageHash,
+ }]);
+ storeQueuedMessage(ctx, queue, id);
+ };
+
+ var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
+
+ /* onChannelMessage
+    Determine what we should store when a message is broadcasted to a channel
+
+ * ignores ephemeral channels
+ * ignores messages sent to expired channels
+ * rejects duplicated checkpoints
+ * validates messages to channels that have validation keys
+ * caches the id of the last saved checkpoint
+ * adds timestamps to incoming messages
+ * writes messages to the store
+ */
const onChannelMessage = function (ctx, channel, msgStruct) {
// don't store messages if the channel id indicates that it's an ephemeral message
if (!channel.id || channel.id.length === EPHEMERAL_CHANNEL_LENGTH) { return; }
const isCp = /^cp\|/.test(msgStruct[4]);
- if (historyKeeperKeys[channel.id] && historyKeeperKeys[channel.id].expire &&
- historyKeeperKeys[channel.id].expire < +new Date()) {
- return; // Don't store messages on expired channel
- }
let id;
if (isCp) {
- /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
- id = /cp\|(([A-Za-z0-9+\/=]+)\|)?/.exec(msgStruct[4]);
+            // id becomes either null or an array of results...
+ id = CHECKPOINT_PATTERN.exec(msgStruct[4]);
if (Array.isArray(id) && id[2] && id[2] === channel.lastSavedCp) {
// Reject duplicate checkpoints
return;
}
}
- if (historyKeeperKeys[channel.id] && historyKeeperKeys[channel.id].validateKey) {
- /*::if (typeof(msgStruct[4]) !== 'string') { throw new Error(); }*/
- let signedMsg = (isCp) ? msgStruct[4].replace(/^cp\|(([A-Za-z0-9+\/=]+)\|)?/, '') : msgStruct[4];
- signedMsg = Nacl.util.decodeBase64(signedMsg);
- const validateKey = Nacl.util.decodeBase64(historyKeeperKeys[channel.id].validateKey);
- const validated = Nacl.sign.open(signedMsg, validateKey);
- if (!validated) {
- Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
- return;
- }
- }
- if (isCp) {
- // WARNING: the fact that we only check the most recent checkpoints
- // is a potential source of bugs if one editor has high latency and
- // pushes a duplicate of an earlier checkpoint than the latest which
- // has been pushed by editors with low latency
- if (Array.isArray(id) && id[2]) {
- // Store new checkpoint hash
- channel.lastSavedCp = id[2];
+
+ let metadata;
+ nThen(function (w) {
+ // getIndex (and therefore the latest metadata)
+ getIndex(ctx, channel.id, w(function (err, index) {
+ if (err) {
+ w.abort();
+ return void Log.error('CHANNEL_MESSAGE_ERROR', err);
+ }
+
+ if (!index.metadata) {
+ // if there's no channel metadata then it can't be an expiring channel
+ // nor can we possibly validate it
+ return;
+ }
+
+ metadata = index.metadata;
+
+ if (metadata.expire && metadata.expire < +new Date()) {
+ // don't store message sent to expired channels
+ w.abort();
+ return;
+ // TODO if a channel expired a long time ago but it's still here, remove it
+ }
+
+ // if there's no validateKey present skip to the next block
+ if (!metadata.validateKey) { return; }
+
+ // trim the checkpoint indicator off the message if it's present
+ let signedMsg = (isCp) ? msgStruct[4].replace(CHECKPOINT_PATTERN, '') : msgStruct[4];
+ // convert the message from a base64 string into a Uint8Array
+
+ // FIXME this can fail and the client won't notice
+ signedMsg = Nacl.util.decodeBase64(signedMsg);
+
+ // FIXME this can blow up
+ // TODO check that that won't cause any problems other than not being able to append...
+ const validateKey = Nacl.util.decodeBase64(metadata.validateKey);
+ // validate the message
+ const validated = Nacl.sign.open(signedMsg, validateKey);
+ if (!validated) {
+ // don't go any further if the message fails validation
+ w.abort();
+ Log.info("HK_SIGNED_MESSAGE_REJECTED", 'Channel '+channel.id);
+ return;
+ }
+ }));
+ }).nThen(function () {
+ // do checkpoint stuff...
+
+ // 1. get the checkpoint id
+ // 2. reject duplicate checkpoints
+
+ if (isCp) {
+ // if the message is a checkpoint we will have already validated
+ // that it isn't a duplicate. remember its id so that we can
+ // repeat this process for the next incoming checkpoint
+
+ // WARNING: the fact that we only check the most recent checkpoints
+ // is a potential source of bugs if one editor has high latency and
+ // pushes a duplicate of an earlier checkpoint than the latest which
+ // has been pushed by editors with low latency
+ // FIXME
+ if (Array.isArray(id) && id[2]) {
+ // Store new checkpoint hash
+ channel.lastSavedCp = id[2];
+ }
}
- }
- msgStruct.push(now());
- storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
+
+ // add the time to the message
+ msgStruct.push(now());
+
+ // storeMessage
+ storeMessage(ctx, channel, JSON.stringify(msgStruct), isCp, getHash(msgStruct[4]));
+ });
};
+ /* dropChannel
+ * exported as API
+ * used by chainpad-server/NetfluxWebsocketSrv.js
+ * cleans up memory structures which are managed entirely by the historyKeeper
+ * the netflux server manages other memory in ctx.channels
+ */
const dropChannel = function (chanName) {
- delete historyKeeperKeys[chanName];
+ delete metadata_cache[chanName];
};
+ /* getHistoryOffset
+ returns a number representing the byte offset from the start of the log
+ for whatever history you're seeking.
+
+ query by providing a 'lastKnownHash',
+ which is really just a string of the first 64 characters of an encrypted message.
+ OR by -1 which indicates that we want the full history (byte offset 0)
+ OR nothing, which indicates that you want whatever messages the historyKeeper deems relevant
+ (typically the last few checkpoints)
+
+ this function embeds a lot of the history keeper's logic:
+
+ 0. if you passed -1 as the lastKnownHash it means you want the complete history
+ * I'm not sure why you'd need to call this function if you know it will return 0 in this case...
+ * it has a side-effect of filling the index cache if it's empty
+ 1. if you provided a lastKnownHash and that message does not exist in the history:
+ * either the client has made a mistake or the history they knew about no longer exists
+ * call back with EINVAL
+ 2. if you did not provide a lastKnownHash
+ * and there are fewer than two checkpoints:
+ * return 0 (read from the start of the file)
+ * and there are two or more checkpoints:
+ * return the offset of the earliest checkpoint which 'sliceCpIndex' considers relevant
+ 3. if you did provide a lastKnownHash
+ * read through the log until you find the hash that you're looking for
+ * call back with either the byte offset of the message that you found OR
+ * -1 if you didn't find it
+
+ */
const getHistoryOffset = (ctx, channelName, lastKnownHash, cb /*:(e:?Error, os:?number)=>void*/) => {
        // lastKnownHash === -1 means we want the complete history
if (lastKnownHash === -1) { return void cb(null, 0); }
@@ -223,8 +537,17 @@ module.exports.create = function (cfg) {
getIndex(ctx, channelName, waitFor((err, index) => {
if (err) { waitFor.abort(); return void cb(err); }
- // Check last known hash
+ // check if the "hash" the client is requesting exists in the index
const lkh = index.offsetByHash[lastKnownHash];
+ // we evict old hashes from the index as new checkpoints are discovered.
+ // if someone connects and asks for a hash that is no longer relevant,
+ // we tell them it's an invalid request. This is because of the semantics of "GET_HISTORY"
+ // which is only ever used when connecting or reconnecting in typical uses of history...
+ // this assumption should hold for uses by chainpad, but perhaps not for other use cases.
+ // EXCEPT: other cases don't use checkpoints!
+ // clients that are told that their request is invalid should just make another request
+ // without specifying the hash, and just trust the server to give them the relevant data.
+ // QUESTION: does this mean mailboxes are causing the server to store too much stuff in memory?
if (lastKnownHash && typeof(lkh) !== "number") {
waitFor.abort();
return void cb(new Error('EINVAL'));
@@ -250,12 +573,20 @@ module.exports.create = function (cfg) {
offset = lkh;
}));
}).nThen((waitFor) => {
+ // if offset is less than zero then presumably the channel has no messages
+ // returning falls through to the next block and therefore returns -1
if (offset !== -1) { return; }
- store.readMessagesBin(channelName, 0, (msgObj, rmcb, abort) => {
+
+ // do a lookup from the index
+ // FIXME maybe we don't need this anymore?
+ // otherwise we have a non-negative offset and we can start to read from there
+ store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => {
+ // tryParse returns a parsed message or undefined
const msg = tryParse(msgObj.buff.toString('utf8'));
- if (typeof msg === "undefined") { return rmcb(); }
+ // if it was undefined then go onto the next message
+ if (typeof msg === "undefined") { return readMore(); }
if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4])) {
- return void rmcb();
+ return void readMore();
}
offset = msgObj.offset;
abort();
@@ -267,6 +598,15 @@ module.exports.create = function (cfg) {
});
};
+ /* getHistoryAsync
+ * finds the appropriate byte offset from which to begin reading using 'getHistoryOffset'
+ * streams through the rest of the messages, safely parsing them and returning the parsed content to the handler
+ * calls back when it has reached the end of the log
+
+ Used by:
+ * GET_HISTORY
+
+ */
const getHistoryAsync = (ctx, channelName, lastKnownHash, beforeHash, handler, cb) => {
let offset = -1;
nThen((waitFor) => {
@@ -280,15 +620,24 @@ module.exports.create = function (cfg) {
}).nThen((waitFor) => {
if (offset === -1) { return void cb(new Error("could not find offset")); }
const start = (beforeHash) ? 0 : offset;
- store.readMessagesBin(channelName, start, (msgObj, rmcb, abort) => {
+ store.readMessagesBin(channelName, start, (msgObj, readMore, abort) => {
if (beforeHash && msgObj.offset >= offset) { return void abort(); }
- handler(tryParse(msgObj.buff.toString('utf8')), rmcb);
+ handler(tryParse(msgObj.buff.toString('utf8')), readMore);
}, waitFor(function (err) {
return void cb(err);
}));
});
};
+ /* getOlderHistory
+ * allows clients to query for all messages until a known hash is read
+ * stores all messages in history as they are read
+ * can therefore be very expensive for memory
+ * should probably be converted to a streaming interface
+
+ Used by:
+ * GET_HISTORY_RANGE
+ */
const getOlderHistory = function (channelName, oldestKnownHash, cb) {
var messageBuffer = [];
var found = false;
@@ -298,10 +647,11 @@ module.exports.create = function (cfg) {
let parsed = tryParse(msgStr);
if (typeof parsed === "undefined") { return; }
- if (parsed.validateKey) {
- historyKeeperKeys[channelName] = parsed;
- return;
- }
+ // identify classic metadata messages by their inclusion of a channel.
+ // and don't send metadata, since:
+ // 1. the user won't be interested in it
+ // 2. this metadata is potentially incomplete/incorrect
+ if (isMetadataMessage(parsed)) { return; }
var content = parsed[4];
if (typeof(content) !== 'string') { return; }
@@ -329,13 +679,20 @@ module.exports.create = function (cfg) {
};
*/
-
+ /* historyKeeperBroadcast
+ * uses API from the netflux server to send messages to every member of a channel
+ * sendMsg runs in a try-catch and drops users if sending a message fails
+ */
const historyKeeperBroadcast = function (ctx, channel, msg) {
let chan = ctx.channels[channel] || (([] /*:any*/) /*:Chan_t*/);
chan.forEach(function (user) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
});
};
+
+ /* onChannelCleared
+ * broadcasts to all clients in a channel if that channel is deleted
+ */
const onChannelCleared = function (ctx, channel) {
historyKeeperBroadcast(ctx, channel, {
error: 'ECLEARED',
@@ -351,13 +708,29 @@ module.exports.create = function (cfg) {
});
});
delete ctx.channels[channel];
- delete historyKeeperKeys[channel];
+ delete metadata_cache[channel];
};
// Check if the selected channel is expired
// If it is, remove it from memory and broadcast a message to its members
+
+ const onChannelMetadataChanged = function (ctx, channel) {
+ channel = channel;
+ };
+
+ /* checkExpired
+ * synchronously returns true or undefined to indicate whether the channel is expired
+ * according to its metadata
+ * has some side effects:
+ * closes the channel via the store.closeChannel API
+ * and then broadcasts to all channel members that the channel has expired
+ * removes the channel from the netflux-server's in-memory cache
+ * removes the channel metadata from history keeper's in-memory cache
+
+ FIXME the boolean nature of this API should be separated from its side effects
+ */
const checkExpired = function (ctx, channel) {
- if (channel && channel.length === STANDARD_CHANNEL_LENGTH && historyKeeperKeys[channel] &&
- historyKeeperKeys[channel].expire && historyKeeperKeys[channel].expire < +new Date()) {
+ if (channel && channel.length === STANDARD_CHANNEL_LENGTH && metadata_cache[channel] &&
+ metadata_cache[channel].expire && metadata_cache[channel].expire < +new Date()) {
store.closeChannel(channel, function () {
historyKeeperBroadcast(ctx, channel, {
error: 'EEXPIRED',
@@ -365,12 +738,25 @@ module.exports.create = function (cfg) {
});
});
delete ctx.channels[channel];
- delete historyKeeperKeys[channel];
+ delete metadata_cache[channel];
return true;
}
return;
};
+ /* onDirectMessage
+ * exported for use by the netflux-server
+ * parses and handles all direct messages directed to the history keeper
+ * check if it's expired and execute all the associated side-effects
+ * routes queries to the appropriate handlers
+ * GET_HISTORY
+ * GET_HISTORY_RANGE
+ * GET_FULL_HISTORY
+ * RPC
+ * if the rpc has special hooks that the history keeper needs to be aware of...
+ * execute them here...
+
+ */
const onDirectMessage = function (ctx, seq, user, json) {
let parsed;
let channelName;
@@ -386,7 +772,7 @@ module.exports.create = function (cfg) {
}
// If the requested history is for an expired channel, abort
- // Note the if we don't have the keys for that channel in historyKeeperKeys, we'll
+ // Note that if we don't have the keys for that channel in metadata_cache, we'll
// have to abort later (once we know the expiration time)
if (checkExpired(ctx, parsed[1])) { return; }
@@ -396,35 +782,31 @@ module.exports.create = function (cfg) {
 // parsed[3] is the last known hash (optional)
sendMsg(ctx, user, [seq, 'ACK']);
channelName = parsed[1];
- var validateKey = parsed[2];
- var lastKnownHash = parsed[3];
- var owners;
- var expire;
- if (parsed[2] && typeof parsed[2] === "object") {
- validateKey = parsed[2].validateKey;
- lastKnownHash = parsed[2].lastKnownHash;
- owners = parsed[2].owners;
- if (parsed[2].expire) {
- expire = +parsed[2].expire * 1000 + (+new Date());
+ var config = parsed[2];
+ var metadata = {};
+ var lastKnownHash;
+
+ // clients can optionally pass a map of attributes
+ // if the channel already exists this map will be ignored
+ // otherwise it will be stored as the initial metadata state for the channel
+ if (config && typeof config === "object" && !Array.isArray(parsed[2])) {
+ lastKnownHash = config.lastKnownHash;
+ metadata = config.metadata || {};
+ if (metadata.expire) {
+ metadata.expire = +metadata.expire * 1000 + (+new Date());
}
}
+ metadata.channel = channelName;
+
+ // if the user sends us an invalid key, we won't be able to validate their messages
+ // so they'll never get written to the log anyway. Let's just drop their message
+ // on the floor instead of doing a bunch of extra work
+ // TODO send them an error message so they know something is wrong
+ if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) {
+ return void Log.error('HK_INVALID_KEY', metadata.validateKey);
+ }
nThen(function (waitFor) {
- if (!tasks) { return; } // tasks are not supported
- if (typeof(expire) !== 'number' || !expire) { return; }
-
- // the fun part...
- // the user has said they want this pad to expire at some point
- tasks.write(expire, "EXPIRE", [ channelName ], waitFor(function (err) {
- if (err) {
- // if there is an error, we don't want to crash the whole server...
- // just log it, and if there's a problem you'll be able to fix it
- // at a later date with the provided information
- Log.error('HK_CREATE_EXPIRE_TASK', err);
- Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([expire, 'EXPIRE', channelName]));
- }
- }));
- }).nThen(function (waitFor) {
var w = waitFor();
/* unless this is a young channel, we will serve all messages from an offset
@@ -438,39 +820,29 @@ module.exports.create = function (cfg) {
so, let's just fall through...
*/
if (err) { return w(); }
+
+
+ // it's possible that the channel doesn't have metadata
+ // but in that case there's no point in checking if the channel expired
+ // or in trying to send metadata, so just skip this block
if (!index || !index.metadata) { return void w(); }
- // Store the metadata if we don't have it in memory
- if (!historyKeeperKeys[channelName]) {
- historyKeeperKeys[channelName] = index.metadata;
- }
// And then check if the channel is expired. If it is, send the error and abort
+ // FIXME this is hard to read because 'checkExpired' has side effects
if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
- // Send the metadata to the user
- if (!lastKnownHash && index.cpIndex.length > 1) {
- sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
- return;
- }
- w();
+ // always send metadata with GET_HISTORY requests
+ sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
}));
}).nThen(() => {
let msgCount = 0;
- let expired = false;
- getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, cb) => {
+
+ // TODO compute lastKnownHash in a manner such that it will always skip past the metadata line?
+ getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => {
if (!msg) { return; }
- if (msg.validateKey) {
- // If it is a young channel, this is the part where we get the metadata
- // Check if the channel is expired and abort if it is.
- if (!historyKeeperKeys[channelName]) { historyKeeperKeys[channelName] = msg; }
- expired = checkExpired(ctx, channelName);
- }
- if (expired) { return void cb(); }
msgCount++;
-
- sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], cb);
+ // avoid sending the metadata message a second time
+ if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); }
+ sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore);
}, (err) => {
- // If the pad is expired, stop here, we've already sent the error message
- if (expired) { return; }
-
if (err && err.code !== 'ENOENT') {
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
const parsedMsg = {error:err.message, channel: channelName};
@@ -478,24 +850,46 @@ module.exports.create = function (cfg) {
return;
}
- // If this is a new channel, we need to store the metadata as
- // the first message in the file
const chan = ctx.channels[channelName];
- if (msgCount === 0 && !historyKeeperKeys[channelName] && chan && chan.indexOf(user) > -1) {
- var key = {};
- key.channel = channelName;
- if (validateKey) {
- key.validateKey = validateKey;
- }
- if (owners) {
- key.owners = owners;
- }
- if (expire) {
- key.expire = expire;
+
+ if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) {
+ metadata_cache[channelName] = metadata;
+
+ // the index will have already been constructed and cached at this point
+ // but it will not have detected any metadata because it hasn't been written yet
+ // this means that the cache starts off as invalid, so we have to correct it
+ if (chan && chan.index) { chan.index.metadata = metadata; }
+
+ // new channels will always have their metadata written to a dedicated metadata log
+ // but any lines after the first which are not amendments in a particular format will be ignored.
+ // Thus we should be safe from race conditions here if we just write metadata to the log as below...
+ // TODO validate this logic
+ // otherwise maybe we need to check that the metadata log is empty as well
+ store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
+ if (err) {
+ // FIXME tell the user that there was a channel error?
+ return void Log.error('HK_WRITE_METADATA', {
+ channel: channelName,
+ error: err,
+ });
+ }
+ });
+
+ // write tasks
+ if(tasks && metadata.expire && typeof(metadata.expire) === 'number') {
+ // the fun part...
+ // the user has said they want this pad to expire at some point
+ tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) {
+ if (err) {
+ // if there is an error, we don't want to crash the whole server...
+ // just log it, and if there's a problem you'll be able to fix it
+ // at a later date with the provided information
+ Log.error('HK_CREATE_EXPIRE_TASK', err);
+ Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName]));
+ }
+ });
}
- historyKeeperKeys[channelName] = key;
- storeMessage(ctx, chan, JSON.stringify(key), false, undefined);
- sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(key)]);
+ sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]);
}
// End of history message:
@@ -551,9 +945,12 @@ module.exports.create = function (cfg) {
 // parsed[2] is a validation key (optional)
 // parsed[3] is the last known hash (optional)
sendMsg(ctx, user, [seq, 'ACK']);
- getHistoryAsync(ctx, parsed[1], -1, false, (msg, cb) => {
+
+ // FIXME should we send metadata here too?
+ // none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
+ getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => {
if (!msg) { return; }
- sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], cb);
+ sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore);
}, (err) => {
let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
if (err) {
@@ -581,6 +978,15 @@ module.exports.create = function (cfg) {
if (msg[3] === 'CLEAR_OWNED_CHANNEL') {
onChannelCleared(ctx, msg[4]);
}
+
+ // FIXME METADATA CHANGE
+ if (msg[3] === 'SET_METADATA') { // or whatever we call the RPC????
+ // make sure we update our cache of metadata
+ // or at least invalidate it and force other mechanisms to recompute its state
+ // 'output' could be the new state as computed by rpc
+ onChannelMetadataChanged(ctx, msg[4]);
+ }
+
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]);
});
} catch (e) {
diff --git a/lib/deduplicate.js b/lib/deduplicate.js
new file mode 100644
index 000000000..3ad62e6b0
--- /dev/null
+++ b/lib/deduplicate.js
@@ -0,0 +1,11 @@
+// remove duplicate elements in an array
+module.exports = function (O) {
+ // make a copy of the original array
+ var A = O.slice();
+ for (var i = 0; i < A.length; i++) {
+ for (var j = i + 1; j < A.length; j++) {
+ if (A[i] === A[j]) { A.splice(j--, 1); }
+ }
+ }
+ return A;
+};
diff --git a/lib/metadata.js b/lib/metadata.js
new file mode 100644
index 000000000..419780d7b
--- /dev/null
+++ b/lib/metadata.js
@@ -0,0 +1,126 @@
+var Meta = module.exports;
+
+var deduplicate = require("./deduplicate");
+
+/* Metadata fields:
+
+ * channel
+ * validateKey
+ * owners
+ * ADD_OWNERS
+ * RM_OWNERS
+ * expire
+
+*/
+
+var commands = {};
+
+// ["ADD_OWNERS", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I="], 1561623438989]
+commands.ADD_OWNERS = function (meta, args) {
+ // bail out if args isn't an array
+ if (!Array.isArray(args)) {
+ throw new Error('METADATA_INVALID_OWNERS');
+ }
+
+ // you shouldn't be able to get here if there are no owners
+ // because only an owner should be able to change the owners
+ if (!Array.isArray(meta.owners)) {
+ throw new Error("METADATA_NONSENSE_OWNERS");
+ }
+
+ args.forEach(function (owner) {
+ if (meta.owners.indexOf(owner) >= 0) { return; }
+ meta.owners.push(owner);
+ });
+};
+
+// ["RM_OWNERS", ["CrufexqXcY-z+eKJlEbNELVy5Sb7E-EAAEFI8GnEtZ0="], 1561623439989]
+commands.RM_OWNERS = function (meta, args) {
+ // what are you doing if you don't have owners to remove?
+ if (!Array.isArray(args)) {
+ throw new Error('METADATA_INVALID_OWNERS');
+ }
+ // if there aren't any owners to start, this is also pointless
+ if (!Array.isArray(meta.owners)) {
+ throw new Error("METADATA_NONSENSE_OWNERS");
+ }
+
+ // remove owners one by one
+ // we assume there are no duplicates
+ args.forEach(function (owner) {
+ var index = meta.owners.indexOf(owner);
+ if (index < 0) { return; }
+ meta.owners.splice(index, 1);
+ });
+};
+
+// ["RESET_OWNERS", ["7eEqelGso3EBr5jHlei6av4r9w2B9XZiGGwA1EgZ-5I="], 1561623439989]
+commands.RESET_OWNERS = function (meta, args) {
+ // expect a new array, even if it's empty
+ if (!Array.isArray(args)) {
+ throw new Error('METADATA_INVALID_OWNERS');
+ }
+ // assume there are owners to start
+ if (!Array.isArray(meta.owners)) {
+ throw new Error("METADATA_NONSENSE_OWNERS");
+ }
+
+ // overwrite the existing owners with the new one
+ meta.owners = deduplicate(args);
+};
+
// changing a channel's expiration time is not yet supported;
// fail loudly so callers surface the attempt instead of silently ignoring it
commands.UPDATE_EXPIRATION = function () {
    throw new Error("E_NOT_IMPLEMENTED");
};
+
/* handleCommand
 * dispatches a single metadata amendment line of the form
 * [COMMAND, ARGS, TIMESTAMP] to its registered handler,
 * mutating `meta` in place.
 * throws METADATA_UNSUPPORTED_COMMAND for unknown commands;
 * individual handlers may also throw if their args are invalid.
 */
var handleCommand = function (meta, line) {
    var command = line[0];
    var args = line[1];
    //var time = line[2];

    var handler = commands[command];
    if (typeof(handler) !== 'function') {
        throw new Error("METADATA_UNSUPPORTED_COMMAND");
    }
    handler(meta, args);
};
+
/* createLineHandler
 * returns a callback suitable for line-by-line consumption of a metadata log.
 * accumulates state into `ref`:
 *   ref.meta  - the reduced metadata object
 *   ref.index - the number of lines successfully processed
 * `errorHandler(label, info)` is invoked for unreadable or invalid lines;
 * such lines are reported and skipped, never thrown.
 */
Meta.createLineHandler = function (ref, errorHandler) {
    ref.meta = {};
    ref.index = 0;

    return function (err, line) {
        if (err) {
            return void errorHandler('METADATA_HANDLER_LINE_ERR', {
                error: err,
                index: ref.index,
                line: JSON.stringify(line),
            });
        }

        // amendment lines are arrays of the form [COMMAND, ARGS, TIME]
        if (Array.isArray(line)) {
            try {
                handleCommand(ref.meta, line);
                ref.index++;
            } catch (err2) {
                errorHandler("METADATA_COMMAND_ERR", {
                    error: err2.stack,
                    line: line,
                });
            }
            return;
        }

        // the first line of a log may be a complete metadata map.
        // check truthiness as well, since typeof null === 'object';
        // a parsed null must not clobber ref.meta
        if (ref.index === 0 && line && typeof(line) === 'object') {
            ref.index++;
            // special case!
            ref.meta = line;
            return;
        }

        // anything else is unexpected; report it (post-increment reports
        // the line's own index, then counts it as consumed)
        errorHandler("METADATA_HANDLER_WEIRDLINE", {
            line: line,
            index: ref.index++,
        });
    };
};
diff --git a/package-lock.json b/package-lock.json
index 7a91644fa..0fb182b2d 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,6 +1,6 @@
{
"name": "cryptpad",
- "version": "2.25.0",
+ "version": "3.0.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@@ -56,9 +56,9 @@
"optional": true
},
"async-limiter": {
- "version": "1.0.0",
- "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz",
- "integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg=="
+ "version": "1.0.1",
+ "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz",
+ "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ=="
},
"balanced-match": {
"version": "1.0.0",
@@ -99,9 +99,9 @@
"integrity": "sha1-0ygVQE1olpn4Wk6k+odV3ROpYEg="
},
"chainpad-server": {
- "version": "3.0.2",
- "resolved": "https://registry.npmjs.org/chainpad-server/-/chainpad-server-3.0.2.tgz",
- "integrity": "sha512-c5aEljVAapDKKs0+Rt2jymKAszm8X4ZeLFNJj1yxflwBqoh0jr8OANYvbfjtNaYFe2Wdflp/1i4gibYX4IMc+g==",
+ "version": "3.0.3",
+ "resolved": "https://registry.npmjs.org/chainpad-server/-/chainpad-server-3.0.3.tgz",
+ "integrity": "sha512-NRfV7FFBEYy4ZVX7h0P5znu55X8v5K4iGWeMGihkfWZLKu70GmCPUTwpBCP79dUvnCToKEa4/e8aoSPcvZC8pA==",
"requires": {
"nthen": "^0.1.8",
"pull-stream": "^3.6.9",
@@ -227,19 +227,25 @@
}
},
"dom-serializer": {
- "version": "0.1.1",
- "resolved": "https://registry.npmjs.org/dom-serializer/-/dom-serializer-0.1.1.tgz",
- "integrity": "sha512-l0IU0pPzLWSHBcieZbpOKgkIn3ts3vAh7ZuFyXNwJxJXk/c4Gwj9xaTJwIDVQCXawWD0qb3IzMGH5rglQaO0XA==",
+ "version": "0.2.1",
+ "resolved": "https://registry.npmjs.org/dom-serializer/-/dom-serializer-0.2.1.tgz",
+ "integrity": "sha512-sK3ujri04WyjwQXVoK4PU3y8ula1stq10GJZpqHIUgoGZdsGzAGu65BnU3d08aTVSvO7mGPZUc0wTEDL+qGE0Q==",
"dev": true,
"requires": {
- "domelementtype": "^1.3.0",
- "entities": "^1.1.1"
+ "domelementtype": "^2.0.1",
+ "entities": "^2.0.0"
},
"dependencies": {
+ "domelementtype": {
+ "version": "2.0.1",
+ "resolved": "https://registry.npmjs.org/domelementtype/-/domelementtype-2.0.1.tgz",
+ "integrity": "sha512-5HOHUDsYZWV8FGWN0Njbr/Rn7f/eWSQi1v7+HsUVwXgn8nWWlL64zKDkS0n8ZmQ3mlWOMuXOnR+7Nx/5tMO5AQ==",
+ "dev": true
+ },
"entities": {
- "version": "1.1.2",
- "resolved": "https://registry.npmjs.org/entities/-/entities-1.1.2.tgz",
- "integrity": "sha512-f2LZMYl1Fzu7YSBKg+RoROelpOaNrcGmE9AZubeDfrCEia483oW4MI4VyFd5VNHIgQ/7qm1I0wUHK1eJnn2y2w==",
+ "version": "2.0.0",
+ "resolved": "https://registry.npmjs.org/entities/-/entities-2.0.0.tgz",
+ "integrity": "sha512-D9f7V0JSRwIxlRI2mjMqufDrRDnx8p+eEOz7aUM9SuvF8gsBzra0/6tbjl1m8eQHrZlYj6PxqE00hZ1SAIKPLw==",
"dev": true
}
}
@@ -458,9 +464,9 @@
}
},
"graceful-fs": {
- "version": "4.1.15",
- "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.1.15.tgz",
- "integrity": "sha512-6uHUhOPEBgQ24HM+r6b/QwWfZq+yiFcipKFrOFiBEnWdy5sdzYoi+pJeQaPI5qOLRFqWmAXUPQNsielzdLoecA=="
+ "version": "4.2.2",
+ "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.2.tgz",
+ "integrity": "sha512-IItsdsea19BoLC7ELy13q1iJFNmd7ofZH5+X/pJr90/nRoPEX0DJo1dHDbgtYWOhJhcCgMDTOw84RZ72q6lB+Q=="
},
"has-ansi": {
"version": "2.0.0",
@@ -597,9 +603,9 @@
}
},
"jszip": {
- "version": "3.2.1",
- "resolved": "https://registry.npmjs.org/jszip/-/jszip-3.2.1.tgz",
- "integrity": "sha512-iCMBbo4eE5rb1VCpm5qXOAaUiRKRUKiItn8ah2YQQx9qymmSAY98eyQfioChEYcVQLh0zxJ3wS4A0mh90AVPvw==",
+ "version": "3.2.2",
+ "resolved": "https://registry.npmjs.org/jszip/-/jszip-3.2.2.tgz",
+ "integrity": "sha512-NmKajvAFQpbg3taXQXr/ccS2wcucR1AZ+NtyWp2Nq7HHVsXhcJFR8p0Baf32C2yVvBylFWVeKf+WI2AnvlPhpA==",
"dev": true,
"requires": {
"lie": "~3.3.0",
@@ -697,9 +703,9 @@
}
},
"lodash": {
- "version": "4.17.14",
- "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.14.tgz",
- "integrity": "sha512-mmKYbW3GLuJeX+iGP+Y7Gp1AiGHGbXHCOh/jZmrawMmsE7MS4znI3RL2FsjbqOyMayHInjOeykW7PEajUk1/xw==",
+ "version": "4.17.15",
+ "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.15.tgz",
+ "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==",
"dev": true
},
"lodash.clonedeep": {
@@ -711,7 +717,8 @@
"lodash.merge": {
"version": "4.6.2",
"resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz",
- "integrity": "sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ=="
+ "integrity": "sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==",
+ "dev": true
},
"lodash.sortby": {
"version": "4.7.0",
@@ -965,9 +972,9 @@
}
},
"process-nextick-args": {
- "version": "2.0.0",
- "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.0.tgz",
- "integrity": "sha512-MtEC1TqN0EU5nephaJ4rAtThHtC86dNN9qCuEhtshvpVBkAW5ZO7BASN9REnF9eoXGcRub+pFuKEpOHE+HbEMw==",
+ "version": "2.0.1",
+ "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.1.tgz",
+ "integrity": "sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==",
"dev": true
},
"promise": {
@@ -997,9 +1004,9 @@
"optional": true
},
"pull-stream": {
- "version": "3.6.12",
- "resolved": "https://registry.npmjs.org/pull-stream/-/pull-stream-3.6.12.tgz",
- "integrity": "sha512-+LO1XIVyTMmeoH26UHznpgrgX2npTVYccTkMpgk/EyiQjFt1FmoNm+w+/zMLuz9U3bpvT5sSUicMKEe/2JjgEA=="
+ "version": "3.6.14",
+ "resolved": "https://registry.npmjs.org/pull-stream/-/pull-stream-3.6.14.tgz",
+ "integrity": "sha512-KIqdvpqHHaTUA2mCYcLG1ibEbu/LCKoJZsBWyv9lSYtPkJPBq8m3Hxa103xHi6D2thj5YXa0TqK3L3GUkwgnew=="
},
"qs": {
"version": "6.5.2",
@@ -1049,9 +1056,9 @@
"integrity": "sha1-lAFm0gfRDphhT+SSU60vCsAZ9+E="
},
"rimraf": {
- "version": "2.6.3",
- "resolved": "https://registry.npmjs.org/rimraf/-/rimraf-2.6.3.tgz",
- "integrity": "sha512-mwqeW5XsA2qAejG46gYdENaxXjx9onRNCfn7L0duuP4hCuTIi/QO7PDK07KJfp1d+izWPrzEJDcSqBa0OZQriA==",
+ "version": "2.7.1",
+ "resolved": "https://registry.npmjs.org/rimraf/-/rimraf-2.7.1.tgz",
+ "integrity": "sha512-uWjbaKIK3T1OSVptzX7Nl6PvQ3qAGtKEtVRjRuazjfL3Bx5eI409VZSqgND+4UNnmzLVdPj9FqFJNPqBZFve4w==",
"dev": true,
"requires": {
"glob": "^7.1.3"
diff --git a/package.json b/package.json
index 585e8b8fc..8ed4b4e9d 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
{
"name": "cryptpad",
"description": "realtime collaborative visual editor with zero knowlege server",
- "version": "2.25.0",
+ "version": "3.0.0",
"license": "AGPL-3.0+",
"repository": {
"type": "git",
@@ -11,7 +11,7 @@
"chainpad-server": "~3.0.2",
"express": "~4.16.0",
"fs-extra": "^7.0.0",
- "nthen": "~0.1.0",
+ "nthen": "0.1.8",
"pull-stream": "^3.6.1",
"replify": "^1.2.0",
"saferphore": "0.0.1",
diff --git a/rpc.js b/rpc.js
index a69033cc1..3af44160a 100644
--- a/rpc.js
+++ b/rpc.js
@@ -17,6 +17,7 @@ const Saferphore = require("saferphore");
const nThen = require("nthen");
const getFolderSize = require("get-folder-size");
const Pins = require("./lib/pins");
+const Meta = require("./lib/metadata");
var RPC = module.exports;
@@ -313,22 +314,22 @@ var getFileSize = function (Env, channel, cb) {
});
};
+
/* getMetadata
 * reads a channel's dedicated metadata log and reduces it into the
 * channel's current metadata state via Meta.createLineHandler.
 * calls back with (error) on failure, or (undefined, metadataObject).
 */
var getMetadata = function (Env, channel, cb) {
    if (!isValidId(channel)) { return void cb('INVALID_CHAN'); }
    // only standard 32-character channels have metadata logs
    if (channel.length !== 32) { return void cb("INVALID_CHAN"); }

    var ref = {};
    var lineHandler = Meta.createLineHandler(ref, Log.error);

    return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) {
        if (err) {
            // stream errors?
            return void cb(err);
        }
        cb(void 0, ref.meta);
    });
};
var getMultipleFileSize = function (Env, channels, cb) {
@@ -802,18 +803,13 @@ var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) {
return cb('INVALID_ARGUMENTS');
}
- if (!(Env.msgStore && Env.msgStore.getChannelMetadata)) {
- return cb('E_NOT_IMPLEMENTED');
- }
-
- Env.msgStore.getChannelMetadata(channelId, function (e, metadata) {
- if (e) { return cb(e); }
+ getMetadata(Env, channelId, function (err, metadata) {
+ if (err) { return void cb(err); }
if (!(metadata && Array.isArray(metadata.owners))) { return void cb('E_NO_OWNERS'); }
// Confirm that the channel is owned by the user in question
if (metadata.owners.indexOf(unsafeKey) === -1) {
return void cb('INSUFFICIENT_PERMISSIONS');
}
-
// FIXME COLDSTORAGE
return void Env.msgStore.clearChannel(channelId, function (e) {
cb(e);
@@ -822,6 +818,7 @@ var clearOwnedChannel = function (Env, channelId, unsafeKey, cb) {
};
var removeOwnedBlob = function (Env, blobId, unsafeKey, cb) {
+ // FIXME METADATA
var safeKey = escapeKeyCharacters(unsafeKey);
var safeKeyPrefix = safeKey.slice(0,3);
var blobPrefix = blobId.slice(0,2);
@@ -891,17 +888,12 @@ var removeOwnedChannel = function (Env, channelId, unsafeKey, cb) {
return void removeOwnedBlob(Env, channelId, unsafeKey, cb);
}
- if (!(Env.msgStore && Env.msgStore.removeChannel && Env.msgStore.getChannelMetadata)) {
- return cb("E_NOT_IMPLEMENTED");
- }
-
- Env.msgStore.getChannelMetadata(channelId, function (e, metadata) {
- if (e) { return cb(e); }
+ getMetadata(Env, channelId, function (err, metadata) {
+ if (err) { return void cb(err); }
if (!(metadata && Array.isArray(metadata.owners))) { return void cb('E_NO_OWNERS'); }
if (metadata.owners.indexOf(unsafeKey) === -1) {
return void cb('INSUFFICIENT_PERMISSIONS');
}
-
// if the admin has configured data retention...
// temporarily archive the file instead of removing it
if (Env.retainData) {
@@ -1459,21 +1451,23 @@ var removeLoginBlock = function (Env, msg, cb) {
});
};
+var ARRAY_LINE = /^\[/;
+
+/* Files can contain metadata but not content
+ call back with true if the channel log has no content other than metadata
+ otherwise false
+*/
var isNewChannel = function (Env, channel, cb) {
if (!isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return void cb('INVALID_CHAN'); }
- var count = 0;
var done = false;
Env.msgStore.getMessages(channel, function (msg) {
if (done) { return; }
- var parsed;
try {
- parsed = JSON.parse(msg);
- if (parsed && typeof(parsed) === 'object') { count++; }
- if (count >= 2) {
+ if (typeof(msg) === 'string' && ARRAY_LINE.test(msg)) {
done = true;
- cb(void 0, false); // it is not a new file
+ return void cb(void 0, false);
}
} catch (e) {
WARN('invalid message read from store', e);
@@ -1722,7 +1716,7 @@ RPC.create = function (
respond(e, [null, size, null]);
});
case 'GET_METADATA':
- return void getMetadata(Env, msg[1], function (e, data) {
+ return void getMetadata(Env, msg[1], function (e, data) { // FIXME METADATA
WARN(e, msg[1]);
respond(e, [null, data, null]);
});
diff --git a/server.js b/server.js
index b8c2b8163..63f5738a2 100644
--- a/server.js
+++ b/server.js
@@ -105,6 +105,18 @@ app.head(/^\/common\/feedback\.html/, function (req, res, next) {
}());
app.use(function (req, res, next) {
+ if (req.method === 'OPTIONS' && /\/blob\//.test(req.url)) {
+ console.log(req.url);
+ res.setHeader('Access-Control-Allow-Origin', '*');
+ res.setHeader('Access-Control-Allow-Methods', 'GET, OPTIONS');
+ res.setHeader('Access-Control-Allow-Headers', 'DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Content-Range,Range');
+ res.setHeader('Access-Control-Max-Age', 1728000);
+ res.setHeader('Content-Type', 'application/octet-stream; charset=utf-8');
+ res.setHeader('Content-Length', 0);
+ res.statusCode = 204;
+ return void res.end();
+ }
+
setHeaders(req, res);
if (/[\?\&]ver=[^\/]+$/.test(req.url)) { res.setHeader("Cache-Control", "max-age=31536000"); }
next();
@@ -247,7 +259,7 @@ var historyKeeper;
var log;
-// Initialize tasks, then rpc, then store, then history keeper and then start the server
+// Initialize logging, then the store, then tasks, then rpc, then history keeper and then start the server
var nt = nThen(function (w) {
// set up logger
var Logger = require("./lib/log");
@@ -261,13 +273,13 @@ var nt = nThen(function (w) {
config.store = _store;
}));
}).nThen(function (w) {
- if (!config.enableTaskScheduling) { return; }
var Tasks = require("./storage/tasks");
Tasks.create(config, w(function (e, tasks) {
if (e) {
throw e;
}
config.tasks = tasks;
+ if (config.disableIntegratedTasks) { return; }
setInterval(function () {
tasks.runAll(function (err) {
if (err) {
diff --git a/storage/file.js b/storage/file.js
index 99272fc6f..739f079ec 100644
--- a/storage/file.js
+++ b/storage/file.js
@@ -6,6 +6,7 @@ var Fse = require("fs-extra");
var Path = require("path");
var nThen = require("nthen");
var Semaphore = require("saferphore");
+var Once = require("../lib/once");
const ToPull = require('stream-to-pull-stream');
const Pull = require('pull-stream');
@@ -27,6 +28,30 @@ var mkArchivePath = function (env, channelId) {
return Path.join(env.archiveRoot, 'datastore', channelId.slice(0, 2), channelId) + '.ndjson';
};
+var mkMetadataPath = function (env, channelId) {
+ return Path.join(env.root, channelId.slice(0, 2), channelId) + '.metadata.ndjson';
+};
+
+var mkArchiveMetadataPath = function (env, channelId) {
+ return Path.join(env.archiveRoot, 'datastore', channelId.slice(0, 2), channelId) + '.metadata.ndjson';
+};
+
+// pass in the path so we can reuse the same function for archived files
+var channelExists = function (filepath, cb) {
+ Fs.stat(filepath, function (err, stat) {
+ if (err) {
+ if (err.code === 'ENOENT') {
+ // no, the file doesn't exist
+ return void cb(void 0, false);
+ }
+ return void cb(err);
+ }
+ if (!stat.isFile()) { return void cb("E_NOT_FILE"); }
+ return void cb(void 0, true);
+ });
+};
+
+// reads classic metadata from a channel log's first line and aborts the read once it has it
var getMetadataAtPath = function (Env, path, cb) {
var remainder = '';
var stream = Fs.createReadStream(path, { encoding: 'utf8' });
@@ -60,11 +85,6 @@ var getMetadataAtPath = function (Env, path, cb) {
stream.on('error', function (e) { complete(e); });
};
-var getChannelMetadata = function (Env, channelId, cb) {
- var path = mkPath(Env, channelId);
- getMetadataAtPath(Env, path, cb);
-};
-
var closeChannel = function (env, channelName, cb) {
if (!env.channels[channelName]) { return void cb(); }
try {
@@ -77,6 +97,7 @@ var closeChannel = function (env, channelName, cb) {
}
};
+// truncates a file to the end of its metadata line
var clearChannel = function (env, channelId, cb) {
var path = mkPath(env, channelId);
getMetadataAtPath(env, path, function (e, metadata) {
@@ -106,6 +127,9 @@ var clearChannel = function (env, channelId, cb) {
});
};
+/* readMessages is our classic method of reading messages from the disk
+ notably doesn't provide a means of aborting if you finish early
+*/
var readMessages = function (path, msgHandler, cb) {
var remainder = '';
var stream = Fs.createReadStream(path, { encoding: 'utf8' });
@@ -127,6 +151,104 @@ var readMessages = function (path, msgHandler, cb) {
stream.on('error', function (e) { complete(e); });
};
+/* getChannelMetadata
+ reads only the metadata embedded in the first line of a channel log.
+ does not necessarily provide the most up to date metadata, as it
+ could have been amended
+*/
+var getChannelMetadata = function (Env, channelId, cb) {
+ var path = mkPath(Env, channelId);
+
+ // gets metadata embedded in a file
+ getMetadataAtPath(Env, path, cb);
+};
+
+// low level method for getting just the dedicated metadata channel
+var getDedicatedMetadata = function (env, channelId, handler, cb) {
+ var metadataPath = mkMetadataPath(env, channelId);
+ readMessages(metadataPath, function (line) {
+ if (!line) { return; }
+ try {
+ var parsed = JSON.parse(line);
+ handler(null, parsed);
+ } catch (e) {
+ handler(e, line);
+ }
+ }, function (err) {
+ if (err) {
+ // ENOENT => there is no metadata log
+ if (err.code === 'ENOENT') { return void cb(); }
+ // otherwise stream errors?
+ return void cb(err);
+ }
+ cb();
+ });
+};
+
+/* readMetadata
+ fetches the classic format of the metadata from the channel log
+ if it is present, otherwise load the log of metadata amendments.
+ Requires a handler to process successive lines.
+*/
+var readMetadata = function (env, channelId, handler, cb) {
+/*
+
+Possibilities
+
+ 1. there is no metadata because it's an old channel
+ 2. there is metadata in the first line of the channel, but nowhere else
+ 3. there is metadata in the first line of the channel as well as in a dedicated log
+ 4. there is no metadata in the first line of the channel. Everything is in the dedicated log
+
+How to proceed
+
+ 1. load the first line of the channel and treat it as a metadata message if applicable
+ 2. load the dedicated log and treat it as an update
+
+*/
+
+ nThen(function (w) {
+ // returns the first line of a channel, parsed...
+ getChannelMetadata(env, channelId, w(function (err, data) {
+ if (err) {
+ // 'INVALID_METADATA' if it can't parse
+ // stream errors if anything goes wrong at a lower level
+ // ENOENT (no channel here)
+ return void handler(err);
+ }
+ // disregard anything that isn't a map
+ if (!data || typeof(data) !== 'object' || Array.isArray(data)) { return; }
+
+ // otherwise it's good.
+ handler(null, data);
+ }));
+ }).nThen(function () {
+ getDedicatedMetadata(env, channelId, handler, function (err) {
+ if (err) {
+ // stream errors?
+ return void cb(err);
+ }
+ cb();
+ });
+ });
+};
+
+// writeMetadata appends to the dedicated log of metadata amendments
+var writeMetadata = function (env, channelId, data, cb) {
+ var path = mkMetadataPath(env, channelId);
+
+ Fse.mkdirp(Path.dirname(path), PERMISSIVE, function (err) {
+ if (err && err.code !== 'EEXIST') { return void cb(err); }
+
+ // TODO see if we can make this any faster by using something other than appendFile
+ Fs.appendFile(path, data + '\n', cb);
+ });
+};
+
+
+// transform a stream of arbitrarily divided data
+// into a stream of buffers divided by newlines in the source stream
+// TODO see if we could improve performance by using libnewline
const NEWLINE_CHR = ('\n').charCodeAt(0);
const mkBufferSplit = () => {
let remainder = null;
@@ -160,6 +282,8 @@ const mkBufferSplit = () => {
}, Pull.flatten());
};
+// return a streaming function which transforms buffers into objects
+// containing the buffer and the offset from the start of the stream
const mkOffsetCounter = () => {
let offset = 0;
return Pull.map((buff) => {
@@ -170,9 +294,13 @@ const mkOffsetCounter = () => {
});
};
+// readMessagesBin asynchronously iterates over the messages in a channel log
+// the handler for each message must call back to read more, which should mean
+// that this function has a lower memory profile than our classic method
+// of reading logs line by line.
+// it also allows the handler to abort reading at any time
const readMessagesBin = (env, id, start, msgHandler, cb) => {
const stream = Fs.createReadStream(mkPath(env, id), { start: start });
- // TODO get the channel and add the atime
let keepReading = true;
Pull(
ToPull.read(stream),
@@ -187,8 +315,8 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
);
};
+// check if a file exists at $path
var checkPath = function (path, callback) {
- // TODO check if we actually need to use stat at all
Fs.stat(path, function (err) {
if (!err) {
callback(undefined, true);
@@ -208,31 +336,79 @@ var checkPath = function (path, callback) {
});
};
-var removeChannel = function (env, channelName, cb) {
- var filename = mkPath(env, channelName);
- Fs.unlink(filename, cb);
+var labelError = function (label, err) {
+ return label + (err.code ? "_" + err.code: '');
};
-// pass in the path so we can reuse the same function for archived files
-var channelExists = function (filepath, channelName, cb) {
- Fs.stat(filepath, function (err, stat) {
- if (err) {
- if (err.code === 'ENOENT') {
- // no, the file doesn't exist
- return void cb(void 0, false);
+/* removeChannel
+ fully deletes a channel log and any associated metadata
+*/
+var removeChannel = function (env, channelName, cb) {
+ var channelPath = mkPath(env, channelName);
+ var metadataPath = mkMetadataPath(env, channelName);
+
+ var CB = Once(cb);
+
+ var errors = 0;
+ nThen(function (w) {
+ Fs.unlink(channelPath, w(function (err) {
+ if (err) {
+ if (err.code === 'ENOENT') {
+ errors++;
+ return;
+ }
+ w.abort();
+ CB(labelError("E_CHANNEL_REMOVAL", err));
}
- return void cb(err);
+ }));
+ Fs.unlink(metadataPath, w(function (err) {
+ if (err) {
+ if (err.code === 'ENOENT') {
+ errors++;
+ return;
+ } // proceed if there's no metadata to delete
+ w.abort();
+ CB(labelError("E_METADATA_REMOVAL", err));
+ }
+ }));
+ }).nThen(function () {
+ if (errors === 2) {
+ return void CB(labelError('E_REMOVE_CHANNEL', new Error("ENOENT")));
}
- if (!stat.isFile()) { return void cb("E_NOT_FILE"); }
- return void cb(void 0, true);
+
+ CB();
});
};
+/* removeArchivedChannel
+ fully removes an archived channel log and any associated metadata
+*/
var removeArchivedChannel = function (env, channelName, cb) {
- var filename = mkArchivePath(env, channelName);
- Fs.unlink(filename, cb);
+ var channelPath = mkArchivePath(env, channelName);
+ var metadataPath = mkArchiveMetadataPath(env, channelName);
+
+ var CB = Once(cb);
+
+ nThen(function (w) {
+ Fs.unlink(channelPath, w(function (err) {
+ if (err) {
+ w.abort();
+ CB(labelError("E_ARCHIVED_CHANNEL_REMOVAL", err));
+ }
+ }));
+ Fs.unlink(metadataPath, w(function (err) {
+ if (err) {
+ if (err.code === "ENOENT") { return; }
+ w.abort();
+ CB(labelError("E_ARCHIVED_METADATA_REMOVAL", err));
+ }
+ }));
+ }).nThen(function () {
+ CB();
+ });
};
+// TODO implement a method of removing metadata that doesn't have a corresponding channel
var listChannels = function (root, handler, cb) {
// do twenty things at a time
var sema = Semaphore.create(20);
@@ -255,15 +431,31 @@ var listChannels = function (root, handler, cb) {
var wait = w();
dirList.forEach(function (dir) {
sema.take(function (give) {
+ // TODO modify the asynchronous bits here to keep less in memory at any given time
+ // list a directory -> process its contents with semaphores until less than N jobs are running
+ // then list the next directory...
var nestedDirPath = Path.join(root, dir);
Fs.readdir(nestedDirPath, w(give(function (err, list) {
if (err) { return void handler(err); } // Is this correct?
list.forEach(function (item) {
- // ignore things that don't match the naming pattern
- if (/^\./.test(item) || !/[0-9a-fA-F]{32,}\.ndjson$/.test(item)) { return; }
+ // ignore hidden files
+ if (/^\./.test(item)) { return; }
+ // ignore anything that isn't channel or metadata
+ if (!/^[0-9a-fA-F]{32}(\.metadata?)*\.ndjson$/.test(item)) {
+ return;
+ }
+ if (!/^[0-9a-fA-F]{32}\.ndjson$/.test(item)) {
+ // this will catch metadata, which we want to ignore if
+ // the corresponding channel is present
+ if (list.indexOf(item.replace(/\.metadata/, '')) !== -1) { return; }
+ // otherwise fall through
+ }
var filepath = Path.join(nestedDirPath, item);
- var channel = filepath.replace(/\.ndjson$/, '').replace(/.*\//, '');
+ var channel = filepath
+ .replace(/\.ndjson$/, '')
+ .replace(/\.metadata/, '')
+ .replace(/.*\//, '');
if ([32, 34].indexOf(channel.length) === -1) { return; }
// otherwise throw it on the pile
@@ -296,6 +488,7 @@ var listChannels = function (root, handler, cb) {
// move a channel's log file from its current location
// to an equivalent location in the cold storage directory
var archiveChannel = function (env, channelName, cb) {
+ // TODO close channels before archiving them?
if (!env.retainData) {
return void cb("ARCHIVES_DISABLED");
}
@@ -314,20 +507,106 @@ var archiveChannel = function (env, channelName, cb) {
// use Fse.move to move it, Fse makes paths to the directory when you use it.
// https://github.com/jprichardson/node-fs-extra/blob/HEAD/docs/move.md
- Fse.move(currentPath, archivePath, { overwrite: true }, cb);
+ nThen(function (w) {
+ // move the channel log and abort if anything goes wrong
+ Fse.move(currentPath, archivePath, { overwrite: true }, w(function (err) {
+ if (err) {
+ // proceed to the next block to remove metadata even if there's no channel
+ if (err.code === 'ENOENT') { return; }
+ // abort and callback for other types of errors
+ w.abort();
+ return void cb(err);
+ }
+ }));
+ }).nThen(function (w) {
+ // archive the dedicated metadata channel
+ var metadataPath = mkMetadataPath(env, channelName);
+ var archiveMetadataPath = mkArchiveMetadataPath(env, channelName);
+
+ Fse.move(metadataPath, archiveMetadataPath, { overwrite: true, }, w(function (err) {
+ // there's no metadata to archive, so you're done!
+ if (err && err.code === "ENOENT") {
+ return void cb();
+ }
+
+ // there was an error archiving the metadata
+ if (err) {
+ return void cb(labelError("E_METADATA_ARCHIVAL", err));
+ }
+
+ // it was archived successfully
+ cb();
+ }));
+ });
};
+// restore a channel and its metadata from the archive
+// to the appropriate location in the live database
var unarchiveChannel = function (env, channelName, cb) {
// very much like 'archiveChannel' but in the opposite direction
// the file is currently archived
- var currentPath = mkArchivePath(env, channelName);
- var unarchivedPath = mkPath(env, channelName);
+ var channelPath = mkPath(env, channelName);
+ var metadataPath = mkMetadataPath(env, channelName);
+
+ // don't call the callback multiple times
+ var CB = Once(cb);
// if a file exists in the unarchived path, you probably don't want to clobber its data
// so unlike 'archiveChannel' we won't overwrite.
// Fse.move will call back with EEXIST in such a situation
- Fse.move(currentPath, unarchivedPath, cb);
+
+ nThen(function (w) {
+ // if either metadata or a file exist in prod, abort
+ channelExists(channelPath, w(function (err, exists) {
+ if (err) {
+ w.abort();
+ return void CB(err);
+ }
+ if (exists) {
+ w.abort();
+ return CB('UNARCHIVE_CHANNEL_CONFLICT');
+ }
+ }));
+ channelExists(metadataPath, w(function (err, exists) {
+ if (err) {
+ w.abort();
+ return void CB(err);
+ }
+ if (exists) {
+ w.abort();
+ return CB("UNARCHIVE_METADATA_CONFLICT");
+ }
+ }));
+ }).nThen(function (w) {
+ // construct archive paths
+ var archiveChannelPath = mkArchivePath(env, channelName);
+ // restore the archived channel
+ Fse.move(archiveChannelPath, channelPath, w(function (err) {
+ if (err) {
+ w.abort();
+ return void CB(err);
+ }
+ }));
+ }).nThen(function (w) {
+ var archiveMetadataPath = mkArchiveMetadataPath(env, channelName);
+ // TODO validate that it's ok to move metadata non-atomically
+
+ // restore the metadata log
+ Fse.move(archiveMetadataPath, metadataPath, w(function (err) {
+ // if there's nothing to move, you're done.
+ if (err && err.code === 'ENOENT') {
+ return CB();
+ }
+ // call back with an error if something goes wrong
+ if (err) {
+ w.abort();
+ return void CB(labelError("E_METADATA_RESTORATION", err));
+ }
+ // otherwise it was moved successfully
+ CB();
+ }));
+ });
};
var flushUnusedChannels = function (env, cb, frame) {
@@ -352,11 +631,34 @@ var flushUnusedChannels = function (env, cb, frame) {
cb();
};
+/* channelBytes
+ calls back with an error or the size (in bytes) of a channel and its metadata
+*/
var channelBytes = function (env, chanName, cb) {
- var path = mkPath(env, chanName);
- Fs.stat(path, function (err, stats) {
- if (err) { return void cb(err); }
- cb(undefined, stats.size);
+ var channelPath = mkPath(env, chanName);
+ var dataPath = mkMetadataPath(env, chanName);
+
+ var CB = Once(cb);
+
+ var channelSize = 0;
+ var dataSize = 0;
+ nThen(function (w) {
+ Fs.stat(channelPath, w(function (err, stats) {
+ if (err) {
+ if (err.code === 'ENOENT') { return; }
+ return void CB(err);
+ }
+ channelSize = stats.size;
+ }));
+ Fs.stat(dataPath, w(function (err, stats) {
+ if (err) {
+ if (err.code === 'ENOENT') { return; }
+ return void CB(err);
+ }
+ dataSize = stats.size;
+ }));
+ }).nThen(function () {
+ CB(void 0, channelSize + dataSize);
});
};
@@ -450,6 +752,7 @@ var getChannel = function (
});
};
+// write a message to the disk as raw bytes
const messageBin = (env, chanName, msgBin, cb) => {
getChannel(env, chanName, function (err, chan) {
if (!chan) {
@@ -466,18 +769,19 @@ const messageBin = (env, chanName, msgBin, cb) => {
chan.writeStream.write(msgBin, function () {
/*::if (!chan) { throw new Error("Flow unreachable"); }*/
chan.onError.splice(chan.onError.indexOf(complete), 1);
+ chan.atime = +new Date();
if (!cb) { return; }
- //chan.messages.push(msg);
- chan.atime = +new Date(); // FIXME seems like odd behaviour that not passing a callback would result in not updating atime...
complete();
});
});
};
+// append a string to a channel's log as a new line
var message = function (env, chanName, msg, cb) {
messageBin(env, chanName, new Buffer(msg + '\n', 'utf8'), cb);
};
+// stream messages from a channel log
var getMessages = function (env, chanName, handler, cb) {
getChannel(env, chanName, function (err, chan) {
if (!chan) {
@@ -499,6 +803,9 @@ var getMessages = function (env, chanName, handler, cb) {
errorState = true;
return void cb(err);
}
+ // is it really, though? what if we hit the limit of open channels
+ // and 'clean up' in the middle of reading a massive file?
+ // certainly unlikely
if (!chan) { throw new Error("impossible, flow checking"); }
chan.atime = +new Date();
cb();
@@ -563,80 +870,124 @@ module.exports.create = function (
}));
}).nThen(function () {
cb({
- readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
- if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
- readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
- },
+ // OLDER METHODS
+ // write a new message to a log
message: function (channelName, content, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
message(env, channelName, content, cb);
},
+ // iterate over all the messages in a log
+ getMessages: function (channelName, msgHandler, cb) {
+ if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
+ getMessages(env, channelName, msgHandler, cb);
+ },
+
+ // NEWER IMPLEMENTATIONS OF THE SAME THING
+ // write a new message to a log
messageBin: (channelName, content, cb) => {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
messageBin(env, channelName, content, cb);
},
- getMessages: function (channelName, msgHandler, cb) {
+ // iterate over the messages in a log
+ readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
- getMessages(env, channelName, msgHandler, cb);
+ readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
},
+
+ // METHODS for deleting data
+ // remove a channel and its associated metadata log if present
removeChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
removeChannel(env, channelName, function (err) {
cb(err);
});
},
+ // remove a channel and its associated metadata log from the archive directory
removeArchivedChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
removeArchivedChannel(env, channelName, cb);
},
- closeChannel: function (channelName, cb) {
- if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
- closeChannel(env, channelName, cb);
- },
- flushUnusedChannels: function (cb) {
- flushUnusedChannels(env, cb);
- },
- getChannelSize: function (channelName, cb) {
- if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
- channelBytes(env, channelName, cb);
- },
- getChannelMetadata: function (channelName, cb) {
- if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
- getChannelMetadata(env, channelName, cb);
- },
+ // clear all data for a channel but preserve its metadata
clearChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
clearChannel(env, channelName, cb);
},
- listChannels: function (handler, cb) {
- listChannels(env.root, handler, cb);
- },
+
+ // check if a channel exists in the database
isChannelAvailable: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
// construct the path
var filepath = mkPath(env, channelName);
- channelExists(filepath, channelName, cb);
+ channelExists(filepath, cb);
},
+ // check if a channel exists in the archive
isChannelArchived: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
// construct the path
var filepath = mkArchivePath(env, channelName);
- channelExists(filepath, channelName, cb);
- },
- listArchivedChannels: function (handler, cb) {
- listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb);
+ channelExists(filepath, cb);
},
+ // move a channel from the database to the archive, along with its metadata
archiveChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
archiveChannel(env, channelName, cb);
},
+ // restore a channel from the archive to the database, along with its metadata
restoreArchivedChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
unarchiveChannel(env, channelName, cb);
},
+
+ // METADATA METHODS
+ // fetch the metadata for a channel
+ getChannelMetadata: function (channelName, cb) {
+ if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
+ getChannelMetadata(env, channelName, cb);
+ },
+ // iterate over lines of metadata changes from a dedicated log
+ readDedicatedMetadata: function (channelName, handler, cb) {
+ if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
+ getDedicatedMetadata(env, channelName, handler, cb);
+ },
+
+ // iterate over multiple lines of metadata changes
+ readChannelMetadata: function (channelName, handler, cb) {
+ if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
+ readMetadata(env, channelName, handler, cb);
+ },
+ // write a new line to a metadata log
+ writeMetadata: function (channelName, data, cb) {
+ if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
+ writeMetadata(env, channelName, data, cb);
+ },
+
+ // CHANNEL ITERATION
+ listChannels: function (handler, cb) {
+ listChannels(env.root, handler, cb);
+ },
+ listArchivedChannels: function (handler, cb) {
+ listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb);
+ },
+
+ getChannelSize: function (channelName, cb) {
+ if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
+ channelBytes(env, channelName, cb);
+ },
+ // OTHER DATABASE FUNCTIONALITY
+ // remove a particular channel from the cache
+ closeChannel: function (channelName, cb) {
+ if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
+ closeChannel(env, channelName, cb);
+ },
+ // iterate over open channels and close any that are not active
+ flushUnusedChannels: function (cb) {
+ flushUnusedChannels(env, cb);
+ },
+ // write to a log file
log: function (channelName, content, cb) {
message(env, channelName, content, cb);
},
+ // shut down the database
shutdown: function () {
clearInterval(it);
}
diff --git a/www/admin/inner.js b/www/admin/inner.js
index e2edb9ad5..4c335dd12 100644
--- a/www/admin/inner.js
+++ b/www/admin/inner.js
@@ -199,13 +199,13 @@ define([
// A ticket has been closed by the admins...
if (!$ticket.length) { return; }
$ticket.addClass('cp-support-list-closed');
- $ticket.append(Support.makeCloseMessage(common, content, hash));
+ $ticket.append(APP.support.makeCloseMessage(content, hash));
return;
}
if (msg.type !== 'TICKET') { return; }
if (!$ticket.length) {
- $ticket = Support.makeTicket($div, common, content, function () {
+ $ticket = APP.support.makeTicket($div, content, function () {
var error = false;
hashesById[id].forEach(function (d) {
common.mailbox.dismiss(d, function (err) {
@@ -218,7 +218,7 @@ define([
if (!error) { $ticket.remove(); }
});
}
- $ticket.append(Support.makeMessage(common, content, hash, true));
+ $ticket.append(APP.support.makeMessage(content, hash));
}
});
return $div;
@@ -349,6 +349,7 @@ define([
APP.privateKey = privateData.supportPrivateKey;
APP.origin = privateData.origin;
APP.readOnly = privateData.readOnly;
+ APP.support = Support.create(common, true);
// Content
var $rightside = APP.$rightside;
diff --git a/www/code/export.js b/www/code/export.js
index 23d689361..04616a192 100644
--- a/www/code/export.js
+++ b/www/code/export.js
@@ -8,7 +8,7 @@ define([
module.main = function (userDoc, cb) {
var mode = userDoc.highlightMode || 'gfm';
var content = userDoc.content;
- module.type = SFCodeMirror.getContentExtension(mode);
+ module.ext = SFCodeMirror.getContentExtension(mode);
cb(SFCodeMirror.fileExporter(content));
};
diff --git a/www/code/inner.js b/www/code/inner.js
index 9e4d0a207..7f043a9c9 100644
--- a/www/code/inner.js
+++ b/www/code/inner.js
@@ -272,6 +272,7 @@ define([
var andThen2 = function (editor, CodeMirror, framework, isPresentMode) {
var common = framework._.sfCommon;
+ var privateData = common.getMetadataMgr().getPrivateData();
var previewPane = mkPreviewPane(editor, CodeMirror, framework, isPresentMode);
var markdownTb = mkMarkdownTb(editor, framework);
@@ -349,7 +350,8 @@ define([
onUploaded: function (ev, data) {
var parsed = Hash.parsePadUrl(data.url);
var secret = Hash.getSecrets('file', parsed.hash, data.password);
- var src = Hash.getBlobPathFromHex(secret.channel);
+ var fileHost = privateData.fileHost || privateData.origin;
+ var src = fileHost + Hash.getBlobPathFromHex(secret.channel);
var key = Hash.encodeBase64(secret.keys.cryptKey);
var mt = '';
editor.replaceSelection(mt);
@@ -363,7 +365,15 @@ define([
});
framework.setFileExporter(CodeMirror.getContentExtension, CodeMirror.fileExporter);
- framework.setFileImporter({}, CodeMirror.fileImporter);
+ framework.setFileImporter({}, function () {
+ /* setFileImporter currently takes a function with the following signature:
+ (content, file) => {}
+ I used 'apply' with 'arguments' to avoid breaking things if this API ever changes.
+ */
+ var ret = CodeMirror.fileImporter.apply(null, Array.prototype.slice.call(arguments));
+ previewPane.modeChange(ret.mode);
+ return ret;
+ });
framework.setNormalizer(function (c) {
return {
diff --git a/www/common/application_config_internal.js b/www/common/application_config_internal.js
index e4d5aa4f9..6a362b133 100644
--- a/www/common/application_config_internal.js
+++ b/www/common/application_config_internal.js
@@ -93,6 +93,7 @@ define(function() {
config.applicationsIcon = {
file: 'cptools-file',
fileupload: 'cptools-file-upload',
+ folderupload: 'cptools-folder-upload',
pad: 'cptools-pad',
code: 'cptools-code',
slide: 'cptools-slide',
diff --git a/www/common/common-constants.js b/www/common/common-constants.js
index 986e115e2..ac51dfbca 100644
--- a/www/common/common-constants.js
+++ b/www/common/common-constants.js
@@ -7,6 +7,7 @@ define(function () {
fileHashKey: 'FS_hash',
// sessionStorage
newPadPathKey: "newPadPath",
+ newPadFileData: "newPadFileData",
// Store
displayNameKey: 'cryptpad.username',
oldStorageKey: 'CryptPad_RECENTPADS',
diff --git a/www/common/common-interface.js b/www/common/common-interface.js
index e44a4ecff..cb5f9903a 100644
--- a/www/common/common-interface.js
+++ b/www/common/common-interface.js
@@ -592,6 +592,16 @@ define([
]);
};
+ UI.createHelper = function (href, text) {
+ var q = h('a.fa.fa-question-circle', {
+ style: 'text-decoration: none !important;',
+ title: text,
+ href: href,
+ target: "_blank",
+ 'data-tippy-placement': "right"
+ });
+ return q;
+ };
/*
* spinner
@@ -773,6 +783,7 @@ define([
var icon = AppConfig.applicationsIcon[type];
var font = icon.indexOf('cptools') === 0 ? 'cptools' : 'fa';
if (type === 'fileupload') { type = 'file'; }
+ if (type === 'folderupload') { type = 'file'; }
var appClass = ' cp-icon cp-icon-color-'+type;
$icon = $('', {'class': font + ' ' + icon + appClass});
}
diff --git a/www/common/common-messenger.js b/www/common/common-messenger.js
index 1102f94a2..365c8be40 100644
--- a/www/common/common-messenger.js
+++ b/www/common/common-messenger.js
@@ -422,8 +422,10 @@ define([
var friend = getFriendFromChannel(chan.id) || {};
var cfg = {
- validateKey: keys ? keys.validateKey : undefined,
- owners: [proxy.edPublic, friend.edPublic],
+ metadata: {
+ validateKey: keys ? keys.validateKey : undefined,
+ owners: [proxy.edPublic, friend.edPublic],
+ },
lastKnownHash: data.lastKnownHash
};
var msg = ['GET_HISTORY', chan.id, cfg];
diff --git a/www/common/common-thumbnail.js b/www/common/common-thumbnail.js
index 34d44daf5..c22e2acee 100644
--- a/www/common/common-thumbnail.js
+++ b/www/common/common-thumbnail.js
@@ -15,6 +15,7 @@ define([
};
var supportedTypes = [
+ 'text/plain',
'image/png',
'image/jpeg',
'image/jpg',
@@ -23,7 +24,12 @@ define([
'application/pdf'
];
- Thumb.isSupportedType = function (type) {
+ Thumb.isSupportedType = function (file) {
+ if (!file) { return false; }
+ var type = file.type;
+ if (Util.isPlainTextFile(file.type, file.name)) {
+ type = "text/plain";
+ }
return supportedTypes.some(function (t) {
return type.indexOf(t) !== -1;
});
@@ -164,6 +170,26 @@ define([
});
});
};
+ Thumb.fromPlainTextBlob = function (blob, cb) {
+ var canvas = document.createElement("canvas");
+ canvas.width = canvas.height = Thumb.dimension;
+ var reader = new FileReader();
+ reader.addEventListener('loadend', function (e) {
+ var content = e.srcElement.result;
+ var lines = content.split("\n");
+ var canvasContext = canvas.getContext("2d");
+ var fontSize = 4;
+ canvas.height = (lines.length) * (fontSize + 1);
+ canvasContext.font = fontSize + 'px monospace';
+ lines.forEach(function (text, i) {
+
+ canvasContext.fillText(text, 5, i * (fontSize + 1));
+ });
+ var D = getResizedDimensions(canvas, "txt");
+ Thumb.fromCanvas(canvas, D, cb);
+ });
+ reader.readAsText(blob);
+ };
Thumb.fromBlob = function (blob, cb) {
if (blob.type.indexOf('video/') !== -1) {
return void Thumb.fromVideoBlob(blob, cb);
@@ -171,6 +197,9 @@ define([
if (blob.type.indexOf('application/pdf') !== -1) {
return void Thumb.fromPdfBlob(blob, cb);
}
+ if (Util.isPlainTextFile(blob.type, blob.name)) {
+ return void Thumb.fromPlainTextBlob(blob, cb);
+ }
Thumb.fromImageBlob(blob, cb);
};
@@ -230,9 +259,15 @@ define([
if (!Visible.currently()) { to = window.setTimeout(interval, Thumb.UPDATE_FIRST); }
};
+
var addThumbnail = function (err, thumb, $span, cb) {
+ var u8 = Nacl.util.decodeBase64(thumb.split(',')[1]);
+ var blob = new Blob([u8], {
+ type: 'image/png'
+ });
+ var url = URL.createObjectURL(blob);
var img = new Image();
- img.src = thumb.slice(0,5) === 'data:' ? thumb : 'data:image/png;base64,'+thumb;
+ img.src = url;
$span.find('.cp-icon').hide();
$span.prepend(img);
cb($(img));
@@ -254,9 +289,11 @@ define([
var parsed = Hash.parsePadUrl(href);
var k = getKey(parsed.type, channel);
var whenNewThumb = function () {
+ var privateData = common.getMetadataMgr().getPrivateData();
+ var fileHost = privateData.fileHost || privateData.origin;
var secret = Hash.getSecrets('file', parsed.hash, password);
var hexFileName = secret.channel;
- var src = Hash.getBlobPathFromHex(hexFileName);
+ var src = fileHost + Hash.getBlobPathFromHex(hexFileName);
var key = secret.keys && secret.keys.cryptKey;
FileCrypto.fetchDecryptedMetadata(src, key, function (e, metadata) {
if (e) {
diff --git a/www/common/common-ui-elements.js b/www/common/common-ui-elements.js
index bc465cdfb..d57eea7a5 100644
--- a/www/common/common-ui-elements.js
+++ b/www/common/common-ui-elements.js
@@ -119,15 +119,35 @@ define([
$('