vuls: init at 0.27.0
[NixPkgs.git] / nixos / tests / kafka.nix
blob5390e9d7f79f12d10313667b2b48c3cac44c7a05
1 { system ? builtins.currentSystem,
2   config ? {},
3   pkgs ? import ../.. { inherit system config; }
4 }:
6 with pkgs.lib;
8 let
# makeKafkaTest name { kafkaPackage, mode ? "zookeeper" }
#
# Builds one NixOS VM test (via ./make-test-python.nix) that boots a single
# Kafka broker from `kafkaPackage`, creates a topic, produces one message and
# reads it back.
#
#   name          attribute/test name passed through to the test driver
#   kafkaPackage  Kafka package providing the broker and the bin/*.sh tools
#   mode          "zookeeper" (default): adds a separate `zookeeper1` node and
#                 points the broker at it;
#                 "kraft": the broker node also acts as its own controller,
#                 no ZooKeeper node is created.
makeKafkaTest = name: { kafkaPackage, mode ? "zookeeper" }:
  # Any other value would match neither mkIf branch below and produce a
  # broker with no coordination settings at all; fail at evaluation instead.
  assert builtins.elem mode [ "zookeeper" "kraft" ];
  (import ./make-test-python.nix ({
    inherit name;

    meta = with pkgs.lib.maintainers; {
      maintainers = [ nequissimus ];
    };

    nodes = {
      kafka = { ... }: {
        services.apache-kafka = mkMerge [
          # Settings shared by both modes.
          ({
            enable = true;
            package = kafkaPackage;
            settings = {
              # Single broker, so internal topics cannot be replicated further.
              "offsets.topic.replication.factor" = 1;
              "log.dirs" = [
                "/var/lib/kafka/logdir1"
                "/var/lib/kafka/logdir2"
              ];
            };
          })

          # ZooKeeper mode: connect to the `zookeeper1` node defined below.
          (mkIf (mode == "zookeeper") {
            settings = {
              "zookeeper.session.timeout.ms" = 600000;
              "zookeeper.connect" = [ "zookeeper1:2181" ];
            };
          })

          # KRaft mode: this node is both broker and controller, and is the
          # only quorum voter.
          (mkIf (mode == "kraft") {
            clusterId = "ak2fIHr4S8WWarOF_ODD0g";
            # NOTE(review): presumably makes the module format log.dirs with
            # the clusterId before first start; confirm against the module.
            formatLogDirs = true;
            settings = {
              "node.id" = 1;
              "process.roles" = [
                "broker"
                "controller"
              ];
              "listeners" = [
                "PLAINTEXT://:9092"
                "CONTROLLER://:9093"
              ];
              "listener.security.protocol.map" = [
                "PLAINTEXT:PLAINTEXT"
                "CONTROLLER:PLAINTEXT"
              ];
              "controller.quorum.voters" = [
                "1@kafka:9093"
              ];
              "controller.listener.names" = [ "CONTROLLER" ];
            };
          })
        ];

        networking.firewall.allowedTCPPorts = [ 9092 9093 ];
        # i686 tests: qemu-system-i386 can simulate max 2047MB RAM (not 2048)
        virtualisation.memorySize = 2047;
      };
    } // optionalAttrs (mode == "zookeeper") {
      zookeeper1 = { ... }: {
        services.zookeeper = {
          enable = true;
        };

        networking.firewall.allowedTCPPorts = [ 2181 ];
      };
    };

    # All three CLI tools are addressed with --bootstrap-server. The producer
    # previously used --broker-list, which is the deprecated spelling of the
    # same option (KIP-499); using the current flag keeps the unversioned
    # `apacheKafka` tests working when the default package moves forward.
    testScript = ''
      start_all()
      ${optionalString (mode == "zookeeper") ''
      zookeeper1.wait_for_unit("default.target")
      zookeeper1.wait_for_unit("zookeeper.service")
      zookeeper1.wait_for_open_port(2181)
      ''}
      kafka.wait_for_unit("default.target")
      kafka.wait_for_unit("apache-kafka.service")
      kafka.wait_for_open_port(9092)
      kafka.wait_until_succeeds(
          "${kafkaPackage}/bin/kafka-topics.sh --create "
          + "--bootstrap-server localhost:9092 --partitions 1 "
          + "--replication-factor 1 --topic testtopic"
      )
      kafka.succeed(
          "echo 'test 1' | "
          + "${kafkaPackage}/bin/kafka-console-producer.sh "
          + "--bootstrap-server localhost:9092 --topic testtopic"
      )
      assert "test 1" in kafka.succeed(
          "${kafkaPackage}/bin/kafka-console-consumer.sh "
          + "--bootstrap-server localhost:9092 --topic testtopic "
          + "--from-beginning --max-messages 1"
      )
    '';
  }) { inherit system; });
105 in with pkgs; {
106   kafka_3_6 = makeKafkaTest "kafka_3_6" { kafkaPackage = apacheKafka_3_6; };
107   kafka_3_7 = makeKafkaTest "kafka_3_7" { kafkaPackage = apacheKafka_3_7; };
108   kafka_3_8 = makeKafkaTest "kafka_3_8" { kafkaPackage = apacheKafka_3_8; };
109   kafka = makeKafkaTest "kafka" { kafkaPackage = apacheKafka; };
110   kafka_kraft = makeKafkaTest "kafka_kraft" { kafkaPackage = apacheKafka; mode = "kraft"; };