2 Test SBDebugger.InterruptRequested and SBCommandInterpreter.WasInterrupted.
6 import lldbsuite
.test
.lldbutil
as lldbutil
7 from lldbsuite
.test
.lldbtest
import *
12 class TestDebuggerInterruption(TestBase
):
13 """This test runs a command that starts up, rendevous with the test thread
14 using threading barriers, then checks whether it has been interrupted.
16 The command's first argument is either 'interp' or 'debugger', to test
17 InterruptRequested and WasInterrupted respectively.
19 The command has two modes, interrupt and check, the former is the one that
20 waits for an interrupt. Then latter just returns whether an interrupt was
21 requested. We use the latter to make sure we took down the flag correctly."""
23 NO_DEBUG_INFO_TESTCASE
= True
25 class CommandRunner(threading
.Thread
):
26 """This class is for running a command, and for making a thread to run the command on.
27 It gets passed the test it is working on behalf of, and most of the important
28 objects come from the test."""
30 def __init__(self
, test
):
35 # We smuggle out barriers and event to the runner thread using thread local data:
38 interruptible
.local_data
= interruptible
.BarrierContainer(
39 self
.test
.before_interrupt_barrier
,
40 self
.test
.after_interrupt_barrier
,
44 class DirectCommandRunner(CommandRunner
):
45 """ "This version runs a single command using HandleCommand."""
47 def __init__(self
, test
, command
):
48 super().__init
__(test
)
49 self
.command
= command
53 result
= self
.test
.dbg
.GetCommandInterpreter().HandleCommand(
54 self
.command
, self
.test
.result
56 if self
.test
.result_barrier
:
57 self
.test
.result_barrier
.wait()
59 class CommandInterpreterRunner(CommandRunner
):
60 """This version runs the CommandInterpreter and feeds the command to it."""
62 def __init__(self
, test
):
63 super().__init
__(test
)
70 # We will use files for debugger input and output:
72 # First write down the command:
73 with
open(test
.getBuildArtifact(test
.in_filename
), "w") as f
:
74 f
.write(f
"{test.command}\n")
76 # Now set the debugger's stdout & stdin to our files, and run
77 # the CommandInterpreter:
78 with
open(test
.out_filename
, "w") as outf
, open(
81 outsbf
= lldb
.SBFile(outf
.fileno(), "w", False)
82 orig_outf
= test
.dbg
.GetOutputFile()
83 error
= test
.dbg
.SetOutputFile(outsbf
)
84 test
.assertSuccess(error
, "Could not set outfile")
86 insbf
= lldb
.SBFile(inf
.fileno(), "r", False)
87 orig_inf
= test
.dbg
.GetOutputFile()
88 error
= test
.dbg
.SetInputFile(insbf
)
89 test
.assertSuccess(error
, "Could not set infile")
91 options
= lldb
.SBCommandInterpreterRunOptions()
92 options
.SetPrintResults(True)
93 options
.SetEchoCommands(False)
95 test
.dbg
.RunCommandInterpreter(True, False, options
, 0, False, False)
96 test
.dbg
.GetOutputFile().Flush()
98 error
= test
.dbg
.SetOutputFile(orig_outf
)
99 test
.assertSuccess(error
, "Restored outfile")
100 test
.dbg
.SetInputFile(orig_inf
)
101 test
.assertSuccess(error
, "Restored infile")
103 def command_setup(self
, args
):
104 """Insert our command, if needed. Then set up event and barriers if needed.
105 Then return the command to run."""
107 self
.interp
= self
.dbg
.GetCommandInterpreter()
108 self
.command_name
= "interruptible_command"
109 self
.cmd_result
= lldb
.SBCommandReturnObject()
111 if not "check" in args
:
112 self
.event
= threading
.Event()
113 self
.result_barrier
= threading
.Barrier(2, timeout
=10)
114 self
.before_interrupt_barrier
= threading
.Barrier(2, timeout
=10)
115 self
.after_interrupt_barrier
= threading
.Barrier(2, timeout
=10)
118 self
.result_barrier
= None
119 self
.before_interrupt_barrier
= None
120 self
.after_interrupt_barrier
= None
122 if not self
.interp
.UserCommandExists(self
.command_name
):
123 # Make the command we're going to use - it spins calling WasInterrupted:
124 cmd_filename
= "interruptible"
125 cmd_filename
= os
.path
.join(self
.getSourceDir(), "interruptible.py")
126 self
.runCmd(f
"command script import {cmd_filename}")
127 cmd_string
= f
"command script add {self.command_name} --class interruptible.WelcomeCommand"
128 self
.runCmd(cmd_string
)
131 command
= self
.command_name
133 command
= self
.command_name
+ " " + args
136 def run_single_command(self
, command
):
137 # Now start up a thread to run the command:
139 self
.runner
= TestDebuggerInterruption
.DirectCommandRunner(self
, command
)
142 def start_command_interp(self
):
143 self
.runner
= TestDebuggerInterruption
.CommandInterpreterRunner(self
)
146 def check_text(self
, result_text
, interrupted
):
149 "Command was interrupted", result_text
, "Got the interrupted message"
153 "Command was not interrupted",
155 "Got the not interrupted message",
158 def gather_output(self
):
159 # Now wait for the interrupt to interrupt the command:
160 self
.runner
.join(10.0)
161 finished
= not self
.runner
.is_alive()
162 # Don't leave the runner thread stranded if the interrupt didn't work.
165 self
.runner
.join(10.0)
167 self
.assertTrue(finished
, "We did finish the command")
169 def check_result(self
, interrupted
=True):
171 self
.check_text(self
.result
.GetOutput(), interrupted
)
173 def check_result_output(self
, interrupted
=True):
176 # Okay, now open the file for reading, and read.
177 with
open(self
.out_filename
, "r") as f
:
180 self
.assertNotEqual(len(buffer), 0, "No command data")
181 self
.check_text(buffer, interrupted
)
183 def debugger_interrupt_test(self
, use_interrupt_requested
):
184 """Test that debugger interruption interrupts a command
185 running directly through HandleCommand.
186 If use_interrupt_requested is true, we'll check that API,
187 otherwise we'll check WasInterrupted. They should both do
190 if use_interrupt_requested
:
191 command
= self
.command_setup("debugger")
193 command
= self
.command_setup("interp")
195 self
.result
= lldb
.SBCommandReturnObject()
196 self
.run_single_command(command
)
198 # Okay now wait till the command has gotten started to issue the interrupt:
199 self
.before_interrupt_barrier
.wait()
200 # I'm going to do it twice here to test that it works as a counter:
201 self
.dbg
.RequestInterrupt()
202 self
.dbg
.RequestInterrupt()
205 self
.dbg
.CancelInterruptRequest()
207 self
.addTearDownHook(cleanup
)
208 # Okay, now set both sides going:
209 self
.after_interrupt_barrier
.wait()
211 # Check that the command was indeed interrupted. First rendevous
212 # after the runner thread had a chance to execute the command:
213 self
.result_barrier
.wait()
214 self
.assertTrue(self
.result
.Succeeded(), "Our command succeeded")
215 result_output
= self
.result
.GetOutput()
216 self
.check_result(True)
218 # Do it again to make sure that the counter is counting:
219 self
.dbg
.CancelInterruptRequest()
220 command
= self
.command_setup("debugger")
221 self
.run_single_command(command
)
223 # This time we won't even get to run the command, since HandleCommand
224 # checks for the interrupt state on entry, so we don't wait on the command
226 self
.result_barrier
.wait()
228 # Again check that we were
229 self
.assertFalse(self
.result
.Succeeded(), "Our command was not allowed to run")
230 error_output
= self
.result
.GetError()
232 "... Interrupted", error_output
, "Command was cut short by interrupt"
235 # Now take down the flag, and make sure that we aren't interrupted:
236 self
.dbg
.CancelInterruptRequest()
238 # Now make sure that we really did take down the flag:
239 command
= self
.command_setup("debugger check")
240 self
.run_single_command(command
)
241 result_output
= self
.result
.GetOutput()
242 self
.check_result(False)
244 def test_debugger_interrupt_use_dbg(self
):
245 self
.debugger_interrupt_test(True)
247 def test_debugger_interrupt_use_interp(self
):
248 self
.debugger_interrupt_test(False)
250 def test_interp_doesnt_interrupt_debugger(self
):
251 """Test that interpreter interruption does not interrupt a command
252 running directly through HandleCommand.
253 If use_interrupt_requested is true, we'll check that API,
254 otherwise we'll check WasInterrupted. They should both do
257 command
= self
.command_setup("debugger poll")
259 self
.result
= lldb
.SBCommandReturnObject()
260 self
.run_single_command(command
)
262 # Now raise the debugger interrupt flag. It will also interrupt the command:
263 self
.before_interrupt_barrier
.wait()
264 self
.dbg
.GetCommandInterpreter().InterruptCommand()
265 self
.after_interrupt_barrier
.wait()
267 # Check that the command was indeed interrupted:
268 self
.result_barrier
.wait()
269 self
.assertTrue(self
.result
.Succeeded(), "Our command succeeded")
270 result_output
= self
.result
.GetOutput()
271 self
.check_result(False)
273 def interruptible_command_test(self
, use_interrupt_requested
):
274 """Test that interpreter interruption interrupts a command
275 running in the RunCommandInterpreter loop.
276 If use_interrupt_requested is true, we'll check that API,
277 otherwise we'll check WasInterrupted. They should both do
280 self
.out_filename
= self
.getBuildArtifact("output")
281 self
.in_filename
= self
.getBuildArtifact("input")
282 # We're going to overwrite the input file, but we
283 # don't want data accumulating in the output file.
285 if os
.path
.exists(self
.out_filename
):
286 os
.unlink(self
.out_filename
)
288 # You should be able to use either check method interchangeably:
289 if use_interrupt_requested
:
290 self
.command
= self
.command_setup("debugger") + "\n"
292 self
.command
= self
.command_setup("interp") + "\n"
294 self
.start_command_interp()
296 # Now give the interpreter a chance to run this command up
297 # to the first barrier
298 self
.before_interrupt_barrier
.wait()
299 # Then issue the interrupt:
300 sent_interrupt
= self
.dbg
.GetCommandInterpreter().InterruptCommand()
301 self
.assertTrue(sent_interrupt
, "Did send command interrupt.")
302 # Now give the command a chance to finish:
303 self
.after_interrupt_barrier
.wait()
305 self
.check_result_output(True)
307 os
.unlink(self
.out_filename
)
309 # Now send the check command, and make sure the flag is now down.
310 self
.command
= self
.command_setup("interp check") + "\n"
311 self
.start_command_interp()
313 self
.check_result_output(False)
315 def test_interruptible_command_check_dbg(self
):
316 self
.interruptible_command_test(True)
318 def test_interruptible_command_check_interp(self
):
319 self
.interruptible_command_test(False)
321 def test_debugger_doesnt_interrupt_command(self
):
322 """Test that debugger interruption doesn't interrupt a command
323 running in the RunCommandInterpreter loop."""
325 self
.out_filename
= self
.getBuildArtifact("output")
326 self
.in_filename
= self
.getBuildArtifact("input")
327 # We're going to overwrite the input file, but we
328 # don't want data accumulating in the output file.
330 if os
.path
.exists(self
.out_filename
):
331 os
.unlink(self
.out_filename
)
333 self
.command
= self
.command_setup("interp poll") + "\n"
335 self
.start_command_interp()
337 self
.before_interrupt_barrier
.wait()
338 self
.dbg
.RequestInterrupt()
341 self
.dbg
.CancelInterruptRequest()
343 self
.addTearDownHook(cleanup
)
344 self
.after_interrupt_barrier
.wait()
346 self
.check_result_output(False)
348 os
.unlink(self
.out_filename
)