You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
342 lines
10 KiB
JavaScript
342 lines
10 KiB
JavaScript
var Fs = require("fs");
|
|
var Path = require("path");
|
|
var nThen = require("nthen");
|
|
|
|
/**
 * Build the on-disk path of a channel's log file.
 *
 * Files are sharded into sub-directories named after the first two
 * characters of the channel id: <root>/<first two chars>/<id>.ndjson
 *
 * @param {Object} env        storage environment; only `env.root` is read.
 * @param {string} channelId  channel identifier.
 * @returns {string} path of the channel's `.ndjson` file.
 */
var mkPath = function (env, channelId) {
    return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
};
|
|
|
|
/**
 * Read the first line of the file at `path` and parse it as JSON.
 *
 * The file is streamed as utf8 and reading stops as soon as the first
 * newline has been seen, so only the beginning of the file is read.
 *
 * @param {Object} Env     unused by this function.
 * @param {string} path    file to read.
 * @param {Function} cb    called at most once:
 *   - cb(undefined, parsed)   the first line parsed as JSON
 *   - cb('INVALID_METADATA')  the first line is not valid JSON
 *   - cb(null)                the stream ended before any newline was seen
 *   - cb(err)                 the read stream emitted an error
 */
var getMetadataAtPath = function (Env, path, cb) {
    // text received so far that has not yet been terminated by a newline
    var remainder = '';
    var stream = Fs.createReadStream(path, 'utf8');

    // make sure the callback can only ever fire once
    var complete = function (err, data) {
        var _cb = cb;
        cb = undefined;
        if (_cb) { _cb(err, data); }
    };

    stream.on('data', function (chunk) {
        // ignore anything delivered after we already answered
        if (!cb) { return; }

        var newline = chunk.indexOf('\n');
        if (newline === -1) {
            remainder += chunk;
            return;
        }
        stream.close();

        // the first line may span several chunks: prepend what was buffered
        var metadata = remainder + chunk.slice(0, newline);

        var parsed = null;
        try {
            parsed = JSON.parse(metadata);
        } catch (e) {
            console.error(e);
            return void complete('INVALID_METADATA');
        }
        // called outside of the try block so that an exception thrown by
        // the callback is not mistaken for a JSON parse error
        complete(void 0, parsed);
    });
    stream.on('end', function () {
        complete(null);
    });
    stream.on('error', function (e) { complete(e); });
};
|
|
|
|
/**
 * Fetch the metadata (first line, parsed as JSON) of a channel's file.
 *
 * @param {Object} Env         storage environment, used to resolve the path.
 * @param {string} channelId   channel identifier.
 * @param {Function} cb        forwarded unchanged to getMetadataAtPath.
 */
var getChannelMetadata = function (Env, channelId, cb) {
    var path = mkPath(Env, channelId);
    getMetadataAtPath(Env, path, cb);
};
|
|
|
|
/**
 * Close the write stream of an open channel and forget the channel.
 *
 * On success the channel is removed from `env.channels` and
 * `env.openFiles` is decremented. If the channel is not open there is
 * nothing to do and the callback is invoked without an error.
 *
 * @param {Object} env           storage environment (`channels`, `openFiles`).
 * @param {string} channelName   channel identifier.
 * @param {Function} cb          cb() on success, cb(err) if closing threw.
 */
var closeChannel = function (env, channelName, cb) {
    var channel = env.channels[channelName];
    // callers wait on the callback, so it must fire even when nothing is open
    if (!channel) { return void cb(); }

    try {
        channel.writeStream.close();
    } catch (err) {
        // leave the bookkeeping untouched when the stream could not be closed
        return void cb(err);
    }

    delete env.channels[channelName];
    env.openFiles--;
    // invoked outside of the try block so an exception thrown by the
    // callback does not trigger a second invocation
    cb();
};
|
|
|
|
/**
 * Erase the content of a channel while keeping its metadata line.
 *
 * If the file has no metadata it is truncated to zero bytes. Otherwise
 * the channel is closed (when it is open) and the file is truncated
 * right after the metadata line.
 *
 * @param {Object} env         storage environment.
 * @param {string} channelId   channel identifier.
 * @param {Function} cb        cb() on success, cb(err) on failure;
 *                             invoked exactly once.
 */
var clearChannel = function (env, channelId, cb) {
    var path = mkPath(env, channelId);
    getMetadataAtPath(env, path, function (e, metadata) {
        if (e) { return void cb(e); }
        if (!metadata) {
            return void Fs.truncate(path, 0, function (err) {
                if (err) { return void cb(err); }
                cb(void 0);
            });
        }

        // Fs.truncate takes a length in bytes, so measure the encoded size
        // of the metadata line (plus its trailing newline), not the number
        // of UTF-16 code units.
        // NOTE(review): assumes the line on disk is byte-identical to
        // JSON.stringify of its parsed value - confirm against the writer.
        var len = Buffer.byteLength(JSON.stringify(metadata), 'utf8') + 1;

        var truncate = function (err) {
            // stop here: continuing would truncate anyway and call cb twice
            if (err) { return void cb(err); }
            Fs.truncate(path, len, function (err) {
                if (err) { return void cb(err); }
                cb();
            });
        };

        // nothing to close when the channel is not open
        if (!env.channels[channelId]) { return void truncate(); }

        // as long as closeChannel is synchronous, this should not cause
        // any race conditions. truncate ought to return faster than a channel
        // can be opened and read by another user. if that turns out not to be
        // the case, we'll need to implement locking.
        closeChannel(env, channelId, truncate);
    });
};
|
|
|
|
/**
 * Stream a file as utf8 and hand every newline-separated line to
 * `msgHandler`, in order.
 *
 * Whatever follows the last newline is delivered when the stream ends,
 * even when it is an empty string (e.g. for a file ending in a newline
 * or for an empty file).
 *
 * @param {string} path           file to read.
 * @param {Function} msgHandler   called as msgHandler(line) for each line.
 * @param {Function} cb           called at most once: cb() when the stream
 *                                ended, cb(err) on a stream error.
 */
var readMessages = function (path, msgHandler, cb) {
    // trailing part of the previous chunk that was not newline-terminated
    var remainder = '';
    var stream = Fs.createReadStream(path, 'utf8');

    // make sure the callback can only ever fire once
    var complete = function (err) {
        var _cb = cb;
        cb = undefined;
        if (_cb) { _cb(err); }
    };

    stream.on('data', function (chunk) {
        var lines = chunk.split('\n');
        lines[0] = remainder + lines[0];
        // the last piece is incomplete until the next newline shows up
        remainder = lines.pop();
        // pass the line only, exactly like the end-of-stream call does
        lines.forEach(function (line) { msgHandler(line); });
    });
    stream.on('end', function () {
        msgHandler(remainder);
        complete();
    });
    stream.on('error', function (e) { complete(e); });
};
|
|
|
|
/**
 * Check whether a file exists and, when it does not, make sure its
 * parent directory does.
 *
 * @param {string} path         file to look for.
 * @param {Function} callback   callback(undefined, true) if the file exists,
 *   callback(undefined, false) if it does not (its parent directory has
 *   been created or already existed), callback(err) for any other stat
 *   or mkdir failure.
 */
var checkPath = function (path, callback) {
    // TODO check if we actually need to use stat at all
    Fs.stat(path, function (err) {
        if (!err) {
            callback(undefined, true);
            return;
        }
        if (err.code !== 'ENOENT') {
            callback(err);
            return;
        }
        // only the direct parent is created (mkdir is not recursive here);
        // an already existing directory is not an error
        Fs.mkdir(Path.dirname(path), function (err) {
            if (err && err.code !== 'EEXIST') {
                callback(err);
                return;
            }
            callback(undefined, false);
        });
    });
};
|
|
|
|
/**
 * Delete a channel's file from disk.
 *
 * NOTE(review): an entry for this channel in `env.channels` (and its
 * open write stream) is not touched here - confirm callers close the
 * channel themselves when that matters.
 *
 * @param {Object} env           storage environment, used to resolve the path.
 * @param {string} channelName   channel identifier.
 * @param {Function} cb          passed straight to Fs.unlink.
 */
var removeChannel = function (env, channelName, cb) {
    var filename = mkPath(env, channelName);
    Fs.unlink(filename, cb);
};
|
|
|
|
/**
 * Close every open channel that has not been accessed recently.
 *
 * A channel is closed when `now >= expiration + chan.atime`. Channels
 * whose `atime` is not a number are skipped. Failures to close a channel
 * are logged and do not stop the sweep.
 *
 * @param {Object} env       storage environment (`channels`,
 *                           `channelExpirationMs`, `verbose`).
 * @param {Function} cb      called with no arguments once every channel
 *                           has been examined.
 * @param {number} [frame]   expiration in milliseconds; defaults to
 *                           `env.channelExpirationMs` when undefined.
 */
var flushUnusedChannels = function (env, cb, frame) {
    var currentTime = +new Date();

    var expiration = typeof(frame) === 'undefined'? env.channelExpirationMs: frame;
    Object.keys(env.channels).forEach(function (chanId) {
        var chan = env.channels[chanId];
        if (typeof(chan.atime) !== 'number') { return; }
        if (currentTime >= expiration + chan.atime) {
            closeChannel(env, chanId, function (err) {
                if (err) {
                    console.error(err);
                    return;
                }
                if (env.verbose) {
                    console.log("Closed channel [%s]", chanId);
                }
            });
        }
    });
    cb();
};
|
|
|
|
/**
 * Get the in-memory representation of a channel, loading it from disk
 * and opening an append stream on its file when it is not cached yet.
 *
 * A channel object has the shape
 *   { atime, messages, writeStream, whenLoaded, onError }
 * where `whenLoaded` is the list of callbacks waiting for the load to
 * finish (undefined once loaded) and `onError` holds handlers that are
 * called when the write stream errors after loading.
 *
 * @param {Object} env          storage environment.
 * @param {string} id           channel identifier.
 * @param {Function} callback   callback(undefined, channel) once the channel
 *                              is ready, callback(err) if loading failed.
 */
var getChannel = function (env, id, callback) {
    if (env.channels[id]) {
        var chan = env.channels[id];
        chan.atime = +new Date();
        if (chan.whenLoaded) {
            // still loading: queue behind the callers already waiting
            chan.whenLoaded.push(callback);
        } else {
            callback(undefined, chan);
        }
        return;
    }

    if (env.openFiles >= env.openFileLimit) {
        // if you're running out of open files, asynchronously clean up expired files
        // do it on a shorter timeframe, though (half of normal)
        setTimeout(function () {
            flushUnusedChannels(env, function () {
                if (env.verbose) {
                    console.log("Approaching open file descriptor limit. Cleaning up");
                }
            }, env.channelExpirationMs / 2);
        });
    }

    var channel = env.channels[id] = {
        atime: +new Date(),
        messages: [],
        writeStream: undefined,
        whenLoaded: [ callback ],
        onError: [ ]
    };

    // resolve every waiting caller exactly once
    var complete = function (err) {
        var whenLoaded = channel.whenLoaded;
        // no guarantee stream.on('error') will not cause this to be called multiple times
        if (!whenLoaded) { return; }
        channel.whenLoaded = undefined;
        if (err) {
            // do not keep a half-loaded channel in the cache
            delete env.channels[id];
        }
        whenLoaded.forEach(function (wl) { wl(err, (err) ? undefined : channel); });
    };

    var path = mkPath(env, id);
    var fileExists;
    var errorState;
    nThen(function (waitFor) {
        // step 1: does the file exist? (creates the parent directory if not)
        checkPath(path, waitFor(function (err, exists) {
            if (err) {
                errorState = true;
                complete(err);
                return;
            }
            fileExists = exists;
        }));
    }).nThen(function (waitFor) {
        // step 2: load the existing history into memory
        if (errorState) { return; }
        if (!fileExists) { return; }
        readMessages(path, function (msg) {
            channel.messages.push(msg);
        }, waitFor(function (err) {
            if (err) {
                errorState = true;
                complete(err);
            }
        }));
    }).nThen(function (waitFor) {
        // step 3: open the file for appending
        if (errorState) { return; }
        var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
        env.openFiles++;
        stream.on('open', waitFor());
        stream.on('error', function (err) {
            env.openFiles--;
            // this might be called after this nThen block closes.
            if (channel.whenLoaded) {
                complete(err);
            } else {
                channel.onError.forEach(function (handler) {
                    handler(err);
                });
            }
        });
    }).nThen(function () {
        // step 4: everything went fine, hand the channel to the callers
        if (errorState) { return; }
        complete();
    });
};
|
|
|
|
/**
 * Append a message to a channel: write it (newline-terminated) to the
 * channel's file and, once written, add it to the in-memory history.
 *
 * @param {Object} env        storage environment.
 * @param {string} chanName   channel identifier.
 * @param {string} msg        message to store; a '\n' is appended on disk.
 * @param {Function} cb       called at most once: cb() when the message was
 *                            written, cb(err) if the channel could not be
 *                            opened or the write failed.
 */
var message = function (env, chanName, msg, cb) {
    getChannel(env, chanName, function (err, chan) {
        if (err) {
            cb(err);
            return;
        }

        // make sure the callback can only ever fire once, whether the
        // outcome comes from the write callback or from a stream error
        var complete = function (err) {
            var _cb = cb;
            cb = undefined;
            if (_cb) { _cb(err); }
        };

        chan.onError.push(complete);
        chan.writeStream.write(msg + '\n', function (err) {
            // unregister exactly our own handler, if it is still registered
            var index = chan.onError.indexOf(complete);
            if (index !== -1) { chan.onError.splice(index, 1); }

            // already answered through the stream's error handler
            if (!cb) { return; }

            // the write callback receives the error when the write failed:
            // do not cache or acknowledge a message that was not stored
            if (err) { return void complete(err); }

            chan.messages.push(msg);
            chan.atime = +new Date();
            complete();
        });
    });
};
|
|
|
|
/**
 * Pass every stored message of a channel to `handler`, in order.
 * Empty entries of the history are skipped.
 *
 * @param {Object} env         storage environment.
 * @param {string} chanName    channel identifier.
 * @param {Function} handler   called as handler(message) for each message.
 * @param {Function} cb        cb() once all messages were delivered,
 *                             cb(err) if the channel could not be opened
 *                             or if `handler` threw.
 */
var getMessages = function (env, chanName, handler, cb) {
    getChannel(env, chanName, function (err, chan) {
        if (err) {
            cb(err);
            return;
        }
        try {
            chan.messages.forEach(function (msg) {
                if (!msg) { return; }
                handler(msg);
            });
        } catch (err2) {
            // an exception thrown by the handler aborts the iteration
            console.error(err2);
            cb(err2);
            return;
        }
        chan.atime = +new Date();
        cb();
    });
};
|
|
|
|
/**
 * Report the size on disk of a channel's file.
 *
 * @param {Object} env        storage environment, used to resolve the path.
 * @param {string} chanName   channel identifier.
 * @param {Function} cb       cb(undefined, sizeInBytes) on success,
 *                            cb(err) if the file could not be stat'ed.
 */
var channelBytes = function (env, chanName, cb) {
    var path = mkPath(env, chanName);
    Fs.stat(path, function (err, stats) {
        if (err) { return void cb(err); }
        cb(void 0, stats.size);
    });
};
|
|
|
|
/**
 * Create a file-backed channel store.
 *
 * Recognised configuration (with the fallback used when falsy):
 *   - conf.filePath             root directory ('./datastore')
 *   - conf.channelExpirationMs  idle time before a channel is closed (30000)
 *   - conf.openFileLimit        open files threshold that triggers an
 *                               early cleanup (2048)
 *   - conf.verbose              enables informational logging
 *
 * The root directory is created if needed, then `cb` receives the store
 * API. A timer sweeps unused channels every 5 seconds.
 *
 * @param {Object} conf   configuration, see above.
 * @param {Function} cb   called with the store object once the root
 *                        directory exists.
 */
module.exports.create = function (conf, cb) {
    var env = {
        root: conf.filePath || './datastore',
        channels: { },
        channelExpirationMs: conf.channelExpirationMs || 30000,
        verbose: conf.verbose,
        openFiles: 0,
        openFileLimit: conf.openFileLimit || 2048,
    };
    Fs.mkdir(env.root, function (err) {
        if (err && err.code !== 'EEXIST') {
            // TODO: somehow return a nice error
            throw err;
        }
        // every method binds the shared `env` to the matching internal function
        cb({
            message: function (channelName, content, cb) {
                message(env, channelName, content, cb);
            },
            getMessages: function (channelName, msgHandler, cb) {
                getMessages(env, channelName, msgHandler, cb);
            },
            removeChannel: function (channelName, cb) {
                // only the error is forwarded to the caller
                removeChannel(env, channelName, function (err) {
                    cb(err);
                });
            },
            closeChannel: function (channelName, cb) {
                closeChannel(env, channelName, cb);
            },
            flushUnusedChannels: function (cb) {
                flushUnusedChannels(env, cb);
            },
            getChannelSize: function (chanName, cb) {
                channelBytes(env, chanName, cb);
            },
            getChannelMetadata: function (channelName, cb) {
                getChannelMetadata(env, channelName, cb);
            },
            clearChannel: function (channelName, cb) {
                clearChannel(env, channelName, cb);
            },
        });
    });
    // periodically close channels that have been idle for too long
    setInterval(function () {
        flushUnusedChannels(env, function () { });
    }, 5000);
};
|