1 import ./make-test-python.nix ({ pkgs, lib, ... }:
4 pythonEnv = pkgs.python3.withPackages (p: [p.beanstalkc]);
6 produce = pkgs.writeScript "produce.py" ''
7 #!${pythonEnv.interpreter}
10 queue = beanstalkc.Connection(host='localhost', port=11300, parse_yaml=False);
11 queue.put(b'this is a job')
12 queue.put(b'this is another job')
15 consume = pkgs.writeScript "consume.py" ''
16 #!${pythonEnv.interpreter}
19 queue = beanstalkc.Connection(host='localhost', port=11300, parse_yaml=False);
21 job = queue.reserve(timeout=0)
22 print(job.body.decode('utf-8'))
29 meta.maintainers = [ lib.maintainers.aanderse ];
33 { services.beanstalkd.enable = true;
39 machine.wait_for_unit("beanstalkd.service")
41 machine.succeed("${produce}")
42 assert "this is a job\n" == machine.succeed(
45 assert "this is another job\n" == machine.succeed(