Run DCE after a LoopFlatten test to reduce spurious output [nfc]
[llvm-project.git] / lldb / test / API / python_api / event / TestEvents.py
blob3017c86a113d7d7228b1d81434661d949a6271fd
1 """
2 Test lldb Python event APIs.
3 """
5 import re
6 import lldb
7 from lldbsuite.test.decorators import *
8 from lldbsuite.test.lldbtest import *
9 from lldbsuite.test import lldbutil
12 @skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV
13 class EventAPITestCase(TestBase):
14 NO_DEBUG_INFO_TESTCASE = True
16 def setUp(self):
17 # Call super's setUp().
18 TestBase.setUp(self)
19 # Find the line number to of function 'c'.
20 self.line = line_number(
21 "main.c", '// Find the line number of function "c" here.'
24 @expectedFailureAll(
25 oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases"
27 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
28 @skipIfNetBSD
29 def test_listen_for_and_print_event(self):
30 """Exercise SBEvent API."""
31 self.build()
32 exe = self.getBuildArtifact("a.out")
34 self.dbg.SetAsync(True)
36 # Create a target by the debugger.
37 target = self.dbg.CreateTarget(exe)
38 self.assertTrue(target, VALID_TARGET)
40 # Now create a breakpoint on main.c by name 'c'.
41 breakpoint = target.BreakpointCreateByName("c", "a.out")
43 listener = lldb.SBListener("my listener")
45 # Now launch the process, and do not stop at the entry point.
46 error = lldb.SBError()
47 flags = target.GetLaunchInfo().GetLaunchFlags()
48 process = target.Launch(
49 listener,
50 None, # argv
51 None, # envp
52 None, # stdin_path
53 None, # stdout_path
54 None, # stderr_path
55 None, # working directory
56 flags, # launch flags
57 False, # Stop at entry
58 error,
59 ) # error
61 self.assertEqual(process.GetState(), lldb.eStateStopped, PROCESS_STOPPED)
63 # Create an empty event object.
64 event = lldb.SBEvent()
66 traceOn = self.TraceOn()
67 if traceOn:
68 lldbutil.print_stacktraces(process)
70 # Create MyListeningThread class to wait for any kind of event.
71 import threading
73 class MyListeningThread(threading.Thread):
74 def run(self):
75 count = 0
76 # Let's only try at most 4 times to retrieve any kind of event.
77 # After that, the thread exits.
78 while not count > 3:
79 if traceOn:
80 print("Try wait for event...")
81 if listener.WaitForEvent(5, event):
82 if traceOn:
83 desc = lldbutil.get_description(event)
84 print("Event description:", desc)
85 print("Event data flavor:", event.GetDataFlavor())
86 print(
87 "Process state:",
88 lldbutil.state_type_to_str(process.GetState()),
90 print()
91 else:
92 if traceOn:
93 print("timeout occurred waiting for event...")
94 count = count + 1
95 listener.Clear()
96 return
98 # Let's start the listening thread to retrieve the events.
99 my_thread = MyListeningThread()
100 my_thread.start()
102 # Use Python API to continue the process. The listening thread should be
103 # able to receive the state changed events.
104 process.Continue()
106 # Use Python API to kill the process. The listening thread should be
107 # able to receive the state changed event, too.
108 process.Kill()
110 # Wait until the 'MyListeningThread' terminates.
111 my_thread.join()
113 # Shouldn't we be testing against some kind of expectation here?
115 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases
116 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
117 @skipIfNetBSD
118 def test_wait_for_event(self):
119 """Exercise SBListener.WaitForEvent() API."""
120 self.build()
121 exe = self.getBuildArtifact("a.out")
123 self.dbg.SetAsync(True)
125 # Create a target by the debugger.
126 target = self.dbg.CreateTarget(exe)
127 self.assertTrue(target, VALID_TARGET)
129 # Now create a breakpoint on main.c by name 'c'.
130 breakpoint = target.BreakpointCreateByName("c", "a.out")
131 self.trace("breakpoint:", breakpoint)
132 self.assertTrue(
133 breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT
136 # Get the debugger listener.
137 listener = self.dbg.GetListener()
139 # Now launch the process, and do not stop at entry point.
140 error = lldb.SBError()
141 flags = target.GetLaunchInfo().GetLaunchFlags()
142 process = target.Launch(
143 listener,
144 None, # argv
145 None, # envp
146 None, # stdin_path
147 None, # stdout_path
148 None, # stderr_path
149 None, # working directory
150 flags, # launch flags
151 False, # Stop at entry
152 error,
153 ) # error
154 self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
156 # Create an empty event object.
157 event = lldb.SBEvent()
158 self.assertFalse(event, "Event should not be valid initially")
160 # Create MyListeningThread to wait for any kind of event.
161 import threading
163 class MyListeningThread(threading.Thread):
164 def run(self):
165 count = 0
166 # Let's only try at most 3 times to retrieve any kind of event.
167 while not count > 3:
168 if listener.WaitForEvent(5, event):
169 self.context.trace("Got a valid event:", event)
170 self.context.trace("Event data flavor:", event.GetDataFlavor())
171 self.context.trace(
172 "Event type:", lldbutil.state_type_to_str(event.GetType())
174 listener.Clear()
175 return
176 count = count + 1
177 print("Timeout: listener.WaitForEvent")
178 listener.Clear()
179 return
181 # Use Python API to kill the process. The listening thread should be
182 # able to receive a state changed event.
183 process.Kill()
185 # Let's start the listening thread to retrieve the event.
186 my_thread = MyListeningThread()
187 my_thread.context = self
188 my_thread.start()
190 # Wait until the 'MyListeningThread' terminates.
191 my_thread.join()
193 self.assertTrue(event, "My listening thread successfully received an event")
195 @expectedFailureAll(
196 oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases"
198 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
199 @expectedFailureNetBSD
200 def test_add_listener_to_broadcaster(self):
201 """Exercise some SBBroadcaster APIs."""
202 self.build()
203 exe = self.getBuildArtifact("a.out")
205 self.dbg.SetAsync(True)
207 # Create a target by the debugger.
208 target = self.dbg.CreateTarget(exe)
209 self.assertTrue(target, VALID_TARGET)
211 # Now create a breakpoint on main.c by name 'c'.
212 breakpoint = target.BreakpointCreateByName("c", "a.out")
213 self.trace("breakpoint:", breakpoint)
214 self.assertTrue(
215 breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT
218 listener = lldb.SBListener("my listener")
220 # Now launch the process, and do not stop at the entry point.
221 error = lldb.SBError()
222 flags = target.GetLaunchInfo().GetLaunchFlags()
223 process = target.Launch(
224 listener,
225 None, # argv
226 None, # envp
227 None, # stdin_path
228 None, # stdout_path
229 None, # stderr_path
230 None, # working directory
231 flags, # launch flags
232 False, # Stop at entry
233 error,
234 ) # error
236 # Create an empty event object.
237 event = lldb.SBEvent()
238 self.assertFalse(event, "Event should not be valid initially")
240 # The finite state machine for our custom listening thread, with an
241 # initial state of None, which means no event has been received.
242 # It changes to 'connected' after 'connected' event is received (for remote platforms)
243 # It changes to 'running' after 'running' event is received (should happen only if the
244 # currentstate is either 'None' or 'connected')
245 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
246 # current state is 'running'.)
247 self.state = None
249 # Create MyListeningThread to wait for state changed events.
250 # By design, a "running" event is expected following by a "stopped"
251 # event.
252 import threading
254 class MyListeningThread(threading.Thread):
255 def run(self):
256 self.context.trace("Running MyListeningThread:", self)
258 # Regular expression pattern for the event description.
259 pattern = re.compile("data = {.*, state = (.*)}$")
261 # Let's only try at most 6 times to retrieve our events.
262 count = 0
263 while True:
264 if listener.WaitForEvent(5, event):
265 desc = lldbutil.get_description(event)
266 self.context.trace("Event description:", desc)
267 match = pattern.search(desc)
268 if not match:
269 break
270 if match.group(1) == "connected":
271 # When debugging remote targets with lldb-server, we
272 # first get the 'connected' event.
273 self.context.assertTrue(self.context.state is None)
274 self.context.state = "connected"
275 continue
276 elif match.group(1) == "running":
277 self.context.assertTrue(
278 self.context.state is None
279 or self.context.state == "connected"
281 self.context.state = "running"
282 continue
283 elif match.group(1) == "stopped":
284 self.context.assertTrue(self.context.state == "running")
285 # Whoopee, both events have been received!
286 self.context.state = "stopped"
287 break
288 else:
289 break
290 print("Timeout: listener.WaitForEvent")
291 count = count + 1
292 if count > 6:
293 break
294 listener.Clear()
295 return
297 # Use Python API to continue the process. The listening thread should be
298 # able to receive the state changed events.
299 process.Continue()
301 # Start the listening thread to receive the "running" followed by the
302 # "stopped" events.
303 my_thread = MyListeningThread()
304 # Supply the enclosing context so that our listening thread can access
305 # the 'state' variable.
306 my_thread.context = self
307 my_thread.start()
309 # Wait until the 'MyListeningThread' terminates.
310 my_thread.join()
312 # The final judgement. :-)
313 self.assertEqual(
314 self.state, "stopped", "Both expected state changed events received"
317 def wait_for_next_event(self, expected_state, test_shadow=False):
318 """Wait for an event from self.primary & self.shadow listener.
319 If test_shadow is true, we also check that the shadow listener only
320 receives events AFTER the primary listener does."""
321 # Waiting on the shadow listener shouldn't have events yet because
322 # we haven't fetched them for the primary listener yet:
323 event = lldb.SBEvent()
325 if test_shadow:
326 success = self.shadow_listener.WaitForEvent(1, event)
327 self.assertFalse(success, "Shadow listener doesn't pull events")
329 # But there should be an event for the primary listener:
330 success = self.primary_listener.WaitForEvent(5, event)
331 self.assertTrue(success, "Primary listener got the event")
333 state = lldb.SBProcess.GetStateFromEvent(event)
334 restart = False
335 if state == lldb.eStateStopped:
336 restart = lldb.SBProcess.GetRestartedFromEvent(event)
338 if expected_state != None:
339 self.assertEqual(
340 state, expected_state, "Primary thread got the correct event"
343 # And after pulling that one there should be an equivalent event for the shadow
344 # listener:
345 success = self.shadow_listener.WaitForEvent(5, event)
346 self.assertTrue(success, "Shadow listener got event too")
347 self.assertEqual(
348 state, lldb.SBProcess.GetStateFromEvent(event), "It was the same event"
350 self.assertEqual(
351 restart,
352 lldb.SBProcess.GetRestartedFromEvent(event),
353 "It was the same restarted",
356 return state, restart
358 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases
359 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
360 @skipIfNetBSD
361 def test_shadow_listener(self):
362 self.build()
363 exe = self.getBuildArtifact("a.out")
365 # Create a target by the debugger.
366 target = self.dbg.CreateTarget(exe)
367 self.assertTrue(target, VALID_TARGET)
369 # Now create a breakpoint on main.c by name 'c'.
370 bkpt1 = target.BreakpointCreateByName("c", "a.out")
371 self.trace("breakpoint:", bkpt1)
372 self.assertTrue(bkpt1.GetNumLocations() == 1, VALID_BREAKPOINT)
374 self.primary_listener = lldb.SBListener("my listener")
375 self.shadow_listener = lldb.SBListener("shadow listener")
377 self.cur_thread = None
379 error = lldb.SBError()
380 launch_info = target.GetLaunchInfo()
381 launch_info.SetListener(self.primary_listener)
382 launch_info.SetShadowListener(self.shadow_listener)
384 self.runCmd(
385 "settings set target.process.extra-startup-command QSetLogging:bitmask=LOG_PROCESS|LOG_EXCEPTIONS|LOG_RNB_PACKETS|LOG_STEP;"
387 self.dbg.SetAsync(True)
389 self.process = target.Launch(launch_info, error)
390 self.assertSuccess(error, "Process launched successfully")
392 # Keep fetching events from the primary to trigger the do on removal and
393 # then from the shadow listener, and make sure they match:
395 # Events in the launch sequence might be platform dependent, so don't
396 # expect any particular event till we get the stopped:
397 state = lldb.eStateInvalid
398 while state != lldb.eStateStopped:
399 state, restart = self.wait_for_next_event(None, False)
401 # Okay, we're now at a good stop, so try a next:
402 self.cur_thread = self.process.threads[0]
404 # Make sure we're at our expected breakpoint:
405 self.assertTrue(self.cur_thread.IsValid(), "Got a zeroth thread")
406 self.assertEqual(self.cur_thread.stop_reason, lldb.eStopReasonBreakpoint)
407 self.assertEqual(
408 self.cur_thread.GetStopReasonDataCount(), 2, "Only one breakpoint/loc here"
410 self.assertEqual(
411 bkpt1.GetID(),
412 self.cur_thread.GetStopReasonDataAtIndex(0),
413 "Hit the right breakpoint",
415 # Disable the first breakpoint so it doesn't get in the way...
416 bkpt1.SetEnabled(False)
418 self.cur_thread.StepOver()
419 # We'll run the test for "shadow listener blocked by primary listener
420 # for the first couple rounds, then we'll skip the 1 second pause...
421 self.wait_for_next_event(lldb.eStateRunning, True)
422 self.wait_for_next_event(lldb.eStateStopped, True)
424 # Next try an auto-continue breakpoint and make sure the shadow listener got
425 # the resumed info as well. Note that I'm not explicitly counting
426 # running events here. At the point when I wrote this lldb sometimes
427 # emits two running events in a row. Apparently the code to coalesce running
428 # events isn't working. But that's not what this test is testing, we're really
429 # testing that the primary & shadow listeners hear the same thing and in the
430 # right order.
432 main_spec = lldb.SBFileSpec("main.c")
433 bkpt2 = target.BreakpointCreateBySourceRegex("b.2. returns %d", main_spec)
434 self.assertTrue(bkpt2.GetNumLocations() > 0, "BP2 worked")
435 bkpt2.SetAutoContinue(True)
437 bkpt3 = target.BreakpointCreateBySourceRegex("a.3. returns %d", main_spec)
438 self.assertTrue(bkpt3.GetNumLocations() > 0, "BP3 worked")
440 state = lldb.eStateStopped
441 restarted = False
443 # Put in a counter to make sure we don't spin forever if there is some
444 # error in the logic.
445 counter = 0
446 while state != lldb.eStateExited:
447 counter += 1
448 self.assertLess(
449 counter, 50, "Took more than 50 events to hit two breakpoints."
451 if state == lldb.eStateStopped and not restarted:
452 self.process.Continue()
453 state, restarted = self.wait_for_next_event(None, False)