[C++20] [Modules] Fix may-be incorrect ADL for module local entities (#123931)
[llvm-project.git] / lldb / test / API / python_api / event / TestEvents.py
blobfb1a7e3bc6d3ac109ba00c892f16b3a58e0ae5eb
1 """
2 Test lldb Python event APIs.
3 """
5 import re
6 import lldb
7 from lldbsuite.test.decorators import *
8 from lldbsuite.test.lldbtest import *
9 from lldbsuite.test import lldbutil
10 import random
12 @skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV
13 class EventAPITestCase(TestBase):
14 NO_DEBUG_INFO_TESTCASE = True
16 def setUp(self):
17 # Call super's setUp().
18 TestBase.setUp(self)
19 # Find the line number to of function 'c'.
20 self.line = line_number(
21 "main.c", '// Find the line number of function "c" here.'
23 random.seed()
25 @expectedFailureAll(
26 oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases"
28 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
29 @skipIfNetBSD
30 def test_listen_for_and_print_event(self):
31 """Exercise SBEvent API."""
32 self.build()
33 exe = self.getBuildArtifact("a.out")
35 self.dbg.SetAsync(True)
37 # Create a target by the debugger.
38 target = self.dbg.CreateTarget(exe)
39 self.assertTrue(target, VALID_TARGET)
41 # Now create a breakpoint on main.c by name 'c'.
42 breakpoint = target.BreakpointCreateByName("c", "a.out")
44 listener = lldb.SBListener("my listener")
46 # Now launch the process, and do not stop at the entry point.
47 error = lldb.SBError()
48 flags = target.GetLaunchInfo().GetLaunchFlags()
49 process = target.Launch(
50 listener,
51 None, # argv
52 None, # envp
53 None, # stdin_path
54 None, # stdout_path
55 None, # stderr_path
56 None, # working directory
57 flags, # launch flags
58 False, # Stop at entry
59 error,
60 ) # error
62 self.assertEqual(process.GetState(), lldb.eStateStopped, PROCESS_STOPPED)
64 # Create an empty event object.
65 event = lldb.SBEvent()
67 traceOn = self.TraceOn()
68 if traceOn:
69 lldbutil.print_stacktraces(process)
71 # Create MyListeningThread class to wait for any kind of event.
72 import threading
74 class MyListeningThread(threading.Thread):
75 def run(self):
76 count = 0
77 # Let's only try at most 4 times to retrieve any kind of event.
78 # After that, the thread exits.
79 while not count > 3:
80 if traceOn:
81 print("Try wait for event...")
82 if listener.WaitForEvent(5, event):
83 if traceOn:
84 desc = lldbutil.get_description(event)
85 print("Event description:", desc)
86 print("Event data flavor:", event.GetDataFlavor())
87 print(
88 "Process state:",
89 lldbutil.state_type_to_str(process.GetState()),
91 print()
92 else:
93 if traceOn:
94 print("timeout occurred waiting for event...")
95 count = count + 1
96 listener.Clear()
97 return
99 # Let's start the listening thread to retrieve the events.
100 my_thread = MyListeningThread()
101 my_thread.start()
103 # Use Python API to continue the process. The listening thread should be
104 # able to receive the state changed events.
105 process.Continue()
107 # Use Python API to kill the process. The listening thread should be
108 # able to receive the state changed event, too.
109 process.Kill()
111 # Wait until the 'MyListeningThread' terminates.
112 my_thread.join()
114 # Shouldn't we be testing against some kind of expectation here?
116 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases
117 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
118 @skipIfNetBSD
119 def test_wait_for_event(self):
120 """Exercise SBListener.WaitForEvent() API."""
121 self.build()
122 exe = self.getBuildArtifact("a.out")
124 self.dbg.SetAsync(True)
126 # Create a target by the debugger.
127 target = self.dbg.CreateTarget(exe)
128 self.assertTrue(target, VALID_TARGET)
130 # Now create a breakpoint on main.c by name 'c'.
131 breakpoint = target.BreakpointCreateByName("c", "a.out")
132 self.trace("breakpoint:", breakpoint)
133 self.assertTrue(
134 breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT
137 # Get the debugger listener.
138 listener = self.dbg.GetListener()
140 # Now launch the process, and do not stop at entry point.
141 error = lldb.SBError()
142 flags = target.GetLaunchInfo().GetLaunchFlags()
143 process = target.Launch(
144 listener,
145 None, # argv
146 None, # envp
147 None, # stdin_path
148 None, # stdout_path
149 None, # stderr_path
150 None, # working directory
151 flags, # launch flags
152 False, # Stop at entry
153 error,
154 ) # error
155 self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
157 # Create an empty event object.
158 event = lldb.SBEvent()
159 self.assertFalse(event, "Event should not be valid initially")
161 # Create MyListeningThread to wait for any kind of event.
162 import threading
164 class MyListeningThread(threading.Thread):
165 def run(self):
166 count = 0
167 # Let's only try at most 3 times to retrieve any kind of event.
168 while not count > 3:
169 if listener.WaitForEvent(5, event):
170 self.context.trace("Got a valid event:", event)
171 self.context.trace("Event data flavor:", event.GetDataFlavor())
172 self.context.trace(
173 "Event type:", lldbutil.state_type_to_str(event.GetType())
175 listener.Clear()
176 return
177 count = count + 1
178 print("Timeout: listener.WaitForEvent")
179 listener.Clear()
180 return
182 # Use Python API to kill the process. The listening thread should be
183 # able to receive a state changed event.
184 process.Kill()
186 # Let's start the listening thread to retrieve the event.
187 my_thread = MyListeningThread()
188 my_thread.context = self
189 my_thread.start()
191 # Wait until the 'MyListeningThread' terminates.
192 my_thread.join()
194 self.assertTrue(event, "My listening thread successfully received an event")
196 @expectedFailureAll(
197 oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases"
199 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
200 @expectedFailureNetBSD
201 def test_add_listener_to_broadcaster(self):
202 """Exercise some SBBroadcaster APIs."""
203 self.build()
204 exe = self.getBuildArtifact("a.out")
206 self.dbg.SetAsync(True)
208 # Create a target by the debugger.
209 target = self.dbg.CreateTarget(exe)
210 self.assertTrue(target, VALID_TARGET)
212 # Now create a breakpoint on main.c by name 'c'.
213 breakpoint = target.BreakpointCreateByName("c", "a.out")
214 self.trace("breakpoint:", breakpoint)
215 self.assertTrue(
216 breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT
219 listener = lldb.SBListener("my listener")
221 # Now launch the process, and do not stop at the entry point.
222 error = lldb.SBError()
223 flags = target.GetLaunchInfo().GetLaunchFlags()
224 process = target.Launch(
225 listener,
226 None, # argv
227 None, # envp
228 None, # stdin_path
229 None, # stdout_path
230 None, # stderr_path
231 None, # working directory
232 flags, # launch flags
233 False, # Stop at entry
234 error,
235 ) # error
237 # Create an empty event object.
238 event = lldb.SBEvent()
239 self.assertFalse(event, "Event should not be valid initially")
241 # The finite state machine for our custom listening thread, with an
242 # initial state of None, which means no event has been received.
243 # It changes to 'connected' after 'connected' event is received (for remote platforms)
244 # It changes to 'running' after 'running' event is received (should happen only if the
245 # currentstate is either 'None' or 'connected')
246 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
247 # current state is 'running'.)
248 self.state = None
250 # Create MyListeningThread to wait for state changed events.
251 # By design, a "running" event is expected following by a "stopped"
252 # event.
253 import threading
255 class MyListeningThread(threading.Thread):
256 def run(self):
257 self.context.trace("Running MyListeningThread:", self)
259 # Regular expression pattern for the event description.
260 pattern = re.compile("data = {.*, state = (.*)}$")
262 # Let's only try at most 6 times to retrieve our events.
263 count = 0
264 while True:
265 if listener.WaitForEvent(5, event):
266 desc = lldbutil.get_description(event)
267 self.context.trace("Event description:", desc)
268 match = pattern.search(desc)
269 if not match:
270 break
271 if match.group(1) == "connected":
272 # When debugging remote targets with lldb-server, we
273 # first get the 'connected' event.
274 self.context.assertTrue(self.context.state is None)
275 self.context.state = "connected"
276 continue
277 elif match.group(1) == "running":
278 self.context.assertTrue(
279 self.context.state is None
280 or self.context.state == "connected"
282 self.context.state = "running"
283 continue
284 elif match.group(1) == "stopped":
285 self.context.assertTrue(self.context.state == "running")
286 # Whoopee, both events have been received!
287 self.context.state = "stopped"
288 break
289 else:
290 break
291 print("Timeout: listener.WaitForEvent")
292 count = count + 1
293 if count > 6:
294 break
295 listener.Clear()
296 return
298 # Use Python API to continue the process. The listening thread should be
299 # able to receive the state changed events.
300 process.Continue()
302 # Start the listening thread to receive the "running" followed by the
303 # "stopped" events.
304 my_thread = MyListeningThread()
305 # Supply the enclosing context so that our listening thread can access
306 # the 'state' variable.
307 my_thread.context = self
308 my_thread.start()
310 # Wait until the 'MyListeningThread' terminates.
311 my_thread.join()
313 # The final judgement. :-)
314 self.assertEqual(
315 self.state, "stopped", "Both expected state changed events received"
318 def wait_for_next_event(self, expected_state, test_shadow=False):
319 """Wait for an event from self.primary & self.shadow listener.
320 If test_shadow is true, we also check that the shadow listener only
321 receives events AFTER the primary listener does."""
322 import stop_hook
323 # Waiting on the shadow listener shouldn't have events yet because
324 # we haven't fetched them for the primary listener yet:
325 event = lldb.SBEvent()
327 if test_shadow:
328 success = self.shadow_listener.WaitForEvent(1, event)
329 self.assertFalse(success, "Shadow listener doesn't pull events")
331 # But there should be an event for the primary listener:
332 success = self.primary_listener.WaitForEvent(5, event)
334 self.assertTrue(success, "Primary listener got the event")
336 state = lldb.SBProcess.GetStateFromEvent(event)
337 primary_event_type = event.GetType()
338 restart = False
339 if state == lldb.eStateStopped:
340 restart = lldb.SBProcess.GetRestartedFromEvent(event)
341 # This counter is matching the stop hooks, which don't get run
342 # for auto-restarting stops.
343 if not restart:
344 self.stop_counter += 1
345 self.assertEqual(
346 stop_hook.StopHook.counter[self.instance],
347 self.stop_counter,
348 "matching stop hook",
351 if expected_state is not None:
352 self.assertEqual(
353 state, expected_state, "Primary thread got the correct event"
356 # And after pulling that one there should be an equivalent event for the shadow
357 # listener:
358 success = self.shadow_listener.WaitForEvent(5, event)
359 self.assertTrue(success, "Shadow listener got event too")
360 shadow_event_type = event.GetType()
361 self.assertEqual(
362 primary_event_type, shadow_event_type, "It was the same event type"
364 self.assertEqual(
365 state, lldb.SBProcess.GetStateFromEvent(event), "It was the same state"
367 self.assertEqual(
368 restart,
369 lldb.SBProcess.GetRestartedFromEvent(event),
370 "It was the same restarted",
372 return state, restart
374 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases
375 @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373
376 @skipIfNetBSD
377 def test_shadow_listener(self):
378 self.build()
379 exe = self.getBuildArtifact("a.out")
381 # Create a target by the debugger.
382 target = self.dbg.CreateTarget(exe)
383 self.assertTrue(target, VALID_TARGET)
385 # Now create a breakpoint on main.c by name 'c'.
386 bkpt1 = target.BreakpointCreateByName("c", "a.out")
387 self.trace("breakpoint:", bkpt1)
388 self.assertEqual(bkpt1.GetNumLocations(), 1, VALID_BREAKPOINT)
390 self.primary_listener = lldb.SBListener("my listener")
391 self.shadow_listener = lldb.SBListener("shadow listener")
393 self.cur_thread = None
395 error = lldb.SBError()
396 launch_info = target.GetLaunchInfo()
397 launch_info.SetListener(self.primary_listener)
398 launch_info.SetShadowListener(self.shadow_listener)
400 self.runCmd(
401 "settings set target.process.extra-startup-command QSetLogging:bitmask=LOG_PROCESS|LOG_EXCEPTIONS|LOG_RNB_PACKETS|LOG_STEP;"
403 self.dbg.SetAsync(True)
405 # Now make our stop hook - we want to ensure it stays up to date with
406 # the events. We can't get our hands on the stop-hook instance directly,
407 # so we'll pass in an instance key, and use that to retrieve the data from
408 # this instance of the stop hook:
409 self.instance = f"Key{random.randint(0,10000)}"
410 stop_hook_path = os.path.join(self.getSourceDir(), "stop_hook.py")
411 self.runCmd(f"command script import {stop_hook_path}")
412 import stop_hook
414 self.runCmd(
415 f"target stop-hook add -P stop_hook.StopHook -k instance -v {self.instance}"
417 self.stop_counter = 0
419 self.process = target.Launch(launch_info, error)
420 self.assertSuccess(error, "Process launched successfully")
422 # Keep fetching events from the primary to trigger the do on removal and
423 # then from the shadow listener, and make sure they match:
425 # Events in the launch sequence might be platform dependent, so don't
426 # expect any particular event till we get the stopped:
427 state = lldb.eStateInvalid
429 while state != lldb.eStateStopped:
430 state, restart = self.wait_for_next_event(None, False)
432 # Okay, we're now at a good stop, so try a next:
433 self.cur_thread = self.process.threads[0]
435 # Make sure we're at our expected breakpoint:
436 self.assertTrue(self.cur_thread.IsValid(), "Got a zeroth thread")
437 self.assertEqual(self.cur_thread.stop_reason, lldb.eStopReasonBreakpoint)
438 self.assertEqual(
439 self.cur_thread.GetStopReasonDataCount(), 2, "Only one breakpoint/loc here"
441 self.assertEqual(
442 bkpt1.GetID(),
443 self.cur_thread.GetStopReasonDataAtIndex(0),
444 "Hit the right breakpoint",
447 self.cur_thread.StepOver()
448 # We'll run the test for "shadow listener blocked by primary listener
449 # for the first couple rounds, then we'll skip the 1 second pause...
450 self.wait_for_next_event(lldb.eStateRunning, True)
451 self.wait_for_next_event(lldb.eStateStopped, True)
453 # Next try an auto-continue breakpoint and make sure the shadow listener got
454 # the resumed info as well. Note that I'm not explicitly counting
455 # running events here. At the point when I wrote this lldb sometimes
456 # emits two running events in a row. Apparently the code to coalesce running
457 # events isn't working. But that's not what this test is testing, we're really
458 # testing that the primary & shadow listeners hear the same thing and in the
459 # right order.
461 main_spec = lldb.SBFileSpec("main.c")
462 bkpt2 = target.BreakpointCreateBySourceRegex("b.2. returns %d", main_spec)
463 self.assertGreater(bkpt2.GetNumLocations(), 0, "BP2 worked")
464 bkpt2.SetAutoContinue(True)
466 bkpt3 = target.BreakpointCreateBySourceRegex("a.3. returns %d", main_spec)
467 self.assertGreater(bkpt3.GetNumLocations(), 0, "BP3 worked")
469 state = lldb.eStateStopped
470 restarted = False
472 # Put in a counter to make sure we don't spin forever if there is some
473 # error in the logic.
474 counter = 0
475 while state != lldb.eStateExited:
476 counter += 1
477 self.assertLess(
478 counter, 50, "Took more than 50 events to hit two breakpoints."
480 if state == lldb.eStateStopped and not restarted:
481 self.process.Continue()
483 state, restarted = self.wait_for_next_event(None, False)
485 # Now make sure we agree with the stop hook counter:
486 self.assertEqual(self.stop_counter, stop_hook.StopHook.counter[self.instance])
487 self.assertEqual(stop_hook.StopHook.non_stops[self.instance], 0, "No non stops")