# File-level arguments: `system` defaults to the evaluating machine's
# builtins.currentSystem; `pkgs` defaults to importing ../.. with that
# `system` and with `config`.
1 { system ? builtins.currentSystem,
3 pkgs ? import ../.. { inherit system config; }
# makeKafkaTest name { kafkaPackage, mode ? "zookeeper" }:
# builds one test by passing an attribute set to ./make-test-python.nix,
# evaluated for `system`.
#   kafkaPackage - Kafka package used for the service and for the CLI
#                  scripts invoked by the test script.
#   mode         - "zookeeper" (default) or "kraft"; selects which of the
#                  mode-specific fragments below are applied.
9 makeKafkaTest = name: { kafkaPackage, mode ? "zookeeper" }: (import ./make-test-python.nix ({
11 meta = with pkgs.lib.maintainers; {
12 maintainers = [ nequissimus ];
# The broker configuration is assembled with mkMerge: a shared fragment
# first, then fragments gated on `mode` via mkIf.
17 services.apache-kafka = mkMerge [
20 package = kafkaPackage;
22 "offsets.topic.replication.factor" = 1;
24 "/var/lib/kafka/logdir1"
25 "/var/lib/kafka/logdir2"
# Applied only in zookeeper mode: point the broker at the zookeeper1 node.
29 (mkIf (mode == "zookeeper") {
31 "zookeeper.session.timeout.ms" = 600000;
32 "zookeeper.connect" = [ "zookeeper1:2181" ];
# Applied only in kraft mode: a fixed cluster id plus the controller
# listener and quorum settings.
35 (mkIf (mode == "kraft") {
36 clusterId = "ak2fIHr4S8WWarOF_ODD0g";
48 "listener.security.protocol.map" = [
50 "CONTROLLER:PLAINTEXT"
52 "controller.quorum.voters" = [
55 "controller.listener.names" = [ "CONTROLLER" ];
# 9092 is the port the test script connects to; 9093 is presumably the
# CONTROLLER listener used in kraft mode — TODO confirm against the
# listener settings.
60 networking.firewall.allowedTCPPorts = [ 9092 9093 ];
61 # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
62 virtualisation.memorySize = 2047;
# The zookeeper1 node is added to the node set only in zookeeper mode.
64 } // optionalAttrs (mode == "zookeeper") {
65 zookeeper1 = { ... }: {
66 services.zookeeper = {
70 networking.firewall.allowedTCPPorts = [ 2181 ];
77 ${optionalString (mode == "zookeeper") ''
78 zookeeper1.wait_for_unit("default.target")
79 zookeeper1.wait_for_unit("zookeeper.service")
80 zookeeper1.wait_for_open_port(2181)
83 kafka.wait_for_unit("default.target")
84 kafka.wait_for_unit("apache-kafka.service")
85 kafka.wait_for_open_port(9092)
87 kafka.wait_until_succeeds(
88 "${kafkaPackage}/bin/kafka-topics.sh --create "
89 + "--bootstrap-server localhost:9092 --partitions 1 "
90 + "--replication-factor 1 --topic testtopic"
94 + "${kafkaPackage}/bin/kafka-console-producer.sh "
95 + "--broker-list localhost:9092 --topic testtopic"
97 assert "test 1" in kafka.succeed(
98 "${kafkaPackage}/bin/kafka-console-consumer.sh "
99 + "--bootstrap-server localhost:9092 --topic testtopic "
100 + "--from-beginning --max-messages 1"
103 }) { inherit system; });
# One test per Kafka package. All of them rely on makeKafkaTest's default
# mode ("zookeeper") except kafka_kraft, which runs the default apacheKafka
# package with mode = "kraft".
106 kafka_3_6 = makeKafkaTest "kafka_3_6" { kafkaPackage = apacheKafka_3_6; };
107 kafka_3_7 = makeKafkaTest "kafka_3_7" { kafkaPackage = apacheKafka_3_7; };
108 kafka_3_8 = makeKafkaTest "kafka_3_8" { kafkaPackage = apacheKafka_3_8; };
109 kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; };
110 kafka_kraft = makeKafkaTest "kafka_kraft" { kafkaPackage = apacheKafka; mode = "kraft"; };