[AMDGPU] Add True16 register classes.
[llvm-project.git] / lldb / test / API / python_api / was_interrupted / TestDebuggerInterruption.py
blob4912b6c5841fc35e7684ea08eddce7bff71d599a
1 """
2 Test SBDebugger.InterruptRequested and SBCommandInterpreter.WasInterrupted.
3 """
5 import lldb
6 import lldbsuite.test.lldbutil as lldbutil
7 from lldbsuite.test.lldbtest import *
8 import threading
9 import os
class TestDebuggerInterruption(TestBase):
    """Exercise SBDebugger.InterruptRequested and SBCommandInterpreter.WasInterrupted.

    Each test runs a scripted command on a helper thread.  The command
    rendezvouses with the test thread using threading barriers, then reports
    whether it has been interrupted.

    The command's first argument is either 'debugger' or 'interp', which makes
    the command check InterruptRequested or WasInterrupted respectively.

    With no further argument the command waits for an interrupt.  With 'check'
    it just returns whether an interrupt was requested; we use that mode to
    make sure we took down the flag correctly.  Some tests also pass 'poll'
    (see interruptible.py for what that mode does -- it is not defined here).
    """

    NO_DEBUG_INFO_TESTCASE = True

    class CommandRunner(threading.Thread):
        """Base class for the thread that runs the command.

        It gets passed the test it is working on behalf of, and most of the
        important objects (debugger, barriers, event, result) come from
        that test.
        """

        def __init__(self, test):
            super().__init__()
            self.test = test

        def rendevous(self):
            """Hand the test's barriers and event to the command module.

            Must be called on the runner thread (i.e. from run()) before the
            command is executed.
            """
            # We smuggle out barriers and event to the runner thread by
            # stashing them on the command's module:
            import interruptible

            interruptible.local_data = interruptible.BarrierContainer(
                self.test.before_interrupt_barrier,
                self.test.after_interrupt_barrier,
                self.test.event,
            )

    class DirectCommandRunner(CommandRunner):
        """This version runs a single command using HandleCommand."""

        def __init__(self, test, command):
            super().__init__(test)
            self.command = command

        def run(self):
            self.rendevous()
            # The outcome is delivered through self.test.result, so the
            # return value of HandleCommand is not needed.
            self.test.dbg.GetCommandInterpreter().HandleCommand(
                self.command, self.test.result
            )
            # result_barrier is None in "check" mode, where the test thread
            # simply joins the runner instead of waiting on a barrier.
            if self.test.result_barrier:
                self.test.result_barrier.wait()

    class CommandInterpreterRunner(CommandRunner):
        """This version runs the CommandInterpreter and feeds the command to it.

        The command is read from test.in_filename and the interpreter's
        output is captured in test.out_filename.
        """

        def run(self):
            self.rendevous()

            test = self.test

            # We will use files for debugger input and output.
            # test.in_filename / test.out_filename are already build-artifact
            # paths, so they are used as-is for both writing and reading.

            # First write down the command:
            with open(test.in_filename, "w") as f:
                f.write(f"{test.command}\n")

            # Now set the debugger's stdout & stdin to our files, and run
            # the CommandInterpreter:
            with open(test.out_filename, "w") as outf, open(
                test.in_filename, "r"
            ) as inf:
                # The last SBFile argument is transfer_ownership; False leaves
                # closing the descriptors to the enclosing "with".
                outsbf = lldb.SBFile(outf.fileno(), "w", False)
                orig_outf = test.dbg.GetOutputFile()
                error = test.dbg.SetOutputFile(outsbf)
                test.assertSuccess(error, "Could not set outfile")

                insbf = lldb.SBFile(inf.fileno(), "r", False)
                # Save the original *input* file so that it is the one we put
                # back below.
                orig_inf = test.dbg.GetInputFile()
                error = test.dbg.SetInputFile(insbf)
                test.assertSuccess(error, "Could not set infile")

                options = lldb.SBCommandInterpreterRunOptions()
                options.SetPrintResults(True)
                options.SetEchoCommands(False)

                # Runs on this thread and returns once the input file has
                # been consumed.
                test.dbg.RunCommandInterpreter(True, False, options, 0, False, False)
                test.dbg.GetOutputFile().Flush()

                error = test.dbg.SetOutputFile(orig_outf)
                test.assertSuccess(error, "Restored outfile")
                # Check the status of this call, not the stale status left
                # over from restoring the output file.
                error = test.dbg.SetInputFile(orig_inf)
                test.assertSuccess(error, "Restored infile")

    def command_setup(self, args):
        """Insert our command, if needed.  Then set up event and barriers if needed.

        args is the argument string for the command (possibly empty).  Unless
        it contains "check", a fresh event and three two-party barriers are
        created; in "check" mode they are all set to None.

        Returns the full command line to run.
        """

        self.interp = self.dbg.GetCommandInterpreter()
        self.command_name = "interruptible_command"
        self.cmd_result = lldb.SBCommandReturnObject()

        if "check" not in args:
            self.event = threading.Event()
            self.result_barrier = threading.Barrier(2, timeout=10)
            self.before_interrupt_barrier = threading.Barrier(2, timeout=10)
            self.after_interrupt_barrier = threading.Barrier(2, timeout=10)
        else:
            self.event = None
            self.result_barrier = None
            self.before_interrupt_barrier = None
            self.after_interrupt_barrier = None

        if not self.interp.UserCommandExists(self.command_name):
            # Make the command we're going to use - it spins calling WasInterrupted:
            cmd_filename = os.path.join(self.getSourceDir(), "interruptible.py")
            self.runCmd(f"command script import {cmd_filename}")
            cmd_string = f"command script add {self.command_name} --class interruptible.WelcomeCommand"
            self.runCmd(cmd_string)

        if len(args) == 0:
            command = self.command_name
        else:
            command = self.command_name + " " + args
        return command

    def run_single_command(self, command):
        """Start a thread that runs command through HandleCommand."""
        self.result.Clear()
        self.runner = TestDebuggerInterruption.DirectCommandRunner(self, command)
        self.runner.start()

    def start_command_interp(self):
        """Start a thread that runs self.command through the command interpreter."""
        self.runner = TestDebuggerInterruption.CommandInterpreterRunner(self)
        self.runner.start()

    def check_text(self, result_text, interrupted):
        """Assert that result_text carries the message matching interrupted."""
        if interrupted:
            self.assertIn(
                "Command was interrupted", result_text, "Got the interrupted message"
            )
        else:
            self.assertIn(
                "Command was not interrupted",
                result_text,
                "Got the not interrupted message",
            )

    def gather_output(self):
        """Wait for the runner thread and assert that it finished in time."""
        # Now wait for the interrupt to interrupt the command:
        self.runner.join(10.0)
        finished = not self.runner.is_alive()
        # Don't leave the runner thread stranded if the interrupt didn't work.
        if not finished:
            # There is no event in "check" mode; skip the nudge there so the
            # assertion below reports the real problem instead of an
            # AttributeError.
            if self.event is not None:
                self.event.set()
            self.runner.join(10.0)

        self.assertTrue(finished, "We did finish the command")

    def check_result(self, interrupted=True):
        """Join the runner, then check the text in self.result."""
        self.gather_output()
        self.check_text(self.result.GetOutput(), interrupted)

    def check_result_output(self, interrupted=True):
        """Join the runner, then check the text captured in the output file."""
        self.gather_output()
        # Okay, now open the file for reading, and read.
        with open(self.out_filename, "r") as f:
            buffer = f.read()

        self.assertNotEqual(len(buffer), 0, "No command data")
        self.check_text(buffer, interrupted)

    def _prepare_io_files(self):
        """Pick the interpreter's input/output file paths and drop stale output."""
        self.out_filename = self.getBuildArtifact("output")
        self.in_filename = self.getBuildArtifact("input")
        # We're going to overwrite the input file, but we
        # don't want data accumulating in the output file.
        if os.path.exists(self.out_filename):
            os.unlink(self.out_filename)

    def debugger_interrupt_test(self, use_interrupt_requested):
        """Test that debugger interruption interrupts a command
        running directly through HandleCommand.
        If use_interrupt_requested is true, we'll check that API,
        otherwise we'll check WasInterrupted.  They should both do
        the same thing."""

        if use_interrupt_requested:
            command = self.command_setup("debugger")
        else:
            command = self.command_setup("interp")

        self.result = lldb.SBCommandReturnObject()
        self.run_single_command(command)

        # Okay now wait till the command has gotten started to issue the interrupt:
        self.before_interrupt_barrier.wait()
        # I'm going to do it twice here to test that it works as a counter:
        self.dbg.RequestInterrupt()
        self.dbg.RequestInterrupt()

        def cleanup():
            self.dbg.CancelInterruptRequest()

        self.addTearDownHook(cleanup)
        # Okay, now set both sides going:
        self.after_interrupt_barrier.wait()

        # Check that the command was indeed interrupted.  First rendezvous
        # after the runner thread had a chance to execute the command:
        self.result_barrier.wait()
        self.assertTrue(self.result.Succeeded(), "Our command succeeded")
        self.check_result(True)

        # Do it again to make sure that the counter is counting:
        self.dbg.CancelInterruptRequest()
        command = self.command_setup("debugger")
        self.run_single_command(command)

        # This time we won't even get to run the command, since HandleCommand
        # checks for the interrupt state on entry, so we don't wait on the command
        # barriers.
        self.result_barrier.wait()

        # Again check that we were interrupted:
        self.assertFalse(self.result.Succeeded(), "Our command was not allowed to run")
        error_output = self.result.GetError()
        self.assertIn(
            "... Interrupted", error_output, "Command was cut short by interrupt"
        )

        # Now take down the flag, and make sure that we aren't interrupted:
        self.dbg.CancelInterruptRequest()

        # Now make sure that we really did take down the flag.  The result is
        # only read inside check_result, after the runner has been joined.
        command = self.command_setup("debugger check")
        self.run_single_command(command)
        self.check_result(False)

    def test_debugger_interrupt_use_dbg(self):
        self.debugger_interrupt_test(True)

    def test_debugger_interrupt_use_interp(self):
        self.debugger_interrupt_test(False)

    def test_interp_doesnt_interrupt_debugger(self):
        """Test that interpreter interruption does not interrupt a command
        running directly through HandleCommand."""

        command = self.command_setup("debugger poll")

        self.result = lldb.SBCommandReturnObject()
        self.run_single_command(command)

        # Now raise the command interpreter's interrupt flag.  It should not
        # interrupt the command:
        self.before_interrupt_barrier.wait()
        self.dbg.GetCommandInterpreter().InterruptCommand()
        self.after_interrupt_barrier.wait()

        # Check that the command was not interrupted:
        self.result_barrier.wait()
        self.assertTrue(self.result.Succeeded(), "Our command succeeded")
        self.check_result(False)

    def interruptible_command_test(self, use_interrupt_requested):
        """Test that interpreter interruption interrupts a command
        running in the RunCommandInterpreter loop.
        If use_interrupt_requested is true, we'll check that API,
        otherwise we'll check WasInterrupted.  They should both do
        the same thing."""

        self._prepare_io_files()

        # You should be able to use either check method interchangeably:
        if use_interrupt_requested:
            self.command = self.command_setup("debugger") + "\n"
        else:
            self.command = self.command_setup("interp") + "\n"

        self.start_command_interp()

        # Now give the interpreter a chance to run this command up
        # to the first barrier
        self.before_interrupt_barrier.wait()
        # Then issue the interrupt:
        sent_interrupt = self.dbg.GetCommandInterpreter().InterruptCommand()
        self.assertTrue(sent_interrupt, "Did send command interrupt.")
        # Now give the command a chance to finish:
        self.after_interrupt_barrier.wait()

        self.check_result_output(True)

        # Start the second run from an empty output file.
        os.unlink(self.out_filename)

        # Now send the check command, and make sure the flag is now down.
        self.command = self.command_setup("interp check") + "\n"
        self.start_command_interp()

        self.check_result_output(False)

    def test_interruptible_command_check_dbg(self):
        self.interruptible_command_test(True)

    def test_interruptible_command_check_interp(self):
        self.interruptible_command_test(False)

    def test_debugger_doesnt_interrupt_command(self):
        """Test that debugger interruption doesn't interrupt a command
        running in the RunCommandInterpreter loop."""

        self._prepare_io_files()

        self.command = self.command_setup("interp poll") + "\n"

        self.start_command_interp()

        self.before_interrupt_barrier.wait()
        self.dbg.RequestInterrupt()

        def cleanup():
            self.dbg.CancelInterruptRequest()

        self.addTearDownHook(cleanup)
        self.after_interrupt_barrier.wait()

        self.check_result_output(False)

        os.unlink(self.out_filename)