Merge branch '3.0' of https://github.com/calzoneman/sync into 3.0
[KisSync.git] / src / pubsub / redis.js
blob8f63d63ae2a82ca55d720d85f9245b656b6897f9
1 import { EventEmitter } from 'events';
2 import { v4 as uuidv4 } from 'uuid';
4 const LOGGER = require('@calzoneman/jsli')('redis-messagebus');
/**
 * Event bus backed by a Redis pub/sub channel.
 *
 * Calling `emit()` does not dispatch to local listeners directly; it
 * serializes the event and publishes it on the channel.  Messages received
 * on the channel by the subscriber client are decoded and dispatched to
 * local listeners through `_emit`, which is expected to be the original
 * `EventEmitter.prototype.emit` (attached to the prototype outside of the
 * class body).
 */
class RedisMessageBus extends EventEmitter {
    /**
     * @param {object} pubClient client used to publish; must provide
     *   `publish(channel, message)`
     * @param {object} subClient client used to receive; must provide
     *   `once`/`on` event registration and `subscribe(channel)`
     * @param {string} channel name of the channel shared by all publishers
     */
    constructor(pubClient, subClient, channel) {
        super();

        this.pubClient = pubClient;
        this.subClient = subClient;
        this.channel = channel;
        // Included in every outgoing envelope to identify this instance.
        this.publisherID = uuidv4();

        // Subscription is deferred until the subscriber client reports
        // 'ready'.
        subClient.once('ready', this.subscribe.bind(this));
    }

    /**
     * Subscribe the subscriber client to the channel and start routing its
     * 'message' events to `onMessage`.
     */
    subscribe() {
        this.subClient.subscribe(this.channel);
        this.subClient.on('message', this.onMessage.bind(this));

        LOGGER.info('Subscribed to Redis messages on channel %s', this.channel);
    }

    /**
     * Decode a message received from the subscriber client and dispatch it
     * to local listeners.  Failures are logged and never thrown to the
     * caller.
     *
     * @param {string} channel channel the message arrived on
     * @param {string} message JSON text of the form `{ event, payload }`
     */
    onMessage(channel, message) {
        if (channel !== this.channel) {
            LOGGER.warn('Ignoring message from mismatched channel "%s"', channel);
            return;
        }

        try {
            const { event, payload } = JSON.parse(message);

            // Errors thrown by local listeners are also caught below and
            // reported through the non-SyntaxError branch.
            this._emit(event, payload);
        } catch (error) {
            if (error instanceof SyntaxError) {
                // Argument order must follow the placeholders: the error
                // first, then the offending raw message inside the quotes.
                LOGGER.error(
                    'Malformed message received: %s (message: "%s")',
                    error,
                    message
                );
            } else {
                LOGGER.error('Unexpected error decoding message: %s', error.stack);
            }
        }
    }

    /**
     * Publish an event on the channel.  The envelope carries the current
     * time, this instance's publisher ID, the event name and the payload.
     * Serialization and publish failures are logged rather than rejected.
     *
     * @param {string} event event name
     * @param {*} payload JSON-serializable event data
     * @returns {Promise<void>} resolves once the publish attempt finished
     */
    async emit(event, payload) {
        try {
            const message = JSON.stringify({
                time: new Date(),
                publisher: this.publisherID,
                event,
                payload
            });

            await this.pubClient.publish(this.channel, message);
        } catch (error) {
            LOGGER.error('Unable to send event %s: %s', event, error);
        }
    }
}
// Keep the inherited EventEmitter#emit reachable as `_emit`: the class
// overrides `emit`, so local dispatch has to go through this alias.
RedisMessageBus.prototype._emit = EventEmitter.prototype.emit;
70 export { RedisMessageBus };