4 import gdbremote_testcase
5 from lldbsuite
.test
.decorators
import *
6 from lldbsuite
.test
.lldbtest
import *
7 from lldbsuite
.test
import lldbutil
class TestGdbRemoteThreadsInStopReply(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Exercise the ``threads`` / ``thread-pcs`` fields of gdb-remote stop replies.

    The tests send ``QListThreadsInStopReply`` (or deliberately do not) and
    then compare the thread ids and pcs found in the stop reply against the
    data returned by the thread-info queries and by ``jThreadsInfo``.
    """

    # Packet exchange that asks the stub to list threads in its stop replies.
    ENABLE_THREADS_IN_STOP_REPLY_ENTRIES = [
        "read packet: $QListThreadsInStopReply#21",
        "send packet: $OK#00",
    ]

    def gather_stop_reply_fields(self, thread_count, field_names):
        """Launch the inferior with threads and pick fields out of its stop reply.

        Args:
            thread_count: forwarded to ``launch_with_threads``.
            field_names: stop-reply keys to copy into the result.

        Returns:
            A dict holding ``"pc_register"`` and ``"little_endian"`` (taken
            from ``parse_hw_info``) plus one entry per name in ``field_names``.
            A field that is absent from the stop reply maps to ``None``.
        """
        context, _ = self.launch_with_threads(thread_count)
        key_vals_text = context.get("stop_reply_kv")
        self.assertIsNotNone(key_vals_text)

        # Collect register and process info in a fresh packet sequence so the
        # pc register index and the endianness can be reported alongside the
        # stop-reply fields.
        self.reset_test_sequence()
        self.add_register_info_collection_packets()
        self.add_process_info_collection_packets()

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        hw_info = self.parse_hw_info(context)

        # Parse the stop reply contents.
        kv_dict = self.parse_key_val_dict(key_vals_text)

        result = {
            "pc_register": hw_info["pc_register"],
            "little_endian": hw_info["little_endian"],
        }
        for key_field in field_names:
            result[key_field] = kv_dict.get(key_field)
        return result

    def gather_stop_reply_threads(self, thread_count):
        """Return the thread ids listed in the stop reply as a list of ints.

        The ``threads`` field is split on commas and each piece is parsed as
        hexadecimal.  An empty list is returned when the field is missing or
        empty, so callers can always take ``len()`` of the result.
        """
        # Pull out threads from stop response.
        stop_reply_threads_text = self.gather_stop_reply_fields(
            thread_count, ["threads"]
        )["threads"]
        if not stop_reply_threads_text:
            return []
        return [
            int(thread_id, 16) for thread_id in stop_reply_threads_text.split(",")
        ]

    def gather_stop_reply_pcs(self, thread_count):
        """Return the per-thread pcs reported in the stop reply.

        Returns:
            A dict with three keys:
            ``"thread_pcs"`` maps each thread id (int, parsed as hex from the
            ``threads`` field) to the matching entry of the ``thread-pcs``
            field, kept as the original text;
            ``"pc_register"`` and ``"little_endian"`` are passed through from
            ``gather_stop_reply_fields``.
        """
        results = self.gather_stop_reply_fields(
            thread_count, ["threads", "thread-pcs"]
        )

        threads_text = results["threads"]
        pcs_text = results["thread-pcs"]
        # Either field may be None when the stop reply did not carry it; fail
        # with a clear assertion rather than an AttributeError on ``.split``.
        self.assertIsNotNone(threads_text)
        self.assertIsNotNone(pcs_text)

        thread_ids = threads_text.split(",")
        pcs = pcs_text.split(",")
        self.assertEqual(len(thread_ids), len(pcs))

        # The two lists are positional: the i-th pc belongs to the i-th thread.
        thread_pcs = {
            int(thread_id, 16): pc for thread_id, pc in zip(thread_ids, pcs)
        }

        return {
            "thread_pcs": thread_pcs,
            "pc_register": results["pc_register"],
            "little_endian": results["little_endian"],
        }

    def switch_endian(self, egg):
        """Reverse the order of the two-character groups in ``egg``.

        The string is cut into consecutive pairs of characters and the pairs
        are joined back in reverse order.  A trailing unpaired character is
        not matched by the ``..`` pattern and is therefore dropped.
        """
        return "".join(reversed(re.findall("..", egg)))

    def parse_hw_info(self, context):
        """Extract the pc register index and the endianness from ``context``.

        Args:
            context: result of ``expect_gdbremote_sequence`` after the process
                info and register info collection packets were queued.

        Returns:
            A dict with ``"pc_register"`` (the first element returned by
            ``find_pc_reg_info``) and ``"little_endian"`` (True when the
            process info ``endian`` value equals ``"little"``).
        """
        self.assertIsNotNone(context)
        process_info = self.parse_process_info_response(context)
        endian = process_info.get("endian")
        reg_info = self.parse_register_info_packets(context)
        pc_lldb_reg_index, _ = self.find_pc_reg_info(reg_info)

        return {
            "pc_register": pc_lldb_reg_index,
            "little_endian": endian == "little",
        }

    def gather_threads_info_pcs(self, pc_register, little_endian):
        """Return the pc of every thread as reported by ``jThreadsInfo``.

        Args:
            pc_register: register key of the pc; converted with ``str`` before
                it is looked up in each thread's ``"registers"`` mapping.
            little_endian: when true, each pc string is passed through
                ``switch_endian`` before being stored.

        Returns:
            A dict mapping each entry's ``"tid"`` to its pc string.
        """
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $jThreadsInfo#c1",
                {
                    "direction": "send",
                    "regex": r"^\$(.*)#[0-9a-fA-F]{2}$",
                    "capture": {1: "threads_info"},
                },
            ],
            True,
        )

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        threads_info = context.get("threads_info")
        register = str(pc_register)
        # The jThreadsInfo response is not valid JSON data, so we have to
        # clean it up first.
        jthreads_info = json.loads(re.sub(r"}]", "}", threads_info))

        thread_pcs = {}
        for thread_info in jthreads_info:
            tid = thread_info["tid"]
            pc = thread_info["registers"][register]
            thread_pcs[tid] = self.switch_endian(pc) if little_endian else pc
        return thread_pcs

    def test_QListThreadsInStopReply_supported(self):
        """The stub answers ``QListThreadsInStopReply`` with ``OK``."""
        self.build()
        self.set_inferior_startup_launch()
        self.prep_debug_monitor_and_inferior()
        self.test_sequence.add_log_lines(
            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True
        )

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

    @expectedFailureAll(oslist=["windows"])  # Extra threads present
    def test_stop_reply_reports_multiple_threads(self):
        """With the feature enabled, the stop reply lists all five threads."""
        self.build()
        self.set_inferior_startup_launch()
        # Gather threads from stop notification when QThreadsInStopReply is
        # enabled.
        self.test_sequence.add_log_lines(
            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True
        )
        stop_reply_threads = self.gather_stop_reply_threads(5)
        self.assertEqual(len(stop_reply_threads), 5)

    def test_no_QListThreadsInStopReply_supplies_no_threads(self):
        """Without the enable packet, the stop reply lists no threads."""
        self.build()
        self.set_inferior_startup_launch()
        # Gather threads from stop notification when QThreadsInStopReply is not
        # enabled.
        stop_reply_threads = self.gather_stop_reply_threads(5)
        self.assertEqual(len(stop_reply_threads), 0)

    def test_stop_reply_reports_correct_threads(self):
        """Every thread from the thread-info query appears in the stop reply."""
        self.build()
        self.set_inferior_startup_launch()
        # Gather threads from stop notification when QThreadsInStopReply is
        # enabled.
        thread_count = 5
        self.test_sequence.add_log_lines(
            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True
        )
        stop_reply_threads = self.gather_stop_reply_threads(thread_count)

        # Gather threads from q{f,s}ThreadInfo.
        self.reset_test_sequence()
        self.add_threadinfo_collection_packets()

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)
        self.assertGreaterEqual(len(threads), thread_count)

        # Ensure each thread in q{f,s}ThreadInfo appears in stop reply threads
        for tid in threads:
            self.assertIn(tid, stop_reply_threads)

    @skipIfWindows  # Flaky on Windows
    def test_stop_reply_contains_thread_pcs(self):
        """The pcs in the stop reply agree with those from ``jThreadsInfo``."""
        self.build()
        self.set_inferior_startup_launch()
        thread_count = 5
        self.test_sequence.add_log_lines(
            self.ENABLE_THREADS_IN_STOP_REPLY_ENTRIES, True
        )
        results = self.gather_stop_reply_pcs(thread_count)
        stop_reply_pcs = results["thread_pcs"]
        pc_register = results["pc_register"]
        little_endian = results["little_endian"]
        self.assertGreaterEqual(len(stop_reply_pcs), thread_count)

        threads_info_pcs = self.gather_threads_info_pcs(pc_register, little_endian)

        self.assertEqual(len(threads_info_pcs), len(stop_reply_pcs))
        for thread_id in stop_reply_pcs:
            self.assertIn(thread_id, threads_info_pcs)
            # Compare numerically: both values are hex text.
            self.assertEqual(
                int(stop_reply_pcs[thread_id], 16),
                int(threads_info_pcs[thread_id], 16),
            )