1 import { EventEmitter
} from 'events';
2 import { v4 as uuidv4
} from 'uuid';
// The jsli module exports a factory; calling it with a name yields the logger
// used throughout this module.
const createLogger = require('@calzoneman/jsli');
const LOGGER = createLogger('redis-messagebus');
/**
 * Event bus that relays events through a single pub/sub channel.
 *
 * emit() does not dispatch to local listeners directly: it serializes the
 * event into a JSON envelope and publishes it on the channel. Listeners are
 * invoked by onMessage() when an envelope arrives on the subscriber client.
 */
class RedisMessageBus extends EventEmitter {
    /**
     * @param {object} pubClient - client used for publishing; must provide
     *     publish(channel, message)
     * @param {object} subClient - client used for subscribing; must provide
     *     once(), on() and subscribe(channel)
     * @param {string} channel - name of the channel to publish/subscribe on
     */
    constructor(pubClient, subClient, channel) {
        // A derived class must call super() before it may touch `this`.
        super();

        this.pubClient = pubClient;
        this.subClient = subClient;
        this.channel = channel;
        // Generated once per bus instance and attached to every envelope.
        this.publisherID = uuidv4();

        // Subscribing is deferred until the subscriber client emits 'ready'.
        subClient.once('ready', this.subscribe.bind(this));
    }

    /**
     * Subscribe to the configured channel and route incoming messages to
     * onMessage().
     */
    subscribe() {
        this.subClient.subscribe(this.channel);
        this.subClient.on('message', this.onMessage.bind(this));

        LOGGER.info('Subscribed to Redis messages on channel %s', this.channel);
    }

    /**
     * Handle a raw message from the subscriber client.
     *
     * Messages for any other channel are ignored with a warning. Otherwise
     * the message is parsed as JSON and its `event`/`payload` are delivered
     * to local listeners. Failures are logged and never propagate to the
     * subscriber client.
     *
     * @param {string} channel - channel the message was received on
     * @param {string} message - JSON envelope text
     */
    onMessage(channel, message) {
        if (channel !== this.channel) {
            LOGGER.warn('Ignoring message from mismatched channel "%s"', channel);
            return;
        }

        try {
            const { event, payload } = JSON.parse(message);

            // _emit is not declared in this class body; it is attached to the
            // prototype after the class definition as EventEmitter's emit.
            this._emit(event, payload);
        } catch (error) {
            // Errors thrown by listeners also land here and are reported by
            // the "unexpected" branch.
            if (error instanceof SyntaxError) {
                // NOTE(review): argument order chosen to match the format
                // string (error first, raw message second) -- confirm.
                LOGGER.error(
                    'Malformed message received: %s (message: "%s")',
                    error,
                    message
                );
            } else {
                LOGGER.error('Unexpected error decoding message: %s', error.stack);
            }
        }
    }

    /**
     * Publish an event to the channel. Publishing failures (including
     * serialization errors) are logged rather than thrown, so the returned
     * promise always resolves.
     *
     * @param {string} event - event name
     * @param {*} payload - JSON-serializable event data
     * @returns {Promise<void>}
     */
    async emit(event, payload) {
        try {
            const message = JSON.stringify({
                // NOTE(review): a timestamp field is assumed to precede
                // `publisher` in the envelope -- confirm against consumers.
                time: new Date(),
                publisher: this.publisherID,
                event,
                payload
            });

            await this.pubClient.publish(this.channel, message);
        } catch (error) {
            LOGGER.error('Unable to send event %s: %s', event, error);
        }
    }
}
// The class's emit() publishes to the channel instead of dispatching locally,
// so EventEmitter's own emit is kept reachable as _emit; onMessage() uses it
// to deliver received events to local listeners.
Object.assign(RedisMessageBus.prototype, {
    _emit: EventEmitter.prototype.emit
});
70 export { RedisMessageBus
};