8 in import ./make-test-python.nix ({ pkgs, lib, ... }: {
10 meta = with pkgs.lib; { maintainers = with maintainers; [ c0deaddict ]; };
13 client = { pkgs, ... }: {
14 environment.systemPackages = with pkgs; [ natscli ];
17 server = { pkgs, ... }: {
18 networking.firewall.allowedTCPPorts = [ port ];
37 testScript = let file = "/tmp/msg";
42 "--server=nats://server:${toString port} "
44 "--password=${password} "
46 ).format(" ".join(args))
49 from threading import Thread
50 threads = [ Thread(target=fn) for fn in fns ]
51 for t in threads: t.start()
52 for t in threads: t.join()
55 server.wait_for_unit("nats.service")
57 with subtest("pub sub"):
59 lambda: client1.succeed(nats_cmd("sub", "--count", "1", "${topic}")),
60 lambda: client2.succeed("sleep 2 && {}".format(nats_cmd("pub", "${topic}", "hello"))),