1 { config, lib, pkgs, ... }:
# Shorthand for the evaluated option values under `services.apache-kafka`.
3 cfg = config.services.apache-kafka;
5 # The `javaProperties` generator takes care of the various escaping rules and
6 # the generation of the properties file, but we handle the conversion to
7 # strings ourselves in mkPropertyString and stringlySettings, since we know more
8 # about the specific format that is allowed, e.g. for lists of this type, and we
9 # don't want to coerce values down to str too early by having the
10 # coercedTypes from javaProperties directly in our NixOS option types.
12 # Make sure every `freeformType` and any specific option type in `settings` is handled by mkPropertyString.
# Render a single settings value as the string written to the properties file.
# The handler is looked up in `render` by the value's `builtins.typeOf` name:
# booleans go through `lib.boolToString`; lists are rendered element by element
# (recursively, via mkPropertyString) and joined with ",".
# NOTE(review): the opening of the `render` attrset and any handlers for other
# types are not visible in this excerpt -- confirm against the full file.
15 mkPropertyString = let
17 bool = lib.boolToString;
19 list = lib.concatMapStringsSep "," mkPropertyString;
23 v: render.${builtins.typeOf v} v;
# `cfg.settings` with all null-valued entries dropped and every remaining value
# rendered to a string by mkPropertyString.
25 stringlySettings = lib.mapAttrs (_: mkPropertyString)
26 (lib.filterAttrs (_: v: v != null) cfg.settings);
# The `generate` function of the javaProperties format, instantiated with
# default arguments (`{}`). It is called with a file name and an attrset.
28 generator = (pkgs.formats.javaProperties {}).generate;
# Option declarations for the `services.apache-kafka` module.
31 options.services.apache-kafka = {
# Master switch: the module's `config` section is wrapped in `lib.mkIf cfg.enable`.
32 enable = lib.mkEnableOption "Apache Kafka event streaming broker";
# Broker settings. These are rendered to strings (see stringlySettings) and
# written out as the generated `server.properties` file.
34 settings = lib.mkOption {
36 [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
37 {file}`server.properties`.
39 Note that .properties files contain mappings from string to string.
40 Keys with dots are NOT represented by nested attrs in these settings,
41 but instead as quoted strings (ie. `settings."broker.id"`, NOT
42 `settings.broker.id`).
44 type = lib.types.submodule {
# Keys without a dedicated option declaration fall back to this type:
# each value is null, a primitive (bool, int or str), or a list of primitives.
# `lazyAttrsOf` keeps the attribute values lazily evaluated.
45 freeformType = with lib.types; let
46 primitive = oneOf [bool int str];
47 in lazyAttrsOf (nullOr (either primitive (listOf primitive)));
# Typed declaration for the `broker.id` settings key: an integer or null.
# Null values are filtered out before rendering (see stringlySettings).
50 "broker.id" = lib.mkOption {
51 description = "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
53 type = with lib.types; nullOr int;
# Typed declaration for the `log.dirs` settings key: a list of paths.
# No default is set in the visible code; the former default is kept only as a
# commented-out line, for the reason given below.
56 "log.dirs" = lib.mkOption {
57 description = "Log file directories.";
58 # Deliberately leave out the old default and use the rewrite opportunity
59 # to have users choose a safer value -- /tmp might be volatile and is a
60 # slightly scary default choice.
61 # default = [ "/tmp/apache-kafka" ];
62 type = with lib.types; listOf path;
# Typed declaration for the `listeners` settings key: a list of strings,
# defaulting to a single PLAINTEXT listener on localhost:9092.
65 "listeners" = lib.mkOption {
68 See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
70 type = lib.types.listOf lib.types.str;
71 default = [ "PLAINTEXT://localhost:9092" ];
# Cluster ID (string or null). It is interpolated as the `-t` argument of
# `kafka-storage.sh format` in the service's preStart script.
77 clusterId = lib.mkOption {
79 KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
81 type = with lib.types; nullOr str;
# Path of the server.properties file handed to Kafka. When the service is
# enabled, the `config` section sets this to the file generated from `settings`.
85 configFiles.serverProperties = lib.mkOption {
87 Kafka server.properties configuration file path.
88 Defaults to the rendered `settings`.
90 type = lib.types.path;
# Path of the log4j properties file; it is passed to the JVM through
# `-Dlog4j.configuration=file:...` in the service's start command.
# By default the text of the `log4jProperties` option is written to a file
# with `pkgs.writeText`.
93 configFiles.log4jProperties = lib.mkOption {
94 description = "Kafka log4j property configuration file path";
95 type = lib.types.path;
96 default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
97 defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
# Boolean switch: the service's preStart formatting step (kafka-storage.sh
# format) is only defined when this is true (`lib.mkIf cfg.formatLogDirs`).
100 formatLogDirs = lib.mkOption {
102 Whether to format log dirs in KRaft mode if all log dirs are
103 unformatted, ie. they contain no meta.properties.
105 type = lib.types.bool;
# Boolean switch selecting the preStart variant: when true, kafka-storage.sh
# is invoked with `--ignore-formatted`; otherwise formatting is guarded by a
# check that no log dir contains a meta.properties file.
109 formatLogDirsIgnoreFormatted = lib.mkOption {
111 Whether to ignore already formatted log dirs when formatting log dirs,
112 instead of failing. Useful when replacing or adding disks.
114 type = lib.types.bool;
# Contents of the log4j configuration as text (type `lines`). This text is what
# the default of `configFiles.log4jProperties` writes to a file.
118 log4jProperties = lib.mkOption {
119 description = "Kafka log4j property configuration.";
121 log4j.rootLogger=INFO, stdout
123 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
124 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
125 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
127 type = lib.types.lines;
# Extra JVM flags as a list of strings. The list is joined with `toString`
# (space-separated) and inserted into the java command line of the service.
130 jvmOptions = lib.mkOption {
131 description = "Extra command line options for the JVM running Kafka.";
133 type = lib.types.listOf lib.types.str;
135 "-Djava.net.preferIPv4Stack=true"
136 "-Dcom.sun.management.jmxremote"
137 "-Dcom.sun.management.jmxremote.local.only=true"
# The Kafka package to use. Its `bin/kafka-storage.sh` and `libs/*` are
# referenced by the service definition.
141 package = lib.mkPackageOption pkgs "apacheKafka" { };
# NOTE(review): the option header for this block is not visible in this
# excerpt; judging by `cfg.jre` in the service command it is presumably the
# `jre` option -- confirm against the full file.
# The default is the JRE exposed by the selected Kafka package via `passthru.jre`.
144 description = "The JRE with which to run Kafka";
145 default = cfg.package.passthru.jre;
146 defaultText = lib.literalExpression "pkgs.apacheKafka.passthru.jre";
147 type = lib.types.package;
# Compatibility shims for the previous option layout.
# Renamed options: the old top-level `brokerId`, `logDirs` and `zookeeper`
# options map to the corresponding keys under `settings`.
152 (lib.mkRenamedOptionModule
153 [ "services" "apache-kafka" "brokerId" ]
154 [ "services" "apache-kafka" "settings" ''broker.id'' ])
155 (lib.mkRenamedOptionModule
156 [ "services" "apache-kafka" "logDirs" ]
157 [ "services" "apache-kafka" "settings" ''log.dirs'' ])
158 (lib.mkRenamedOptionModule
159 [ "services" "apache-kafka" "zookeeper" ]
160 [ "services" "apache-kafka" "settings" ''zookeeper.connect'' ])
# Removed options: these have no direct replacement; the message given for
# each one points at the `settings` option to use instead.
162 (lib.mkRemovedOptionModule [ "services" "apache-kafka" "port" ]
163 "Please see services.apache-kafka.settings.listeners and its documentation instead")
164 (lib.mkRemovedOptionModule [ "services" "apache-kafka" "hostname" ]
165 "Please see services.apache-kafka.settings.listeners and its documentation instead")
166 (lib.mkRemovedOptionModule [ "services" "apache-kafka" "extraProperties" ]
167 "Please see services.apache-kafka.settings and its documentation instead")
168 (lib.mkRemovedOptionModule [ "services" "apache-kafka" "serverProperties" ]
169 "Please see services.apache-kafka.settings and its documentation instead")
# Everything in this section only takes effect when `cfg.enable` is true.
172 config = lib.mkIf cfg.enable {
# Generate server.properties from the stringified settings and use it as the
# value of `configFiles.serverProperties`.
173 services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
# User and group named `apache-kafka`; the systemd service runs as this user
# (`User = "apache-kafka"`).
175 users.users.apache-kafka = {
177 group = "apache-kafka";
178 description = "Apache Kafka daemon user";
180 users.groups.apache-kafka = {};
# One tmpfiles `d` rule per entry in `settings."log.dirs"`: the directory is
# given mode 0700 and user apache-kafka.
182 systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.settings."log.dirs";
# systemd unit that runs the Kafka broker with the configured JRE, package,
# JVM options, log4j configuration and server.properties file.
184 systemd.services.apache-kafka = {
185 description = "Apache Kafka Daemon";
186 wantedBy = [ "multi-user.target" ];
187 after = [ "network.target" ];
# Only defined when `formatLogDirs` is set. With `formatLogDirsIgnoreFormatted`
# the log dirs are formatted via `kafka-storage.sh format ... --ignore-formatted`;
# otherwise the format command is guarded by a shell test that every log dir
# lacks a meta.properties file. `clusterId` is passed as the `-t` argument.
188 preStart = lib.mkIf cfg.formatLogDirs
189 (if cfg.formatLogDirsIgnoreFormatted then ''
190 ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
192 if ${lib.concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then
193 ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties}
198 ${cfg.jre}/bin/java \
199 -cp "${cfg.package}/libs/*" \
200 -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
201 ${toString cfg.jvmOptions} \
203 ${cfg.configFiles.serverProperties}
205 User = "apache-kafka";
# Exit status 143 is treated as success in addition to 0.
# NOTE(review): 143 is presumably 128 + SIGTERM, i.e. the JVM being stopped
# by a termination signal -- confirm.
206 SuccessExitStatus = "0 143";
# Module documentation lives in the sibling file kafka.md.
211 meta.doc = ./kafka.md;
212 meta.maintainers = with lib.maintainers; [