3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
21 namespace MediaWiki\Logger\Monolog
;
23 use Kafka\MetaDataFromKafka
;
25 use MediaWiki\Logger\LoggerFactory
;
26 use Monolog\Handler\AbstractProcessingHandler
;
28 use Psr\Log\LoggerInterface
;
31 * Log handler sends log events to a kafka server.
33 * Constructor options array arguments:
34 * * alias: map from monolog channel to kafka topic name. When no
35 * alias exists the topic "monolog_$channel" will be used.
36 * * swallowExceptions: Swallow exceptions that occur while talking to
37 * kafka. Defaults to false.
38 * * logExceptions: Log exceptions talking to kafka here. Either null,
39 * the name of a channel to log to, or an object implementing
40 * FormatterInterface. Defaults to null.
42 * Requires the nmred/kafka-php library, version >= 1.3.0
45 * @author Erik Bernhardson <ebernhardson@wikimedia.org>
46 * @copyright © 2015 Erik Bernhardson and Wikimedia Foundation.
48 class KafkaHandler
extends AbstractProcessingHandler
{
50 * @var Produce Sends requests to kafka
55 * @var array Optional handler configuration
60 * @var array Map from topic name to partition this request produces to
62 protected $partitions = array();
65 * @var array defaults for constructor options
67 private static $defaultOptions = array(
68 'alias' => array(), // map from monolog channel to kafka topic
69 'swallowExceptions' => false, // swallow exceptions sending records
70 'logExceptions' => null, // A PSR3 logger to inform about errors
74 * @param Produce $produce Kafka instance to produce through
75 * @param array $options optional handler configuration
76 * @param int $level The minimum logging level at which this handler will be triggered
77 * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
79 public function __construct(
80 Produce
$produce, array $options, $level = Logger
::DEBUG
, $bubble = true
82 parent
::__construct( $level, $bubble );
83 $this->produce
= $produce;
84 $this->options
= array_merge( self
::$defaultOptions, $options );
88 * Constructs the necessary support objects and returns a KafkaHandler
91 * @param string[] $kafkaServers
92 * @param array $options
93 * @param int $level The minimum logging level at which this handle will be triggered
94 * @param bool $bubble Whether the messages that are handled can bubble the stack or not
95 * @return KafkaHandler
97 public static function factory(
98 $kafkaServers, array $options = array(), $level = Logger
::DEBUG
, $bubble = true
100 $metadata = new MetaDataFromKafka( $kafkaServers );
101 $produce = new Produce( $metadata );
102 if ( isset( $options['logExceptions'] ) && is_string( $options['logExceptions'] ) ) {
103 $options['logExceptions'] = LoggerFactory
::getInstance( $options['logExceptions'] );
105 return new self( $produce, $options, $level, $bubble );
111 protected function write( array $record ) {
112 if ( $record['formatted'] !== null ) {
113 $this->addMessages( $record['channel'], array( $record['formatted'] ) );
121 public function handleBatch( array $batch ) {
123 foreach ( $batch as $record ) {
124 if ( $record['level'] < $this->level
) {
127 $channels[$record['channel']][] = $this->processRecord( $record );
130 $formatter = $this->getFormatter();
131 foreach ( $channels as $channel => $records ) {
133 foreach ( $records as $idx => $record ) {
134 $message = $formatter->format( $record );
135 if ( $message !== null ) {
136 $messages[] = $message;
140 $this->addMessages( $channel, $messages );
148 * Send any records in the kafka client internal queue.
150 protected function send() {
152 $this->produce
->send();
153 } catch ( \Kafka\Exception
$e ) {
154 $ignore = $this->warning(
155 'Error sending records to kafka: {exception}',
156 array( 'exception' => $e ) );
164 * @param string $topic Name of topic to get partition for
165 * @return int|null The random partition to produce to for this request,
166 * or null if a partition could not be determined.
168 protected function getRandomPartition( $topic ) {
169 if ( !array_key_exists( $topic, $this->partitions
) ) {
171 $partitions = $this->produce
->getAvailablePartitions( $topic );
172 } catch ( \Kafka\Exception
$e ) {
173 $ignore = $this->warning(
174 'Error getting metadata for kafka topic {topic}: {exception}',
175 array( 'topic' => $topic, 'exception' => $e ) );
182 $key = array_rand( $partitions );
183 $this->partitions
[$topic] = $partitions[$key];
185 $details = $this->produce
->getClient()->getTopicDetail( $topic );
186 $ignore = $this->warning(
187 'No partitions available for kafka topic {topic}',
188 array( 'topic' => $topic, 'kafka' => $details )
191 throw new \
RuntimeException( "No partitions available for kafka topic $topic" );
193 $this->partitions
[$topic] = null;
196 return $this->partitions
[$topic];
200 * Adds records for a channel to the Kafka client internal queue.
202 * @param string $channel Name of Monolog channel records belong to
203 * @param array $records List of records to append
205 protected function addMessages( $channel, array $records ) {
206 if ( isset( $this->options
['alias'][$channel] ) ) {
207 $topic = $this->options
['alias'][$channel];
209 $topic = "monolog_$channel";
211 $partition = $this->getRandomPartition( $topic );
212 if ( $partition !== null ) {
213 $this->produce
->setMessages( $topic, $partition, $records );
218 * @param string $message PSR3 compatible message string
219 * @param array $context PSR3 compatible log context
220 * @return bool true if caller should ignore warning
222 protected function warning( $message, array $context = array() ) {
223 if ( $this->options
['logExceptions'] instanceof LoggerInterface
) {
224 $this->options
['logExceptions']->warning( $message, $context );
226 return $this->options
['swallowExceptions'];