ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / blackbox / smbcontrol_process.py
blob1ff7720b07961c07700e3303a81dc1aee6eff28d
1 # Blackbox tests for the smbcontrol fault injection commands
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # As the test terminates and sleeps samba processes these tests need to run
19 # in the preforkrestartdc test environment to prevent them impacting other
20 # tests.
22 import time
23 from samba.tests import BlackboxTestCase, BlackboxProcessError
24 from samba.messaging import Messaging
26 COMMAND = "bin/smbcontrol"
27 PING = "ping"
30 class SmbcontrolProcessBlockboxTests(BlackboxTestCase):
32 def setUp(self):
33 super().setUp()
34 lp_ctx = self.get_loadparm()
35 self.msg_ctx = Messaging(lp_ctx=lp_ctx)
37 def get_process_data(self):
38 services = self.msg_ctx.irpc_all_servers()
40 processes = []
41 for service in services:
42 for id in service.ids:
43 processes.append((service.name, id.pid))
44 return processes
46 def get_process(self, name):
47 processes = self.get_process_data()
48 for pname, pid in processes:
49 if name == pname:
50 return pid
51 return None
53 def test_inject_fault(self):
54 INJECT = "inject"
55 FAULT = "segv"
57 # Note that this process name needs to be different to the one used
58 # in the sleep test to avoid a race condition
60 pid = self.get_process("rpc_server")
63 # Ensure we can ping the process before injecting a fault.
65 try:
66 self.check_run("%s %s %s" % (COMMAND, pid, PING),
67 msg="trying to ping rpc_server")
68 except BlackboxProcessError as e:
69 self.fail("Unable to ping rpc_server process")
72 # Now inject a fault.
74 try:
75 self.check_run("%s %s %s %s" % (COMMAND, pid, INJECT, FAULT),
76 msg="injecting fault into rpc_server")
77 except BlackboxProcessError as e:
78 print(e)
79 self.fail("Unable to inject a fault into the rpc_server process")
82 # The process should have died, so we should not be able to ping it
84 try:
85 self.check_run("%s %s %s" % (COMMAND, pid, PING),
86 msg="trying to ping rpc_server")
87 self.fail("Could ping rpc_server process")
88 except BlackboxProcessError as e:
89 pass
91 def test_sleep(self):
92 SLEEP = "sleep" # smbcontrol sleep command
93 DURATION = 5 # duration to sleep server for
94 DELTA = 1 # permitted error for the sleep duration
97 # Note that this process name needs to be different to the one used
98 # in the inject fault test to avoid a race condition
100 pid = self.get_process("ldap_server")
102 # Ensure we can ping the process before getting it to sleep
104 try:
105 self.check_run("%s %s %s" % (COMMAND, pid, PING),
106 msg="trying to ping rpc_server")
107 except BlackboxProcessError as e:
108 self.fail("Unable to ping rpc_server process")
111 # Now ask it to sleep
113 start = time.time()
114 try:
115 self.check_run("%s %s %s %s" % (COMMAND, pid, SLEEP, DURATION),
116 msg="putting rpc_server to sleep for %d" % DURATION)
117 except BlackboxProcessError as e:
118 print(e)
119 self.fail("Failed to get rpc_server to sleep for %d" % DURATION)
122 # The process should be sleeping and not respond until it wakes
124 try:
125 self.check_run("%s %s %s" % (COMMAND, pid, PING),
126 msg="trying to ping rpc_server")
127 end = time.time()
128 duration = end - start
129 self.assertGreater(duration + DELTA, DURATION)
130 except BlackboxProcessError as e:
131 self.fail("Unable to ping rpc_server process")