5 * Event relayer for Apache Kafka.
6 * Configuring for WANCache:
7 * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
9 class EventRelayerKafka
extends EventRelayer
{
25 * Create Kafka producer.
27 * @param array $params
29 public function __construct( array $params ) {
30 parent
::__construct( $params );
32 $this->config
= new HashConfig( $params );
33 if ( !$this->config
->has( 'KafkaEventHost' ) ) {
34 throw new InvalidArgumentException( "KafkaEventHost must be configured" );
39 * Get the producer object from kafka-php.
42 protected function getKafkaProducer() {
43 if ( !$this->producer
) {
44 $this->producer
= Produce
::getInstance(
45 null, null, $this->config
->get( 'KafkaEventHost' ) );
47 return $this->producer
;
50 protected function doNotify( $channel, array $events ) {
51 $jsonEvents = array_map( 'json_encode', $events );
53 $producer = $this->getKafkaProducer();
54 $producer->setMessages( $channel, 0, $jsonEvents );
56 } catch ( \Kafka\Exception
$e ) {
57 $this->logger
->warning( "Sending events failed: $e" );