1 { config, lib, pkgs, ... }:
# Shorthand for the values set under services.apache-kafka; the rest of the module reads its settings through `cfg`.
6 cfg = config.services.apache-kafka;
9 if cfg.serverProperties != null then
14 broker.id=${toString cfg.brokerId}
15 port=${toString cfg.port}
16 host.name=${cfg.hostname}
17 log.dirs=${concatStringsSep "," cfg.logDirs}
18 zookeeper.connect=${cfg.zookeeper}
19 ${toString cfg.extraProperties}
# Write the assembled serverProperties text to a file named "server.properties" using pkgs.writeText.
22 serverConfig = pkgs.writeText "server.properties" serverProperties;
# Same for the log4j settings: cfg.log4jProperties goes into "log4j.properties"; the java command line passes this file via -Dlog4j.configuration.
23 logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;
27 options.services.apache-kafka = {
29 description = lib.mdDoc "Whether to enable Apache Kafka.";
35 description = lib.mdDoc "Broker ID.";
41 description = lib.mdDoc "Port number the broker should listen on.";
47 description = lib.mdDoc "Hostname the broker should bind to.";
48 default = "localhost";
53 description = lib.mdDoc "Log file directories";
54 default = [ "/tmp/kafka-logs" ];
55 type = types.listOf types.path;
58 zookeeper = mkOption {
59 description = lib.mdDoc "Zookeeper connection string";
60 default = "localhost:2181";
64 extraProperties = mkOption {
65 description = lib.mdDoc "Extra properties for server.properties.";
66 type = types.nullOr types.lines;
70 serverProperties = mkOption {
71 description = lib.mdDoc ''
72 Complete server.properties content. Other server.properties config
73 options will be ignored if this option is used.
75 type = types.nullOr types.lines;
79 log4jProperties = mkOption {
80 description = lib.mdDoc "Kafka log4j property configuration.";
82 log4j.rootLogger=INFO, stdout
84 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
85 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
86 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
91 jvmOptions = mkOption {
92 description = lib.mdDoc "Extra command line options for the JVM running Kafka.";
94 type = types.listOf types.str;
96 "-Djava.net.preferIPv4Stack=true"
97 "-Dcom.sun.management.jmxremote"
98 "-Dcom.sun.management.jmxremote.local.only=true"
103 description = lib.mdDoc "The kafka package to use";
104 default = pkgs.apacheKafka;
105 defaultText = literalExpression "pkgs.apacheKafka";
106 type = types.package;
110 description = lib.mdDoc "The JRE with which to run Kafka";
111 default = cfg.package.passthru.jre;
112 defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
113 type = types.package;
118 config = mkIf cfg.enable {
# Make the configured Kafka package (cfg.package) available in the system-wide package set.
120 environment.systemPackages = [cfg.package];
122 users.users.apache-kafka = {
124 group = "apache-kafka";
125 description = "Apache Kafka daemon user";
126 home = head cfg.logDirs;
# Declare the apache-kafka group with no extra settings; the apache-kafka user is assigned to it.
128 users.groups.apache-kafka = {};
# One tmpfiles.d "d" (directory) rule per entry in cfg.logDirs: mode 0700, owner apache-kafka, remaining fields left as "-".
130 systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs;
132 systemd.services.apache-kafka = {
133 description = "Apache Kafka Daemon";
134 wantedBy = [ "multi-user.target" ];
135 after = [ "network.target" ];
138 ${cfg.jre}/bin/java \
139 -cp "${cfg.package}/libs/*" \
140 -Dlog4j.configuration=file:${logConfig} \
141 ${toString cfg.jvmOptions} \
145 User = "apache-kafka";
146 SuccessExitStatus = "0 143";