3 from lldbsuite
.test
.decorators
import *
4 from lldbsuite
.test
.lldbtest
import *
6 from fork_testbase
import GdbRemoteForkTestBase
9 class TestGdbRemoteFork(GdbRemoteForkTestBase
):
11 GdbRemoteForkTestBase
.setUp(self
)
12 if self
.getPlatform() == "linux" and self
.getArchitecture() in [
16 self
.skipTest("Unsupported for Arm/AArch64 Linux")
18 @add_test_categories(["fork"])
19 def test_fork_multithreaded(self
):
20 _
, _
, child_pid
, _
= self
.start_fork_test(["thread:new"] * 2 + ["fork"])
22 # detach the forked child
23 self
.test_sequence
.add_log_lines(
25 "read packet: $D;{}#00".format(child_pid
),
26 "send packet: $OK#00",
31 self
.expect_gdbremote_sequence()
33 @add_test_categories(["fork"])
35 parent_pid
, _
= self
.fork_and_detach_test("fork")
38 self
.test_sequence
.add_log_lines(
41 "send packet: $W00;process:{}#00".format(parent_pid
),
45 self
.expect_gdbremote_sequence()
47 @add_test_categories(["fork"])
49 parent_pid
, parent_tid
= self
.fork_and_detach_test("vfork")
52 self
.test_sequence
.add_log_lines(
57 "regex": r
"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format(
58 parent_pid
, parent_tid
62 "send packet: $W00;process:{}#00".format(parent_pid
),
66 self
.expect_gdbremote_sequence()
68 @add_test_categories(["fork"])
69 def test_fork_follow(self
):
70 self
.fork_and_follow_test("fork")
72 @add_test_categories(["fork"])
73 def test_vfork_follow(self
):
74 self
.fork_and_follow_test("vfork")
76 @add_test_categories(["fork"])
77 def test_select_wrong_pid(self
):
79 self
.prep_debug_monitor_and_inferior()
80 self
.add_qSupported_packets(["multiprocess+"])
81 ret
= self
.expect_gdbremote_sequence()
82 self
.assertIn("multiprocess+", ret
["qSupported_response"])
83 self
.reset_test_sequence()
86 self
.test_sequence
.add_log_lines(
88 "read packet: $qC#00",
91 "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*",
92 "capture": {1: "pid", 2: "tid"},
97 ret
= self
.expect_gdbremote_sequence()
98 pid
, tid
= (int(ret
[x
], 16) for x
in ("pid", "tid"))
99 self
.reset_test_sequence()
101 self
.test_sequence
.add_log_lines(
103 # try switching to correct pid
104 "read packet: $Hgp{:x}.{:x}#00".format(pid
, tid
),
105 "send packet: $OK#00",
106 "read packet: $Hcp{:x}.{:x}#00".format(pid
, tid
),
107 "send packet: $OK#00",
108 # try switching to invalid tid
109 "read packet: $Hgp{:x}.{:x}#00".format(pid
, tid
+ 1),
110 "send packet: $E15#00",
111 "read packet: $Hcp{:x}.{:x}#00".format(pid
, tid
+ 1),
112 "send packet: $E15#00",
113 # try switching to invalid pid
114 "read packet: $Hgp{:x}.{:x}#00".format(pid
+ 1, tid
),
115 "send packet: $Eff#00",
116 "read packet: $Hcp{:x}.{:x}#00".format(pid
+ 1, tid
),
117 "send packet: $Eff#00",
121 self
.expect_gdbremote_sequence()
123 @add_test_categories(["fork"])
124 def test_detach_current(self
):
126 self
.prep_debug_monitor_and_inferior()
127 self
.add_qSupported_packets(["multiprocess+"])
128 ret
= self
.expect_gdbremote_sequence()
129 self
.assertIn("multiprocess+", ret
["qSupported_response"])
130 self
.reset_test_sequence()
133 self
.test_sequence
.add_log_lines(
135 "read packet: $qC#00",
138 "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*",
139 "capture": {1: "pid"},
144 ret
= self
.expect_gdbremote_sequence()
146 self
.reset_test_sequence()
149 self
.test_sequence
.add_log_lines(
151 "read packet: $D;{}#00".format(pid
),
152 "send packet: $OK#00",
153 "read packet: $qC#00",
154 "send packet: $E44#00",
158 self
.expect_gdbremote_sequence()
160 @add_test_categories(["fork"])
161 def test_detach_all(self
):
162 self
.detach_all_test()
164 @add_test_categories(["fork"])
165 def test_kill_all(self
):
166 parent_pid
, _
, child_pid
, _
= self
.start_fork_test(["fork"])
168 exit_regex
= "[$]X09;process:([0-9a-f]+)#.*"
169 self
.test_sequence
.add_log_lines(
172 "read packet: $k#00",
173 {"direction": "send", "regex": exit_regex
, "capture": {1: "pid1"}},
174 {"direction": "send", "regex": exit_regex
, "capture": {1: "pid2"}},
178 ret
= self
.expect_gdbremote_sequence()
179 self
.assertEqual(set([ret
["pid1"], ret
["pid2"]]), set([parent_pid
, child_pid
]))
181 @add_test_categories(["fork"])
182 def test_vkill_child(self
):
183 self
.vkill_test(kill_child
=True)
185 @add_test_categories(["fork"])
186 def test_vkill_parent(self
):
187 self
.vkill_test(kill_parent
=True)
189 @add_test_categories(["fork"])
190 def test_vkill_both(self
):
191 self
.vkill_test(kill_parent
=True, kill_child
=True)
193 @add_test_categories(["fork"])
194 def test_c_parent(self
):
195 self
.resume_one_test(run_order
=["parent", "parent"])
197 @add_test_categories(["fork"])
198 def test_c_child(self
):
199 self
.resume_one_test(run_order
=["child", "child"])
201 @add_test_categories(["fork"])
202 def test_c_parent_then_child(self
):
203 self
.resume_one_test(run_order
=["parent", "parent", "child", "child"])
205 @add_test_categories(["fork"])
206 def test_c_child_then_parent(self
):
207 self
.resume_one_test(run_order
=["child", "child", "parent", "parent"])
209 @add_test_categories(["fork"])
210 def test_c_interspersed(self
):
211 self
.resume_one_test(run_order
=["parent", "child", "parent", "child"])
213 @add_test_categories(["fork"])
214 def test_vCont_parent(self
):
215 self
.resume_one_test(run_order
=["parent", "parent"], use_vCont
=True)
217 @add_test_categories(["fork"])
218 def test_vCont_child(self
):
219 self
.resume_one_test(run_order
=["child", "child"], use_vCont
=True)
221 @add_test_categories(["fork"])
222 def test_vCont_parent_then_child(self
):
223 self
.resume_one_test(
224 run_order
=["parent", "parent", "child", "child"], use_vCont
=True
227 @add_test_categories(["fork"])
228 def test_vCont_child_then_parent(self
):
229 self
.resume_one_test(
230 run_order
=["child", "child", "parent", "parent"], use_vCont
=True
233 @add_test_categories(["fork"])
234 def test_vCont_interspersed(self
):
235 self
.resume_one_test(
236 run_order
=["parent", "child", "parent", "child"], use_vCont
=True
239 @add_test_categories(["fork"])
240 def test_vCont_two_processes(self
):
241 parent_pid
, parent_tid
, child_pid
, child_tid
= self
.start_fork_test(
245 self
.test_sequence
.add_log_lines(
247 # try to resume both processes
248 "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
249 parent_pid
, parent_tid
, child_pid
, child_tid
251 "send packet: $E03#00",
255 self
.expect_gdbremote_sequence()
257 @add_test_categories(["fork"])
258 def test_vCont_all_processes_explicit(self
):
259 self
.start_fork_test(["fork", "stop"])
261 self
.test_sequence
.add_log_lines(
263 # try to resume all processes implicitly
264 "read packet: $vCont;c:p-1.-1#00",
265 "send packet: $E03#00",
269 self
.expect_gdbremote_sequence()
271 @add_test_categories(["fork"])
272 def test_vCont_all_processes_implicit(self
):
273 self
.start_fork_test(["fork", "stop"])
275 self
.test_sequence
.add_log_lines(
277 # try to resume all processes implicitly
278 "read packet: $vCont;c#00",
279 "send packet: $E03#00",
283 self
.expect_gdbremote_sequence()
285 @add_test_categories(["fork"])
286 def test_threadinfo(self
):
287 parent_pid
, parent_tid
, child_pid
, child_tid
= self
.start_fork_test(
288 ["fork", "thread:new", "stop"]
291 (parent_pid
, parent_tid
),
292 (child_pid
, child_tid
),
295 self
.add_threadinfo_collection_packets()
296 ret
= self
.expect_gdbremote_sequence()
297 prev_pidtids
= set(self
.parse_threadinfo_packets(ret
))
300 frozenset((int(pid
, 16), int(tid
, 16)) for pid
, tid
in pidtids
),
302 self
.reset_test_sequence()
304 for pidtid
in pidtids
:
305 self
.test_sequence
.add_log_lines(
307 "read packet: $Hcp{}.{}#00".format(*pidtid
),
308 "send packet: $OK#00",
309 "read packet: $c#00",
312 "regex": self
.stop_regex
.format(*pidtid
),
317 self
.add_threadinfo_collection_packets()
318 ret
= self
.expect_gdbremote_sequence()
319 self
.reset_test_sequence()
320 new_pidtids
= set(self
.parse_threadinfo_packets(ret
))
321 added_pidtid
= new_pidtids
- prev_pidtids
322 prev_pidtids
= new_pidtids
324 # verify that we've got exactly one new thread, and that
326 self
.assertEqual(len(added_pidtid
), 1)
327 self
.assertEqual(added_pidtid
.pop()[0], int(pidtid
[0], 16))
329 for pidtid
in new_pidtids
:
330 self
.test_sequence
.add_log_lines(
332 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid
),
333 "send packet: $OK#00",
337 self
.expect_gdbremote_sequence()
339 @add_test_categories(["fork"])
340 def test_memory_read_write(self
):
342 INITIAL_DATA
= "Initial message"
343 self
.prep_debug_monitor_and_inferior(
345 "set-message:{}".format(INITIAL_DATA
),
346 "get-data-address-hex:g_message",
352 self
.add_qSupported_packets(["multiprocess+", "fork-events+"])
353 ret
= self
.expect_gdbremote_sequence()
354 self
.assertIn("fork-events+", ret
["qSupported_response"])
355 self
.reset_test_sequence()
357 # continue and expect fork
358 self
.test_sequence
.add_log_lines(
360 "read packet: $c#00",
362 "type": "output_match",
363 "regex": self
.maybe_strict_output_regex(
364 r
"data address: 0x([0-9a-fA-F]+)\r\n"
366 "capture": {1: "addr"},
370 "regex": self
.fork_regex
.format("fork"),
371 "capture": self
.fork_capture
,
376 ret
= self
.expect_gdbremote_sequence()
378 "parent": (ret
["parent_pid"], ret
["parent_tid"]),
379 "child": (ret
["child_pid"], ret
["child_tid"]),
382 self
.reset_test_sequence()
384 for name
, pidtid
in pidtids
.items():
385 self
.test_sequence
.add_log_lines(
387 "read packet: $Hgp{}.{}#00".format(*pidtid
),
388 "send packet: $OK#00",
389 # read the current memory contents
390 "read packet: $m{},{:x}#00".format(addr
, len(INITIAL_DATA
) + 1),
393 "regex": r
"^[$](.+)#.*$",
394 "capture": {1: "data"},
397 "read packet: $M{},{:x}:{}#00".format(
398 addr
, len(name
) + 1, seven
.hexlify(name
+ "\0")
400 "send packet: $OK#00",
401 # resume the process and wait for the trap
402 "read packet: $Hcp{}.{}#00".format(*pidtid
),
403 "send packet: $OK#00",
404 "read packet: $c#00",
406 "type": "output_match",
407 "regex": self
.maybe_strict_output_regex(r
"message: (.*)\r\n"),
408 "capture": {1: "printed_message"},
412 "regex": self
.stop_regex
.format(*pidtid
),
417 ret
= self
.expect_gdbremote_sequence()
418 data
= seven
.unhexlify(ret
["data"])
419 self
.assertEqual(data
, INITIAL_DATA
+ "\0")
420 self
.assertEqual(ret
["printed_message"], name
)
421 self
.reset_test_sequence()
423 # we do the second round separately to make sure that initial data
424 # is correctly preserved while writing into the first process
426 for name
, pidtid
in pidtids
.items():
427 self
.test_sequence
.add_log_lines(
429 "read packet: $Hgp{}.{}#00".format(*pidtid
),
430 "send packet: $OK#00",
431 # read the current memory contents
432 "read packet: $m{},{:x}#00".format(addr
, len(name
) + 1),
435 "regex": r
"^[$](.+)#.*$",
436 "capture": {1: "data"},
441 ret
= self
.expect_gdbremote_sequence()
442 self
.assertIsNotNone(ret
.get("data"))
443 data
= seven
.unhexlify(ret
.get("data"))
444 self
.assertEqual(data
, name
+ "\0")
445 self
.reset_test_sequence()
447 @add_test_categories(["fork"])
448 def test_register_read_write(self
):
449 parent_pid
, parent_tid
, child_pid
, child_tid
= self
.start_fork_test(
450 ["fork", "thread:new", "stop"]
453 (parent_pid
, parent_tid
),
454 (child_pid
, child_tid
),
457 for pidtid
in pidtids
:
458 self
.test_sequence
.add_log_lines(
460 "read packet: $Hcp{}.{}#00".format(*pidtid
),
461 "send packet: $OK#00",
462 "read packet: $c#00",
465 "regex": self
.stop_regex
.format(*pidtid
),
471 self
.add_threadinfo_collection_packets()
472 ret
= self
.expect_gdbremote_sequence()
473 self
.reset_test_sequence()
475 pidtids
= set(self
.parse_threadinfo_packets(ret
))
476 self
.assertEqual(len(pidtids
), 4)
477 # first, save register values from all the threads
479 for pidtid
in pidtids
:
480 for regno
in range(256):
481 self
.test_sequence
.add_log_lines(
483 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid
),
484 "send packet: $OK#00",
485 "read packet: $p{:x}#00".format(regno
),
488 "regex": r
"^[$](.+)#.*$",
489 "capture": {1: "data"},
494 ret
= self
.expect_gdbremote_sequence()
495 data
= ret
.get("data")
496 self
.assertIsNotNone(data
)
497 # ignore registers shorter than 32 bits (this also catches
502 self
.skipTest("no usable register found")
503 thread_regs
[pidtid
] = (regno
, data
)
505 vals
= set(x
[1] for x
in thread_regs
.values())
506 # NB: cheap hack to make the loop below easier
507 new_val
= next(iter(vals
))
509 # then, start altering them and verify that we don't unexpectedly
510 # change the value from another thread
511 for pidtid
in pidtids
:
512 old_val
= thread_regs
[pidtid
]
514 old_val_length
= len(old_val
[1])
515 # generate a unique new_val
516 while new_val
in vals
:
517 new_val
= "{{:0{}x}}".format(old_val_length
).format(
518 random
.getrandbits(old_val_length
* 4)
522 self
.test_sequence
.add_log_lines(
524 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid
),
525 "send packet: $OK#00",
526 "read packet: $p{:x}#00".format(regno
),
529 "regex": r
"^[$](.+)#.*$",
530 "capture": {1: "data"},
532 "read packet: $P{:x}={}#00".format(regno
, new_val
),
533 "send packet: $OK#00",
537 ret
= self
.expect_gdbremote_sequence()
538 data
= ret
.get("data")
539 self
.assertIsNotNone(data
)
540 self
.assertEqual(data
, old_val
[1])
541 thread_regs
[pidtid
] = (regno
, new_val
)
543 # finally, verify that new values took effect
544 for pidtid
in pidtids
:
545 old_val
= thread_regs
[pidtid
]
546 self
.test_sequence
.add_log_lines(
548 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid
),
549 "send packet: $OK#00",
550 "read packet: $p{:x}#00".format(old_val
[0]),
553 "regex": r
"^[$](.+)#.*$",
554 "capture": {1: "data"},
559 ret
= self
.expect_gdbremote_sequence()
560 data
= ret
.get("data")
561 self
.assertIsNotNone(data
)
562 self
.assertEqual(data
, old_val
[1])
564 @add_test_categories(["fork"])
566 parent_pid
, parent_tid
, child_pid
, child_tid
= self
.start_fork_test(
567 ["fork", "thread:new", "stop"]
570 (parent_pid
, parent_tid
),
571 (child_pid
, child_tid
),
574 for pidtid
in pidtids
:
575 self
.test_sequence
.add_log_lines(
577 "read packet: $Hcp{}.{}#00".format(*pidtid
),
578 "send packet: $OK#00",
579 "read packet: $c#00",
582 "regex": self
.stop_regex
.format(*pidtid
),
588 self
.add_threadinfo_collection_packets()
589 ret
= self
.expect_gdbremote_sequence()
590 self
.reset_test_sequence()
592 pidtids
= set(self
.parse_threadinfo_packets(ret
))
593 self
.assertEqual(len(pidtids
), 4)
594 for pidtid
in pidtids
:
595 self
.test_sequence
.add_log_lines(
597 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid
),
598 "send packet: $OK#00",
599 "read packet: $qC#00",
600 "send packet: $QCp{:x}.{:x}#00".format(*pidtid
),
604 self
.expect_gdbremote_sequence()
606 @add_test_categories(["fork"])
608 parent_pid
, parent_tid
, child_pid
, child_tid
= self
.start_fork_test(
609 ["fork", "thread:new", "stop"]
612 (parent_pid
, parent_tid
),
613 (child_pid
, child_tid
),
616 for pidtid
in pidtids
:
617 self
.test_sequence
.add_log_lines(
619 "read packet: $Hcp{}.{}#00".format(*pidtid
),
620 "send packet: $OK#00",
621 "read packet: $c#00",
624 "regex": self
.stop_regex
.format(*pidtid
),
630 self
.add_threadinfo_collection_packets()
631 ret
= self
.expect_gdbremote_sequence()
632 self
.reset_test_sequence()
634 pidtids
= set(self
.parse_threadinfo_packets(ret
))
635 self
.assertEqual(len(pidtids
), 4)
636 max_pid
= max(pid
for pid
, tid
in pidtids
)
637 max_tid
= max(tid
for pid
, tid
in pidtids
)
639 (max_pid
, max_tid
+ 1, "E02"),
640 (max_pid
+ 1, max_tid
, "E01"),
641 (max_pid
+ 1, max_tid
+ 1, "E01"),
644 for pidtid
in pidtids
:
645 self
.test_sequence
.add_log_lines(
647 # test explicit PID+TID
648 "read packet: $Tp{:x}.{:x}#00".format(*pidtid
),
649 "send packet: $OK#00",
650 # test implicit PID via Hg
651 "read packet: $Hgp{:x}.{:x}#00".format(*pidtid
),
652 "send packet: $OK#00",
653 "read packet: $T{:x}#00".format(max_tid
+ 1),
654 "send packet: $E02#00",
655 "read packet: $T{:x}#00".format(pidtid
[1]),
656 "send packet: $OK#00",
660 for pid
, tid
, expected
in bad_pidtids
:
661 self
.test_sequence
.add_log_lines(
663 "read packet: $Tp{:x}.{:x}#00".format(pid
, tid
),
664 "send packet: ${}#00".format(expected
),
668 self
.expect_gdbremote_sequence()