1 const VERSION = require("../package.json").version;
3 var Config = require("./config");
4 var Promise = require("bluebird");
5 import * as ChannelStore from './channel-storage/channelstore';
6 import { EventEmitter } from 'events';
8 const LOGGER = require('@calzoneman/jsli')('server');
12 LOGGER.info("Starting CyTube v%s", VERSION);
13 var chanlogpath = path.join(__dirname, "../chanlogs");
14 fs.exists(chanlogpath, function (exists) {
15 exists || fs.mkdirSync(chanlogpath);
18 var gdvttpath = path.join(__dirname, "../google-drive-subtitles");
19 fs.exists(gdvttpath, function (exists) {
20 exists || fs.mkdirSync(gdvttpath);
23 singleton = new Server();
27 getServer: function () {
32 const path = require("path");
33 const fs = require("fs");
34 const http = require("http");
35 const https = require("https");
36 const express = require("express");
37 const Channel = require("./channel/channel");
38 const db = require("./database");
39 const Flags = require("./flags");
40 const sio = require("socket.io");
41 import LocalChannelIndex from './web/localchannelindex';
42 import { PartitionChannelIndex } from './partition/partitionchannelindex';
43 import IOConfiguration from './configuration/ioconfig';
44 import WebConfiguration from './configuration/webconfig';
45 import session from './session';
46 import { LegacyModule } from './legacymodule';
47 import { PartitionModule } from './partition/partitionmodule';
48 import { Gauge } from 'prom-client';
49 import { EmailController } from './controller/email';
50 import { CaptchaController } from './controller/captcha';
52 var Server = function () {
58 self.announcement = null;
59 self.infogetter = null;
61 self.chanPath = Config.get('channel-path');
64 if (Config.get('enable-partition')) {
65 initModule = this.initModule = new PartitionModule();
66 self.partitionDecider = initModule.getPartitionDecider();
68 initModule = this.initModule = new LegacyModule();
71 const globalMessageBus = this.initModule.getGlobalMessageBus();
72 globalMessageBus.on('UserProfileChanged', this.handleUserProfileChange.bind(this));
73 globalMessageBus.on('ChannelDeleted', this.handleChannelDelete.bind(this));
74 globalMessageBus.on('ChannelRegistered', this.handleChannelRegister.bind(this));
76 // database init ------------------------------------------------------
77 var Database = require("./database");
83 if (Config.getEmailConfig().getPasswordReset().isEnabled()) {
84 const smtpConfig = Config.getEmailConfig().getSmtp();
85 emailTransport = require("nodemailer").createTransport({
86 host: smtpConfig.getHost(),
87 port: smtpConfig.getPort(),
88 secure: smtpConfig.isSecure(),
90 user: smtpConfig.getUser(),
91 pass: smtpConfig.getPassword()
97 throw new Error('Email is not enabled on this server');
102 const emailController = new EmailController(
104 Config.getEmailConfig()
107 const captchaController = new CaptchaController(
108 Config.getCaptchaConfig()
111 // webserver init -----------------------------------------------------
112 const ioConfig = IOConfiguration.fromOldConfig(Config);
113 const webConfig = WebConfiguration.fromOldConfig(Config);
114 const clusterClient = initModule.getClusterClient();
116 if (Config.get("enable-partition")) {
117 channelIndex = new PartitionChannelIndex(
118 initModule.getRedisClientProvider().get(),
119 initModule.getRedisClientProvider().get(),
120 initModule.partitionConfig.getChannelIndexChannel()
123 channelIndex = new LocalChannelIndex();
125 self.express = express();
126 require("./web/webserver").init(
134 Config.getEmailConfig(),
136 Config.getCaptchaConfig(),
140 // http/https/sio server init -----------------------------------------
141 var key = "", cert = "", ca = undefined;
142 if (Config.get("https.enabled")) {
143 const certData = self.loadCertificateData();
145 cert = certData.cert;
152 passphrase: Config.get("https.passphrase"),
154 ciphers: Config.get("https.ciphers"),
155 honorCipherOrder: true
158 Config.get("listen").forEach(function (bind) {
159 var id = bind.ip + ":" + bind.port;
160 if (id in self.servers) {
161 LOGGER.warn("Ignoring duplicate listen address %s", id);
165 if (bind.https && Config.get("https.enabled")) {
166 self.servers[id] = https.createServer(opts, self.express);
167 // 2 minute default copied from node <= 12.x
168 self.servers[id].timeout = 120000;
169 self.servers[id].listen(bind.port, bind.ip);
170 self.servers[id].on("error", error => {
171 if (error.code === "EADDRINUSE") {
173 "Could not bind %s: address already in use. Check " +
174 "whether another application has already bound this " +
175 "port, or whether another instance of this server " +
182 } else if (bind.http) {
183 self.servers[id] = http.createServer(self.express);
184 // 2 minute default copied from node <= 12.x
185 self.servers[id].timeout = 120000;
186 self.servers[id].listen(bind.port, bind.ip);
187 self.servers[id].on("error", error => {
188 if (error.code === "EADDRINUSE") {
190 "Could not bind %s: address already in use. Check " +
191 "whether another application has already bound this " +
192 "port, or whether another instance of this server " +
202 require("./io/ioserver").init(self, webConfig);
204 // background tasks init ----------------------------------------------
205 require("./bgtask")(self);
208 const prometheusConfig = Config.getPrometheusConfig();
209 if (prometheusConfig.isEnabled()) {
210 require("./prometheus-server").init(prometheusConfig);
216 initModule.onReady();
219 Server.prototype = Object.create(EventEmitter.prototype);
221 Server.prototype.loadCertificateData = function loadCertificateData() {
223 key: fs.readFileSync(path.resolve(__dirname, "..",
224 Config.get("https.keyfile"))),
225 cert: fs.readFileSync(path.resolve(__dirname, "..",
226 Config.get("https.certfile")))
229 if (Config.get("https.cafile")) {
230 data.ca = fs.readFileSync(path.resolve(__dirname, "..",
231 Config.get("https.cafile")));
237 Server.prototype.reloadCertificateData = function reloadCertificateData() {
238 const certData = this.loadCertificateData();
239 Object.keys(this.servers).forEach(key => {
240 const server = this.servers[key];
241 // TODO: Replace with actual node API
242 // once https://github.com/nodejs/node/issues/4464 is implemented.
243 if (server._sharedCreds) {
245 server._sharedCreds.context.setCert(certData.cert);
246 server._sharedCreds.context.setKey(certData.key, Config.get("https.passphrase"));
247 LOGGER.info('Reloaded certificate data for %s', key);
249 LOGGER.error('Failed to reload certificate data for %s: %s', key, error.stack);
255 Server.prototype.isChannelLoaded = function (name) {
256 name = name.toLowerCase();
257 for (var i = 0; i < this.channels.length; i++) {
258 if (this.channels[i].uniqueName == name)
264 const promActiveChannels = new Gauge({
265 name: 'cytube_channels_num_active',
266 help: 'Number of channels currently active'
268 Server.prototype.getChannel = function (name) {
269 var cname = name.toLowerCase();
270 if (this.partitionDecider &&
271 !this.partitionDecider.isChannelOnThisPartition(cname)) {
272 const error = new Error(`Channel '${cname}' is mapped to a different partition`);
273 error.code = 'EWRONGPART';
278 for (var i = 0; i < self.channels.length; i++) {
279 if (self.channels[i].uniqueName === cname)
280 return self.channels[i];
283 var c = new Channel(name);
284 promActiveChannels.inc();
285 c.on("empty", function () {
286 self.unloadChannel(c);
288 c.waitFlag(Flags.C_ERROR, () => {
289 self.unloadChannel(c, { skipSave: true });
291 self.channels.push(c);
295 Server.prototype.unloadChannel = function (chan, options) {
298 if (chan.dead || chan.dying) {
308 if (!options.skipSave) {
309 chan.saveState().catch(error => {
310 LOGGER.error(`Failed to save /${this.chanPath}/${chan.name} for unload: ${error.stack}`);
311 }).then(finishUnloading);
316 function finishUnloading() {
317 chan.logger.log("[init] Channel shutting down");
320 chan.notifyModules("unload", []);
321 Object.keys(chan.modules).forEach(function (k) {
322 chan.modules[k].dead = true;
324 * Automatically clean up any timeouts/intervals assigned
325 * to properties of channel modules. Prevents a memory leak
326 * in case of forgetting to clear the timer on the "unload"
329 Object.keys(chan.modules[k]).forEach(function (prop) {
330 if (chan.modules[k][prop] && chan.modules[k][prop]._onTimeout) {
331 LOGGER.warn("Detected non-null timer when unloading " +
332 "module " + k + ": " + prop);
334 clearTimeout(chan.modules[k][prop]);
335 clearInterval(chan.modules[k][prop]);
337 LOGGER.error(error.stack);
343 for (var i = 0; i < self.channels.length; i++) {
344 if (self.channels[i].uniqueName === chan.uniqueName) {
345 self.channels.splice(i, 1);
350 LOGGER.info("Unloaded channel " + chan.name);
351 chan.broadcastUsercount.cancel();
352 // Empty all outward references from the channel
353 Object.keys(chan).forEach(key => {
354 if (key !== "refCounter") {
359 promActiveChannels.dec();
363 Server.prototype.packChannelList = function (publicOnly, isAdmin) {
364 var channels = this.channels.filter(function (c) {
369 return c.modules.options && c.modules.options.get("show_public");
372 return channels.map(function (c) {
373 return c.packInfo(isAdmin);
377 Server.prototype.announce = function (data) {
378 this.setAnnouncement(data);
381 db.clearAnnouncement();
383 db.setAnnouncement(data);
386 this.emit("announcement", data);
389 Server.prototype.setAnnouncement = function (data) {
391 this.announcement = null;
393 this.announcement = data;
394 sio.instance.emit("announcement", data);
398 Server.prototype.forceSave = function () {
399 Promise.map(this.channels, async channel => {
401 await channel.saveState();
402 LOGGER.info(`Saved /${this.chanPath}/${channel.name}`);
405 'Failed to save /%s/%s: %s',
407 channel ? channel.name : '<undefined>',
411 }, { concurrency: 5 }).then(() => {
412 LOGGER.info('Finished save');
416 Server.prototype.shutdown = function () {
417 LOGGER.info("Unloading channels");
418 Promise.map(this.channels, async channel => {
420 await channel.saveState();
421 LOGGER.info(`Saved /${this.chanPath}/${channel.name}`);
424 'Failed to save /%s/%s: %s',
426 channel ? channel.name : '<undefined>',
430 }, { concurrency: 5 }).then(() => {
431 LOGGER.info("Goodbye");
434 LOGGER.error(`Caught error while saving channels: ${err.stack}`);
439 Server.prototype.handlePartitionMapChange = function () {
440 const channels = Array.prototype.slice.call(this.channels);
441 Promise.map(channels, async channel => {
446 if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) {
447 LOGGER.info("Partition changed for " + channel.uniqueName);
449 await channel.saveState();
451 channel.broadcastAll(
453 this.partitionDecider.getPartitionForChannel(
458 const users = Array.prototype.slice.call(channel.users);
461 u.socket.disconnect();
467 this.unloadChannel(channel, { skipSave: true });
470 'Failed to unload /%s/%s for partition map flip: %s',
472 channel ? channel.name : '<undefined>',
477 }, { concurrency: 5 }).then(() => {
478 LOGGER.info("Partition reload complete");
482 Server.prototype.reloadPartitionMap = function () {
483 if (!Config.get("enable-partition")) {
487 this.initModule.getPartitionMapReloader().reload();
490 Server.prototype.handleUserProfileChange = function (event) {
492 const lname = event.user.toLowerCase();
494 // Probably not the most efficient thing in the world, but w/e
495 // profile changes are not high volume
496 this.channels.forEach(channel => {
497 if (channel.dead) return;
499 channel.users.forEach(user => {
500 if (user.getLowerName() === lname && user.account.user) {
501 user.account.user.profile = {
502 image: event.profile.image,
503 text: event.profile.text
506 user.account.update();
508 channel.sendUserProfile(channel.users, user);
511 'Updated profile for user %s in channel %s',
519 LOGGER.error('handleUserProfileChange failed: %s', error);
523 Server.prototype.handleChannelDelete = function (event) {
525 const lname = event.channel.toLowerCase();
527 this.channels.forEach(channel => {
528 if (channel.dead) return;
530 if (channel.uniqueName === lname) {
531 channel.clearFlag(Flags.C_REGISTERED);
533 const users = Array.prototype.slice.call(channel.users);
535 u.kick('Channel deleted');
538 if (!channel.dead && !channel.dying) {
539 channel.emit('empty');
542 LOGGER.info('Processed deleted channel %s', lname);
546 LOGGER.error('handleChannelDelete failed: %s', error);
550 Server.prototype.handleChannelRegister = function (event) {
552 const lname = event.channel.toLowerCase();
554 this.channels.forEach(channel => {
555 if (channel.dead) return;
557 if (channel.uniqueName === lname) {
558 channel.clearFlag(Flags.C_REGISTERED);
560 const users = Array.prototype.slice.call(channel.users);
562 u.kick('Channel reloading');
565 if (!channel.dead && !channel.dying) {
566 channel.emit('empty');
569 LOGGER.info('Processed registered channel %s', lname);
573 LOGGER.error('handleChannelRegister failed: %s', error);