Merge "Update docs/hooks.txt for ShowSearchHitTitle"
[mediawiki.git] / includes / libs / eventrelayer / EventRelayerKafka.php
blob999eb439357ff3b7c9b0211c979b3ecd568dde05
1 <?php
2 use Kafka\Produce;
/**
 * Event relayer for Apache Kafka.
 * Configuring for WANCache:
 * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
 */
9 class EventRelayerKafka extends EventRelayer {
/**
 * Configuration.
 *
 * Built in the constructor as a HashConfig wrapping the $params array.
 *
 * @var Config
 */
protected $config;

/**
 * Kafka producer.
 *
 * Unset until getKafkaProducer() creates it on first use.
 *
 * @var Produce|null
 */
protected $producer;
/**
 * Create the relayer and validate its configuration.
 *
 * Only the configuration is set up here; the Kafka producer is created
 * later, on the first call to getKafkaProducer().
 *
 * @param array $params Relayer settings; must contain the key
 *   'KafkaEventHost' (for example 'localhost:9092').
 * @throws InvalidArgumentException If 'KafkaEventHost' is not present.
 */
public function __construct( array $params ) {
	parent::__construct( $params );

	$this->config = new HashConfig( $params );
	if ( !$this->config->has( 'KafkaEventHost' ) ) {
		throw new InvalidArgumentException( "KafkaEventHost must be configured" );
	}
}
/**
 * Get the producer object from kafka-php.
 *
 * The producer is obtained from Produce::getInstance() on the first call,
 * using the configured 'KafkaEventHost', and the same object is returned
 * on later calls.
 *
 * @return Produce
 */
protected function getKafkaProducer() {
	if ( !$this->producer ) {
		// The first two arguments are deliberately left null; only the
		// broker host list from the configuration is supplied.
		$this->producer = Produce::getInstance(
			null, null, $this->config->get( 'KafkaEventHost' ) );
	}
	return $this->producer;
}
/**
 * JSON-encode the given events and send them through the Kafka producer.
 *
 * Events that cannot be JSON-encoded are skipped and logged rather than
 * being sent as a bogus `false` payload; the remaining events are still
 * sent, but the call reports failure.
 *
 * @param string $channel Passed as the first argument to the producer's
 *   setMessages() (presumably the Kafka topic name; confirm against kafka-php).
 * @param array $events Events to send; each one is encoded with json_encode().
 * @return bool True if every event was encoded and the send raised no
 *   \Kafka\Exception, false otherwise.
 */
protected function doNotify( $channel, array $events ) {
	$jsonEvents = [];
	$allEncoded = true;
	foreach ( $events as $key => $event ) {
		$json = json_encode( $event );
		if ( $json === false ) {
			// json_encode() signals failure by returning false; do not
			// forward that value to Kafka as if it were a message.
			$this->logger->warning(
				"Encoding event failed: JSON error code " . json_last_error() );
			$allEncoded = false;
			continue;
		}
		// Keep the original keys, as array_map() over a single array did.
		$jsonEvents[$key] = $json;
	}

	try {
		$producer = $this->getKafkaProducer();
		// Messages are always queued for partition 0.
		$producer->setMessages( $channel, 0, $jsonEvents );
		$producer->send();
	} catch ( \Kafka\Exception $e ) {
		$this->logger->warning( "Sending events failed: $e" );
		return false;
	}

	return $allEncoded;
}