Merge branch '3.0' of https://github.com/calzoneman/sync into 3.0
[KisSync.git] / src / server.js
blob b1a5b3b9fc43f867f8218b515448f5f3dd345c55
1 const VERSION = require("../package.json").version;
2 var singleton = null;
3 var Config = require("./config");
4 var Promise = require("bluebird");
5 import * as ChannelStore from './channel-storage/channelstore';
6 import { EventEmitter } from 'events';
8 const LOGGER = require('@calzoneman/jsli')('server');
10 module.exports = {
11     init: function () {
12         LOGGER.info("Starting CyTube v%s", VERSION);
13         var chanlogpath = path.join(__dirname, "../chanlogs");
14         fs.exists(chanlogpath, function (exists) {
15             exists || fs.mkdirSync(chanlogpath);
16         });
18         var gdvttpath = path.join(__dirname, "../google-drive-subtitles");
19         fs.exists(gdvttpath, function (exists) {
20             exists || fs.mkdirSync(gdvttpath);
21         });
23         singleton = new Server();
24         return singleton;
25     },
27     getServer: function () {
28         return singleton;
29     }
32 const path = require("path");
33 const fs = require("fs");
34 const http = require("http");
35 const https = require("https");
36 const express = require("express");
37 const Channel = require("./channel/channel");
38 const db = require("./database");
39 const Flags = require("./flags");
40 const sio = require("socket.io");
41 import LocalChannelIndex from './web/localchannelindex';
42 import { PartitionChannelIndex } from './partition/partitionchannelindex';
43 import IOConfiguration from './configuration/ioconfig';
44 import WebConfiguration from './configuration/webconfig';
45 import session from './session';
46 import { LegacyModule } from './legacymodule';
47 import { PartitionModule } from './partition/partitionmodule';
48 import { Gauge } from 'prom-client';
49 import { EmailController } from './controller/email';
50 import { CaptchaController } from './controller/captcha';
/**
 * Server constructor.
 *
 * Performs the whole startup sequence in order: picks the partition or
 * legacy init module, subscribes to the global message bus, initialises the
 * database and channel store, builds the email and captcha controllers,
 * initialises the web server, opens every configured listen address, then
 * starts the socket.io server, background tasks, the optional prometheus
 * server and the setuid module before signalling readiness.
 */
var Server = function () {
    var self = this;
    self.channels = [];
    self.express = null;
    self.db = null;
    self.api = null;
    self.announcement = null;
    self.infogetter = null;
    self.servers = {};
    self.chanPath = Config.get('channel-path');

    var initModule;
    if (Config.get('enable-partition')) {
        initModule = this.initModule = new PartitionModule();
        // Only set in partition mode; getChannel() checks for its presence.
        self.partitionDecider = initModule.getPartitionDecider();
    } else {
        initModule = this.initModule = new LegacyModule();
    }

    const globalMessageBus = this.initModule.getGlobalMessageBus();
    globalMessageBus.on('UserProfileChanged', this.handleUserProfileChange.bind(this));
    globalMessageBus.on('ChannelDeleted', this.handleChannelDelete.bind(this));
    globalMessageBus.on('ChannelRegistered', this.handleChannelRegister.bind(this));

    // database init ------------------------------------------------------
    var Database = require("./database");
    self.db = Database;
    self.db.init();
    ChannelStore.init();

    // email / captcha init -----------------------------------------------
    let emailTransport;
    if (Config.getEmailConfig().getPasswordReset().isEnabled()) {
        const smtpConfig = Config.getEmailConfig().getSmtp();
        emailTransport = require("nodemailer").createTransport({
            host: smtpConfig.getHost(),
            port: smtpConfig.getPort(),
            secure: smtpConfig.isSecure(),
            auth: {
                user: smtpConfig.getUser(),
                pass: smtpConfig.getPassword()
            }
        });
    } else {
        // Stand-in transport: any attempt to send mail while password reset
        // email is disabled fails loudly.
        emailTransport = {
            sendMail() {
                throw new Error('Email is not enabled on this server');
            }
        };
    }

    const emailController = new EmailController(
        emailTransport,
        Config.getEmailConfig()
    );

    const captchaController = new CaptchaController(
        Config.getCaptchaConfig()
    );

    // webserver init -----------------------------------------------------
    const ioConfig = IOConfiguration.fromOldConfig(Config);
    const webConfig = WebConfiguration.fromOldConfig(Config);
    const clusterClient = initModule.getClusterClient();
    var channelIndex;
    if (Config.get("enable-partition")) {
        channelIndex = new PartitionChannelIndex(
                initModule.getRedisClientProvider().get(),
                initModule.getRedisClientProvider().get(),
                initModule.partitionConfig.getChannelIndexChannel()
        );
    } else {
        channelIndex = new LocalChannelIndex();
    }
    self.express = express();
    require("./web/webserver").init(
            self.express,
            webConfig,
            ioConfig,
            clusterClient,
            channelIndex,
            session,
            globalMessageBus,
            Config.getEmailConfig(),
            emailController,
            Config.getCaptchaConfig(),
            captchaController
    );

    // http/https/sio server init -----------------------------------------
    var key = "", cert = "", ca = undefined;
    if (Config.get("https.enabled")) {
        const certData = self.loadCertificateData();
        key = certData.key;
        cert = certData.cert;
        ca = certData.ca;
    }

    var opts = {
        key: key,
        cert: cert,
        passphrase: Config.get("https.passphrase"),
        ca: ca,
        ciphers: Config.get("https.ciphers"),
        honorCipherOrder: true
    };

    // Shared by the HTTP and HTTPS branches below: apply the socket timeout,
    // start listening, and exit the process if the address is already bound.
    const startListening = function (server, id, bind) {
        // 2 minute default copied from node <= 12.x
        server.timeout = 120000;
        server.listen(bind.port, bind.ip);
        server.on("error", error => {
            if (error.code === "EADDRINUSE") {
                LOGGER.fatal(
                    "Could not bind %s: address already in use.  Check " +
                    "whether another application has already bound this " +
                    "port, or whether another instance of this server " +
                    "is running.",
                    id
                );
                process.exit(1);
            }
        });
    };

    Config.get("listen").forEach(function (bind) {
        var id = bind.ip + ":" + bind.port;
        if (id in self.servers) {
            LOGGER.warn("Ignoring duplicate listen address %s", id);
            return;
        }

        // An https bind is honoured only when HTTPS is enabled globally;
        // otherwise the entry falls through to the plain-http check.
        if (bind.https && Config.get("https.enabled")) {
            self.servers[id] = https.createServer(opts, self.express);
            startListening(self.servers[id], id, bind);
        } else if (bind.http) {
            self.servers[id] = http.createServer(self.express);
            startListening(self.servers[id], id, bind);
        }
    });

    require("./io/ioserver").init(self, webConfig);

    // background tasks init ----------------------------------------------
    require("./bgtask")(self);

    // prometheus server
    const prometheusConfig = Config.getPrometheusConfig();
    if (prometheusConfig.isEnabled()) {
        require("./prometheus-server").init(prometheusConfig);
    }

    // setuid
    require("./setuid");

    initModule.onReady();
};

// Server instances inherit the EventEmitter methods (announce() uses emit).
Server.prototype = Object.create(EventEmitter.prototype);
/**
 * Read the TLS key and certificate (and the CA file, when one is
 * configured) from disk.
 *
 * The "https.keyfile", "https.certfile" and "https.cafile" settings are
 * resolved against the parent of this file's directory.
 *
 * @returns {{key: Buffer, cert: Buffer, ca: (Buffer|undefined)}} file contents
 */
Server.prototype.loadCertificateData = function loadCertificateData() {
    const readConfiguredFile = setting => fs.readFileSync(
        path.resolve(__dirname, "..", Config.get(setting))
    );

    const data = {
        key: readConfiguredFile("https.keyfile"),
        cert: readConfiguredFile("https.certfile")
    };

    // The CA bundle is optional; `ca` stays absent when it is not configured.
    if (Config.get("https.cafile")) {
        data.ca = readConfiguredFile("https.cafile");
    }

    return data;
};
/**
 * Re-read the certificate files and push the new key and certificate into
 * every listening server that exposes `_sharedCreds`.
 *
 * A failure on one server is logged and does not stop the others from being
 * updated. Servers without `_sharedCreds` are skipped.
 */
Server.prototype.reloadCertificateData = function reloadCertificateData() {
    const fresh = this.loadCertificateData();

    for (const [address, listener] of Object.entries(this.servers)) {
        // TODO: Replace with actual node API
        // once https://github.com/nodejs/node/issues/4464 is implemented.
        const creds = listener._sharedCreds;
        if (!creds) {
            continue;
        }

        try {
            creds.context.setCert(fresh.cert);
            creds.context.setKey(fresh.key, Config.get("https.passphrase"));
            LOGGER.info('Reloaded certificate data for %s', address);
        } catch (error) {
            LOGGER.error('Failed to reload certificate data for %s: %s', address, error.stack);
        }
    }
};
/**
 * Report whether a channel with the given name is currently loaded.
 *
 * The lookup is case-insensitive: the name is lowercased and compared
 * strictly against each loaded channel's `uniqueName`, the same comparison
 * getChannel() uses.
 *
 * @param {string} name channel name in any letter case
 * @returns {boolean} true when a loaded channel has that unique name
 */
Server.prototype.isChannelLoaded = function (name) {
    const uniqueName = name.toLowerCase();
    return this.channels.some(channel => channel.uniqueName === uniqueName);
};
// Gauge incremented when getChannel() creates a channel and decremented
// when unloadChannel() finishes tearing one down.
const promActiveChannels = new Gauge({
    name: 'cytube_channels_num_active',
    help: 'Number of channels currently active'
});

/**
 * Return the loaded channel with the given name, creating and registering
 * it first when it is not loaded yet.
 *
 * @param {string} name channel name; matched case-insensitively, while a
 *     newly created channel receives the name exactly as passed
 * @returns {Channel} the existing or newly created channel
 * @throws {Error} with code 'EWRONGPART' when a partition decider is present
 *     and reports that the channel is not on this partition
 */
Server.prototype.getChannel = function (name) {
    const cname = name.toLowerCase();
    const decider = this.partitionDecider;
    if (decider && !decider.isChannelOnThisPartition(cname)) {
        const error = new Error(`Channel '${cname}' is mapped to a different partition`);
        error.code = 'EWRONGPART';
        throw error;
    }

    const loaded = this.channels.find(channel => channel.uniqueName === cname);
    if (loaded !== undefined) {
        return loaded;
    }

    const created = new Channel(name);
    promActiveChannels.inc();
    // Unload (with a save) once the channel reports it is empty.
    created.on("empty", () => {
        this.unloadChannel(created);
    });
    // Unload without saving if the channel raises its error flag.
    created.waitFlag(Flags.C_ERROR, () => {
        this.unloadChannel(created, { skipSave: true });
    });
    this.channels.push(created);
    return created;
};
/**
 * Tear down a loaded channel and remove it from the channel list.
 *
 * Does nothing if the channel is already dead or dying. Unless
 * `options.skipSave` is truthy, the channel state is saved first; a failed
 * save is logged and the teardown still proceeds.
 *
 * @param {Channel} chan channel to unload
 * @param {{skipSave: (boolean|undefined)}} [options] teardown options
 */
Server.prototype.unloadChannel = function (chan, options) {
    const server = this;

    if (chan.dead || chan.dying) {
        return;
    }

    chan.dying = true;

    const finishUnloading = function () {
        chan.logger.log("[init] Channel shutting down");
        chan.logger.close();

        chan.notifyModules("unload", []);
        for (const moduleName of Object.keys(chan.modules)) {
            const mod = chan.modules[moduleName];
            mod.dead = true;
            /*
             * Automatically clean up any timeouts/intervals assigned
             * to properties of channel modules.  Prevents a memory leak
             * in case of forgetting to clear the timer on the "unload"
             * module event.
             */
            for (const prop of Object.keys(mod)) {
                const candidate = mod[prop];
                if (!(candidate && candidate._onTimeout)) {
                    continue;
                }

                LOGGER.warn(`Detected non-null timer when unloading module ${moduleName}: ${prop}`);
                try {
                    clearTimeout(candidate);
                    clearInterval(candidate);
                } catch (error) {
                    LOGGER.error(error.stack);
                }
            }
        }

        // Remove every entry sharing this channel's unique name. Walking
        // backwards keeps the indices valid while splicing in place.
        for (let i = server.channels.length - 1; i >= 0; i--) {
            if (server.channels[i].uniqueName === chan.uniqueName) {
                server.channels.splice(i, 1);
            }
        }

        LOGGER.info("Unloaded channel " + chan.name);
        chan.broadcastUsercount.cancel();
        // Empty all outward references from the channel; only "refCounter"
        // is kept.
        for (const key of Object.keys(chan)) {
            if (key !== "refCounter") {
                delete chan[key];
            }
        }
        chan.dead = true;
        promActiveChannels.dec();
    };

    const skipSave = Boolean(options && options.skipSave);
    if (skipSave) {
        finishUnloading();
        return;
    }

    chan.saveState().catch(error => {
        LOGGER.error(`Failed to save /${server.chanPath}/${chan.name} for unload: ${error.stack}`);
    }).then(finishUnloading);
};
/**
 * Build the list of packed channel descriptions.
 *
 * @param {boolean} publicOnly when truthy, include only channels whose
 *     options module reports a truthy "show_public" setting
 * @param {boolean} isAdmin forwarded unchanged to each channel's packInfo()
 * @returns {Array} one packInfo() result per selected channel
 */
Server.prototype.packChannelList = function (publicOnly, isAdmin) {
    const isListed = channel => {
        if (!publicOnly) {
            return true;
        }

        const channelOptions = channel.modules.options;
        return channelOptions && channelOptions.get("show_public");
    };

    const selected = this.channels.filter(isListed);
    return selected.map(channel => channel.packInfo(isAdmin));
};
/**
 * Set or clear the announcement, store the change through the database
 * module, and emit an "announcement" event on the server.
 *
 * @param {*} data announcement payload; null or undefined clears it
 */
Server.prototype.announce = function (data) {
    this.setAnnouncement(data);

    if (data != null) {
        db.setAnnouncement(data);
    } else {
        db.clearAnnouncement();
    }

    this.emit("announcement", data);
};
/**
 * Update the in-memory announcement.
 *
 * A non-null payload is also emitted as "announcement" through the
 * socket.io instance; clearing the announcement emits nothing.
 *
 * @param {*} data announcement payload; null or undefined clears it
 */
Server.prototype.setAnnouncement = function (data) {
    if (data == null) {
        this.announcement = null;
        return;
    }

    this.announcement = data;
    sio.instance.emit("announcement", data);
};
/**
 * Save the state of every loaded channel, at most five at a time.
 *
 * A failed save is logged per channel and does not stop the remaining
 * channels from being saved. `Promise` here is bluebird, which provides
 * `Promise.map` and its `concurrency` option.
 *
 * @param {Server} server server whose channels should be saved
 * @returns {Promise} settles once every channel has been attempted
 */
function saveAllChannels(server) {
    return Promise.map(server.channels, async channel => {
        try {
            await channel.saveState();
            LOGGER.info(`Saved /${server.chanPath}/${channel.name}`);
        } catch (error) {
            LOGGER.error(
                'Failed to save /%s/%s: %s',
                server.chanPath,
                channel ? channel.name : '<undefined>',
                error.stack
            );
        }
    }, { concurrency: 5 });
}

/**
 * Save all loaded channels without shutting down.
 *
 * Fire-and-forget: nothing is returned, and completion or failure is
 * reported through the logger only.
 */
Server.prototype.forceSave = function () {
    saveAllChannels(this).then(() => {
        LOGGER.info('Finished save');
    }).catch(error => {
        // Matches shutdown(): never leave the chain without a handler.
        LOGGER.error(`Caught error while saving channels: ${error.stack}`);
    });
};

/**
 * Save all loaded channels and then terminate the process.
 *
 * Exits with status 0 after the save pass, or with status 1 if the save
 * chain itself rejects.
 */
Server.prototype.shutdown = function () {
    LOGGER.info("Unloading channels");
    saveAllChannels(this).then(() => {
        LOGGER.info("Goodbye");
        process.exit(0);
    }).catch(err => {
        LOGGER.error(`Caught error while saving channels: ${err.stack}`);
        process.exit(1);
    });
};
/**
 * React to a changed partition map by evicting every loaded channel that the
 * partition decider no longer assigns to this partition.
 *
 * Each such channel is saved, its users receive a "partitionChange"
 * broadcast carrying the result of getPartitionForChannel(), their sockets
 * are disconnected, and the channel is unloaded without a second save.
 * At most five channels are processed at a time. Fire-and-forget: nothing
 * is returned and the outcome is reported through the logger.
 */
Server.prototype.handlePartitionMapChange = function () {
    // Iterate over a snapshot: unloadChannel() splices this.channels.
    const channels = this.channels.slice();
    Promise.map(channels, async channel => {
        if (channel.dead) {
            return;
        }

        if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) {
            LOGGER.info("Partition changed for " + channel.uniqueName);
            try {
                await channel.saveState();

                channel.broadcastAll(
                    "partitionChange",
                    this.partitionDecider.getPartitionForChannel(
                        channel.uniqueName
                    )
                );

                // Copy the user list before disconnecting its members.
                const users = Array.prototype.slice.call(channel.users);
                users.forEach(u => {
                    try {
                        u.socket.disconnect();
                    } catch (error) {
                        // Best effort: a socket that fails to disconnect
                        // must not block unloading the channel.
                    }
                });

                // State was saved above, so skip the save in unloadChannel.
                this.unloadChannel(channel, { skipSave: true });
            } catch (error) {
                LOGGER.error(
                    'Failed to unload /%s/%s for partition map flip: %s',
                    this.chanPath,
                    channel ? channel.name : '<undefined>',
                    error.stack
                );
            }
        }
    }, { concurrency: 5 }).then(() => {
        LOGGER.info("Partition reload complete");
    }).catch(error => {
        // The partition check above runs outside the try block, so a throw
        // there rejects the whole chain; log it rather than leaving the
        // rejection unhandled.
        LOGGER.error('Partition reload failed: %s', error.stack);
    });
};
/**
 * Ask the init module's partition map reloader to reload.
 * Does nothing unless the "enable-partition" setting is truthy.
 */
Server.prototype.reloadPartitionMap = function () {
    if (Config.get("enable-partition")) {
        this.initModule.getPartitionMapReloader().reload();
    }
};
/**
 * Message bus handler for 'UserProfileChanged'.
 *
 * For every live channel, finds users whose lowercased name matches
 * `event.user` and who have an `account.user`, replaces their profile with
 * the image and text from the event, and re-sends the profile to the
 * channel's users. Any error is logged and not rethrown.
 *
 * @param {{user: string, profile: {image: *, text: *}}} event change event
 */
Server.prototype.handleUserProfileChange = function (event) {
    try {
        const lname = event.user.toLowerCase();

        // Probably not the most efficient thing in the world, but w/e
        // profile changes are not high volume
        this.channels.forEach(channel => {
            if (channel.dead) {
                return;
            }

            channel.users.forEach(user => {
                const isTarget = user.getLowerName() === lname && user.account.user;
                if (!isTarget) {
                    return;
                }

                const { image, text } = event.profile;
                user.account.user.profile = { image, text };

                user.account.update();

                channel.sendUserProfile(channel.users, user);

                LOGGER.info(
                        'Updated profile for user %s in channel %s',
                        lname,
                        channel.name
                );
            });
        });
    } catch (error) {
        LOGGER.error('handleUserProfileChange failed: %s', error);
    }
};
/**
 * Message bus handler for 'ChannelDeleted'.
 *
 * For each live loaded channel matching `event.channel`
 * (case-insensitively): clears its registered flag, kicks every user, and
 * emits "empty" on it unless it is already dead or dying. Any error is
 * logged and not rethrown.
 *
 * @param {{channel: string}} event deletion event
 */
Server.prototype.handleChannelDelete = function (event) {
    try {
        const lname = event.channel.toLowerCase();

        this.channels.forEach(channel => {
            if (channel.dead || channel.uniqueName !== lname) {
                return;
            }

            channel.clearFlag(Flags.C_REGISTERED);

            // Kick from a copy of the user list.
            const snapshot = Array.prototype.slice.call(channel.users);
            snapshot.forEach(user => {
                user.kick('Channel deleted');
            });

            if (!(channel.dead || channel.dying)) {
                channel.emit('empty');
            }

            LOGGER.info('Processed deleted channel %s', lname);
        });
    } catch (error) {
        LOGGER.error('handleChannelDelete failed: %s', error);
    }
};
/**
 * Message bus handler for 'ChannelRegistered'.
 *
 * For each live loaded channel matching `event.channel`
 * (case-insensitively): clears its registered flag, kicks every user with
 * the reason 'Channel reloading', and emits "empty" on it unless it is
 * already dead or dying. Any error is logged and not rethrown.
 *
 * @param {{channel: string}} event registration event
 */
Server.prototype.handleChannelRegister = function (event) {
    try {
        const lname = event.channel.toLowerCase();

        this.channels.forEach(channel => {
            if (channel.dead || channel.uniqueName !== lname) {
                return;
            }

            channel.clearFlag(Flags.C_REGISTERED);

            // Kick from a copy of the user list.
            const snapshot = Array.prototype.slice.call(channel.users);
            snapshot.forEach(user => {
                user.kick('Channel reloading');
            });

            if (!(channel.dead || channel.dying)) {
                channel.emit('empty');
            }

            LOGGER.info('Processed registered channel %s', lname);
        });
    } catch (error) {
        LOGGER.error('handleChannelRegister failed: %s', error);
    }
};