"""Base class for gdb-remote test cases."""
import errno
import logging
import os
import os.path
import random
import re
import select
import socket
import subprocess
import sys
import time
from functools import wraps

from lldbsuite.test import configuration
from lldbsuite.test.lldbtest import *
from lldbsuite.support import seven
from lldbgdbserverutils import *
class _ConnectionRefused(IOError):
    """Raised when the connection to the debug monitor is refused.

    Also raised when an apparently-successful connect is detected as having
    been dropped immediately (the ADB port-forwarding case); see
    _verify_socket.
    """

    pass
class GdbRemoteTestCaseFactory(type):
    """Metaclass that fans each test method out to one variant per debug server.

    Every attribute whose name starts with "test" is replaced by one method
    per applicable category ("debugserver", "llgs"), each tagged with that
    category and carrying a `debug_server` attribute so setUp can pick the
    right stub.
    """

    def __new__(cls, name, bases, attrs):
        newattrs = {}
        for attrname, attrvalue in attrs.items():
            # Non-test attributes pass through unchanged.
            if not attrname.startswith("test"):
                newattrs[attrname] = attrvalue
                continue

            # If any debug server categories were explicitly tagged, assume
            # that list to be authoritative. If none were specified, try
            # all of them.
            all_categories = set(["debugserver", "llgs"])
            categories = set(getattr(attrvalue, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:

                @decorators.add_test_categories([cat])
                @wraps(attrvalue)
                def test_method(self, attrvalue=attrvalue):
                    # attrvalue bound as a default to avoid the
                    # late-binding-closure pitfall.
                    return attrvalue(self)

                method_name = attrname + "_" + cat
                test_method.__name__ = method_name
                test_method.debug_server = cat
                newattrs[method_name] = test_method

        return super(GdbRemoteTestCaseFactory, cls).__new__(cls, name, bases, newattrs)
class GdbRemoteTestCaseBase(Base, metaclass=GdbRemoteTestCaseFactory):
    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT = 20 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP = 5 * (2 if ("ASAN_OPTIONS" in os.environ) else 1)

    # Raw 'k' (kill) packet with its precomputed checksum.
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions.
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test verbose log handler; installed in setUp, removed in tearDown.
    _verbose_log_handler = None
    _log_formatter = logging.Formatter(fmt="%(asctime)-15s %(levelname)-8s %(message)s")
88 def setUpBaseLogging(self
):
89 self
.logger
= logging
.getLogger(__name__
)
91 if len(self
.logger
.handlers
) > 0:
92 return # We have set up this handler already
94 self
.logger
.propagate
= False
95 self
.logger
.setLevel(logging
.DEBUG
)
97 # log all warnings to stderr
98 handler
= logging
.StreamHandler()
99 handler
.setLevel(logging
.WARNING
)
100 handler
.setFormatter(self
._log
_formatter
)
101 self
.logger
.addHandler(handler
)
103 def isVerboseLoggingRequested(self
):
104 # We will report our detailed logs if the user requested that the "gdb-remote" channel is
106 return any(("gdb-remote" in channel
) for channel
in lldbtest_config
.channels
)
108 def getDebugServer(self
):
109 method
= getattr(self
, self
.testMethodName
)
110 return getattr(method
, "debug_server", None)
113 super(GdbRemoteTestCaseBase
, self
).setUp()
115 self
.setUpBaseLogging()
116 self
.debug_monitor_extra_args
= []
118 if self
.isVerboseLoggingRequested():
119 # If requested, full logs go to a log file
120 self
._verbose
_log
_handler
= logging
.FileHandler(
121 self
.getLogBasenameForCurrentTest() + "-host.log"
123 self
._verbose
_log
_handler
.setFormatter(self
._log
_formatter
)
124 self
._verbose
_log
_handler
.setLevel(logging
.DEBUG
)
125 self
.logger
.addHandler(self
._verbose
_log
_handler
)
127 self
.test_sequence
= GdbRemoteTestSequence(self
.logger
)
128 self
.set_inferior_startup_launch()
129 self
.port
= self
.get_next_port()
130 self
.stub_sends_two_stop_notifications_on_kill
= False
131 if configuration
.lldb_platform_url
:
132 if configuration
.lldb_platform_url
.startswith("unix-"):
133 url_pattern
= "(.+)://\[?(.+?)\]?/.*"
135 url_pattern
= "(.+)://(.+):\d+"
136 scheme
, host
= re
.match(
137 url_pattern
, configuration
.lldb_platform_url
140 configuration
.lldb_platform_name
== "remote-android"
141 and host
!= "localhost"
143 self
.stub_device
= host
144 self
.stub_hostname
= "localhost"
146 self
.stub_device
= None
147 self
.stub_hostname
= host
149 self
.stub_hostname
= "localhost"
151 debug_server
= self
.getDebugServer()
152 if debug_server
== "debugserver":
153 self
._init
_debugserver
_test
()
155 self
._init
_llgs
_test
()
158 self
.logger
.removeHandler(self
._verbose
_log
_handler
)
159 self
._verbose
_log
_handler
= None
160 TestBase
.tearDown(self
)
162 def getLocalServerLogFile(self
):
163 return self
.getLogBasenameForCurrentTest() + "-server.log"
165 def setUpServerLogging(self
, is_llgs
):
166 if len(lldbtest_config
.channels
) == 0:
167 return # No logging requested
169 if lldb
.remote_platform
:
170 log_file
= lldbutil
.join_remote_paths(
171 lldb
.remote_platform
.GetWorkingDirectory(), "server.log"
174 log_file
= self
.getLocalServerLogFile()
177 self
.debug_monitor_extra_args
.append("--log-file=" + log_file
)
178 self
.debug_monitor_extra_args
.append(
179 "--log-channels={}".format(":".join(lldbtest_config
.channels
))
182 self
.debug_monitor_extra_args
= [
183 "--log-file=" + log_file
,
184 "--log-flags=0x800000",
187 def get_next_port(self
):
188 return 12000 + random
.randint(0, 3999)
190 def reset_test_sequence(self
):
191 self
.test_sequence
= GdbRemoteTestSequence(self
.logger
)
193 def _init_llgs_test(self
):
194 reverse_connect
= True
195 if lldb
.remote_platform
:
196 # Reverse connections may be tricky due to firewalls/NATs.
197 reverse_connect
= False
199 # FIXME: This is extremely linux-oriented
201 # Grab the ppid from /proc/[shell pid]/stat
202 err
, retcode
, shell_stat
= self
.run_platform_command("cat /proc/$$/stat")
204 err
.Success() and retcode
== 0,
205 "Failed to read file /proc/$$/stat: %s, retcode: %d"
206 % (err
.GetCString(), retcode
),
209 # [pid] ([executable]) [state] [*ppid*]
210 pid
= re
.match(r
"^\d+ \(.+\) . (\d+)", shell_stat
).group(1)
211 err
, retcode
, ls_output
= self
.run_platform_command(
212 "ls -l /proc/%s/exe" % pid
215 err
.Success() and retcode
== 0,
216 "Failed to read file /proc/%s/exe: %s, retcode: %d"
217 % (pid
, err
.GetCString(), retcode
),
219 exe
= ls_output
.split()[-1]
221 # If the binary has been deleted, the link name has " (deleted)" appended.
222 # Remove if it's there.
223 self
.debug_monitor_exe
= re
.sub(r
" \(deleted\)$", "", exe
)
225 self
.debug_monitor_exe
= get_lldb_server_exe()
227 self
.debug_monitor_extra_args
= ["gdbserver"]
228 self
.setUpServerLogging(is_llgs
=True)
230 self
.reverse_connect
= reverse_connect
232 def _init_debugserver_test(self
):
233 self
.debug_monitor_exe
= get_debugserver_exe()
234 self
.setUpServerLogging(is_llgs
=False)
235 self
.reverse_connect
= True
237 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
238 # when the process truly dies.
239 self
.stub_sends_two_stop_notifications_on_kill
= True
241 def forward_adb_port(self
, source
, target
, direction
, device
):
242 adb
= ["adb"] + (["-s", device
] if device
else []) + [direction
]
244 def remove_port_forward():
245 subprocess
.call(adb
+ ["--remove", "tcp:%d" % source
])
247 subprocess
.call(adb
+ ["tcp:%d" % source
, "tcp:%d" % target
])
248 self
.addTearDownHook(remove_port_forward
)
250 def _verify_socket(self
, sock
):
251 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
252 # connect() attempt. However, due to the way how ADB forwarding works, on android targets
253 # the connect() will always be successful, but the connection will be immediately dropped
254 # if ADB could not connect on the remote side. This function tries to detect this
255 # situation, and report it as "connection refused" so that the upper layers attempt the
257 triple
= self
.dbg
.GetSelectedPlatform().GetTriple()
258 if not re
.match(".*-.*-.*-android", triple
):
259 return # Not android.
260 can_read
, _
, _
= select
.select([sock
], [], [], 0.1)
261 if sock
not in can_read
:
262 return # Data is not available, but the connection is alive.
263 if len(sock
.recv(1, socket
.MSG_PEEK
)) == 0:
264 raise _ConnectionRefused() # Got EOF, connection dropped.
266 def create_socket(self
):
268 sock
= socket
.socket(family
=socket
.AF_INET
)
270 if e
.errno
!= errno
.EAFNOSUPPORT
:
272 sock
= socket
.socket(family
=socket
.AF_INET6
)
276 triple
= self
.dbg
.GetSelectedPlatform().GetTriple()
277 if re
.match(".*-.*-.*-android", triple
):
278 self
.forward_adb_port(self
.port
, self
.port
, "forward", self
.stub_device
)
281 "Connecting to debug monitor on %s:%d", self
.stub_hostname
, self
.port
283 connect_info
= (self
.stub_hostname
, self
.port
)
285 sock
.connect(connect_info
)
286 except socket
.error
as serr
:
287 if serr
.errno
== errno
.ECONNREFUSED
:
288 raise _ConnectionRefused()
291 def shutdown_socket():
294 # send the kill packet so lldb-server shuts down gracefully
295 sock
.sendall(GdbRemoteTestCaseBase
._GDBREMOTE
_KILL
_PACKET
)
298 "failed to send kill packet to debug monitor: {}; ignoring".format(
307 "failed to close socket to debug monitor: {}; ignoring".format(
312 self
.addTearDownHook(shutdown_socket
)
314 self
._verify
_socket
(sock
)
318 def set_inferior_startup_launch(self
):
319 self
._inferior
_startup
= self
._STARTUP
_LAUNCH
321 def set_inferior_startup_attach(self
):
322 self
._inferior
_startup
= self
._STARTUP
_ATTACH
324 def set_inferior_startup_attach_manually(self
):
325 self
._inferior
_startup
= self
._STARTUP
_ATTACH
_MANUALLY
327 def get_debug_monitor_command_line_args(self
, attach_pid
=None):
328 commandline_args
= self
.debug_monitor_extra_args
330 commandline_args
+= ["--attach=%d" % attach_pid
]
331 if self
.reverse_connect
:
332 commandline_args
+= ["--reverse-connect", self
.connect_address
]
334 if lldb
.remote_platform
:
335 commandline_args
+= ["*:{}".format(self
.port
)]
337 commandline_args
+= ["localhost:{}".format(self
.port
)]
339 return commandline_args
341 def get_target_byte_order(self
):
342 inferior_exe_path
= self
.getBuildArtifact("a.out")
343 target
= self
.dbg
.CreateTarget(inferior_exe_path
)
344 return target
.GetByteOrder()
346 def launch_debug_monitor(self
, attach_pid
=None, logfile
=None):
347 if self
.reverse_connect
:
348 family
, type, proto
, _
, addr
= socket
.getaddrinfo(
349 "localhost", 0, proto
=socket
.IPPROTO_TCP
351 sock
= socket
.socket(family
, type, proto
)
352 sock
.settimeout(self
.DEFAULT_TIMEOUT
)
356 addr
= sock
.getsockname()
357 self
.connect_address
= "[{}]:{}".format(*addr
)
359 # Create the command line.
360 commandline_args
= self
.get_debug_monitor_command_line_args(
361 attach_pid
=attach_pid
365 server
= self
.spawnSubprocess(
366 self
.debug_monitor_exe
, commandline_args
, install_remote
=False
368 self
.assertIsNotNone(server
)
370 if self
.reverse_connect
:
371 self
.sock
= sock
.accept()[0]
372 self
.sock
.settimeout(self
.DEFAULT_TIMEOUT
)
376 def connect_to_debug_monitor(self
, attach_pid
=None):
377 if self
.reverse_connect
:
379 server
= self
.launch_debug_monitor(attach_pid
=attach_pid
)
380 self
.assertIsNotNone(server
)
382 # Schedule debug monitor to be shut down during teardown.
385 self
._server
= Server(self
.sock
, server
)
388 # We're using a random port algorithm to try not to collide with other ports,
389 # and retry a max # times.
393 while attempts
< MAX_ATTEMPTS
:
394 server
= self
.launch_debug_monitor(attach_pid
=attach_pid
)
396 # Schedule debug monitor to be shut down during teardown.
400 MAX_CONNECT_ATTEMPTS
= 10
402 while connect_attemps
< MAX_CONNECT_ATTEMPTS
:
403 # Create a socket to talk to the server
405 logger
.info("Connect attempt %d", connect_attemps
+ 1)
406 self
.sock
= self
.create_socket()
407 self
._server
= Server(self
.sock
, server
)
409 except _ConnectionRefused
as serr
:
410 # Ignore, and try again.
415 # We should close the server here to be safe.
418 # Increment attempts.
420 "connect to debug monitor on port %d failed, attempt #%d of %d"
421 % (self
.port
, attempts
+ 1, MAX_ATTEMPTS
)
425 # And wait a random length of time before next attempt, to avoid
427 time
.sleep(random
.randint(1, 5))
429 # Now grab a new port number.
430 self
.port
= self
.get_next_port()
433 "failed to create a socket to the launched debug monitor after %d tries"
437 def launch_process_for_attach(
438 self
, inferior_args
=None, sleep_seconds
=3, exe_path
=None
440 # We're going to start a child process that the debug monitor stub can later attach to.
441 # This process needs to be started so that it just hangs around for a while. We'll
444 exe_path
= self
.getBuildArtifact("a.out")
448 args
.extend(inferior_args
)
450 args
.append("sleep:%d" % sleep_seconds
)
452 return self
.spawnSubprocess(exe_path
, args
)
454 def prep_debug_monitor_and_inferior(
457 inferior_sleep_seconds
=3,
458 inferior_exe_path
=None,
461 """Prep the debug monitor, the inferior, and the expected packet stream.
463 Handle the separate cases of using the debug monitor in attach-to-inferior mode
464 and in launch-inferior mode.
466 For attach-to-inferior mode, the inferior process is first started, then
467 the debug monitor is started in attach to pid mode (using --attach on the
468 stub command line), and the no-ack-mode setup is appended to the packet
469 stream. The packet stream is not yet executed, ready to have more expected
470 packet entries added to it.
472 For launch-inferior mode, the stub is first started, then no ack mode is
473 setup on the expected packet stream, then the verified launch packets are added
474 to the expected socket stream. The packet stream is not yet executed, ready
475 to have more expected packet entries added to it.
478 {inferior:<inferior>, server:<server>}
484 self
._inferior
_startup
== self
._STARTUP
_ATTACH
485 or self
._inferior
_startup
== self
._STARTUP
_ATTACH
_MANUALLY
487 # Launch the process that we'll use as the inferior.
488 inferior
= self
.launch_process_for_attach(
489 inferior_args
=inferior_args
,
490 sleep_seconds
=inferior_sleep_seconds
,
491 exe_path
=inferior_exe_path
,
493 self
.assertIsNotNone(inferior
)
494 self
.assertTrue(inferior
.pid
> 0)
495 if self
._inferior
_startup
== self
._STARTUP
_ATTACH
:
496 # In this case, we want the stub to attach via the command
497 # line, so set the command line attach pid here.
498 attach_pid
= inferior
.pid
500 if self
._inferior
_startup
== self
._STARTUP
_LAUNCH
:
502 if not inferior_exe_path
:
503 inferior_exe_path
= self
.getBuildArtifact("a.out")
505 if lldb
.remote_platform
:
506 remote_path
= lldbutil
.append_to_process_working_directory(
507 self
, os
.path
.basename(inferior_exe_path
)
509 remote_file_spec
= lldb
.SBFileSpec(remote_path
, False)
510 err
= lldb
.remote_platform
.Install(
511 lldb
.SBFileSpec(inferior_exe_path
, True), remote_file_spec
515 "remote_platform.Install('%s', '%s') failed: %s"
516 % (inferior_exe_path
, remote_path
, err
)
518 inferior_exe_path
= remote_path
520 launch_args
= [inferior_exe_path
]
522 launch_args
.extend(inferior_args
)
524 # Launch the debug monitor stub, attaching to the inferior.
525 server
= self
.connect_to_debug_monitor(attach_pid
=attach_pid
)
526 self
.assertIsNotNone(server
)
530 # Build the expected protocol stream
532 for name
, value
in inferior_env
.items():
533 self
.add_set_environment_packets(name
, value
)
534 if self
._inferior
_startup
== self
._STARTUP
_LAUNCH
:
535 self
.add_verified_launch_packets(launch_args
)
537 return {"inferior": inferior
, "server": server
}
539 def do_handshake(self
):
540 server
= self
._server
542 server
.send_packet(b
"QStartNoAckMode")
543 self
.assertEqual(server
.get_normal_packet(), b
"+")
544 self
.assertEqual(server
.get_normal_packet(), b
"OK")
547 def add_verified_launch_packets(self
, launch_args
):
548 self
.test_sequence
.add_log_lines(
550 "read packet: %s" % build_gdbremote_A_packet(launch_args
),
551 "send packet: $OK#00",
552 "read packet: $qLaunchSuccess#a5",
553 "send packet: $OK#00",
558 def add_thread_suffix_request_packets(self
):
559 self
.test_sequence
.add_log_lines(
561 "read packet: $QThreadSuffixSupported#e4",
562 "send packet: $OK#00",
567 def add_process_info_collection_packets(self
):
568 self
.test_sequence
.add_log_lines(
570 "read packet: $qProcessInfo#dc",
573 "regex": r
"^\$(.+)#[0-9a-fA-F]{2}$",
574 "capture": {1: "process_info_raw"},
580 def add_set_environment_packets(self
, name
, value
):
581 self
.test_sequence
.add_log_lines(
583 "read packet: $QEnvironment:" + name
+ "=" + value
+ "#00",
584 "send packet: $OK#00",
589 _KNOWN_PROCESS_INFO_KEYS
= [
606 def parse_process_info_response(self
, context
):
607 # Ensure we have a process info response.
608 self
.assertIsNotNone(context
)
609 process_info_raw
= context
.get("process_info_raw")
610 self
.assertIsNotNone(process_info_raw
)
612 # Pull out key:value; pairs.
613 process_info_dict
= {
614 match
.group(1): match
.group(2)
615 for match
in re
.finditer(r
"([^:]+):([^;]+);", process_info_raw
)
618 # Validate keys are known.
619 for key
, val
in list(process_info_dict
.items()):
620 self
.assertTrue(key
in self
._KNOWN
_PROCESS
_INFO
_KEYS
)
621 self
.assertIsNotNone(val
)
623 return process_info_dict
625 def add_register_info_collection_packets(self
):
626 self
.test_sequence
.add_log_lines(
629 "type": "multi_response",
630 "query": "qRegisterInfo",
631 "append_iteration_suffix": True,
632 "end_regex": re
.compile(r
"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
633 "save_key": "reg_info_responses",
639 def parse_register_info_packets(self
, context
):
640 """Return an array of register info dictionaries, one per register info."""
641 reg_info_responses
= context
.get("reg_info_responses")
642 self
.assertIsNotNone(reg_info_responses
)
644 # Parse register infos.
646 parse_reg_info_response(reg_info_response
)
647 for reg_info_response
in reg_info_responses
650 def expect_gdbremote_sequence(self
):
651 return expect_lldb_gdbserver_replay(
655 self
.DEFAULT_TIMEOUT
* len(self
.test_sequence
),
659 _KNOWN_REGINFO_KEYS
= [
673 "dynamic_size_dwarf_expr_bytes",
674 "dynamic_size_dwarf_len",
677 def assert_valid_reg_info(self
, reg_info
):
678 # Assert we know about all the reginfo keys parsed.
680 self
.assertTrue(key
in self
._KNOWN
_REGINFO
_KEYS
)
682 # Check the bare-minimum expected set of register info keys.
683 self
.assertTrue("name" in reg_info
)
684 self
.assertTrue("bitsize" in reg_info
)
686 if not self
.getArchitecture() == "aarch64":
687 self
.assertTrue("offset" in reg_info
)
689 self
.assertTrue("encoding" in reg_info
)
690 self
.assertTrue("format" in reg_info
)
692 def find_pc_reg_info(self
, reg_infos
):
694 for reg_info
in reg_infos
:
695 if ("generic" in reg_info
) and (reg_info
["generic"] == "pc"):
696 return (lldb_reg_index
, reg_info
)
701 def add_lldb_register_index(self
, reg_infos
):
702 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
704 We'll use this when we want to call packets like P/p with a register index but do so
705 on only a subset of the full register info set.
707 self
.assertIsNotNone(reg_infos
)
710 for reg_info
in reg_infos
:
711 reg_info
["lldb_register_index"] = reg_index
714 def add_query_memory_region_packets(self
, address
):
715 self
.test_sequence
.add_log_lines(
717 "read packet: $qMemoryRegionInfo:{0:x}#00".format(address
),
720 "regex": r
"^\$(.+)#[0-9a-fA-F]{2}$",
721 "capture": {1: "memory_region_response"},
727 def parse_key_val_dict(self
, key_val_text
, allow_dupes
=True):
728 self
.assertIsNotNone(key_val_text
)
730 for match
in re
.finditer(r
";?([^:]+):([^;]+)", key_val_text
):
735 if isinstance(kv_dict
[key
], list):
736 kv_dict
[key
].append(val
)
739 kv_dict
[key
] = [kv_dict
[key
], val
]
742 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
743 key
, val
, key_val_text
, kv_dict
750 def parse_memory_region_packet(self
, context
):
751 # Ensure we have a context.
752 self
.assertIsNotNone(context
.get("memory_region_response"))
754 # Pull out key:value; pairs.
755 mem_region_dict
= self
.parse_key_val_dict(context
.get("memory_region_response"))
757 # Validate keys are known.
758 for key
, val
in list(mem_region_dict
.items()):
772 self
.assertIsNotNone(val
)
774 mem_region_dict
["name"] = seven
.unhexlify(mem_region_dict
.get("name", ""))
775 # Return the dictionary of key-value pairs for the memory region.
776 return mem_region_dict
778 def assert_address_within_memory_region(self
, test_address
, mem_region_dict
):
779 self
.assertIsNotNone(mem_region_dict
)
780 self
.assertTrue("start" in mem_region_dict
)
781 self
.assertTrue("size" in mem_region_dict
)
783 range_start
= int(mem_region_dict
["start"], 16)
784 range_size
= int(mem_region_dict
["size"], 16)
785 range_end
= range_start
+ range_size
787 if test_address
< range_start
:
789 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
790 test_address
, range_start
, range_end
, range_size
793 elif test_address
>= range_end
:
795 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
796 test_address
, range_start
, range_end
, range_size
800 def add_threadinfo_collection_packets(self
):
801 self
.test_sequence
.add_log_lines(
804 "type": "multi_response",
805 "first_query": "qfThreadInfo",
806 "next_query": "qsThreadInfo",
807 "append_iteration_suffix": False,
808 "end_regex": re
.compile(r
"^\$(l)?#[0-9a-fA-F]{2}$"),
809 "save_key": "threadinfo_responses",
815 def parse_threadinfo_packets(self
, context
):
816 """Return an array of thread ids (decimal ints), one per thread."""
817 threadinfo_responses
= context
.get("threadinfo_responses")
818 self
.assertIsNotNone(threadinfo_responses
)
821 for threadinfo_response
in threadinfo_responses
:
822 new_thread_infos
= parse_threadinfo_response(threadinfo_response
)
823 thread_ids
.extend(new_thread_infos
)
826 def launch_with_threads(self
, thread_count
):
827 procs
= self
.prep_debug_monitor_and_inferior(
828 inferior_args
=["thread:new"] * (thread_count
- 1) + ["trap"]
831 self
.test_sequence
.add_log_lines(
833 "read packet: $c#00",
836 "regex": r
"^\$T([0-9a-fA-F]{2})([^#]*)#..$",
837 "capture": {1: "stop_signo", 2: "stop_reply_kv"},
842 self
.add_threadinfo_collection_packets()
843 context
= self
.expect_gdbremote_sequence()
844 threads
= self
.parse_threadinfo_packets(context
)
845 self
.assertGreaterEqual(len(threads
), thread_count
)
846 return context
, threads
848 def add_set_breakpoint_packets(
849 self
, address
, z_packet_type
=0, do_continue
=True, breakpoint_kind
=1
851 self
.test_sequence
.add_log_lines(
852 [ # Set the breakpoint.
853 "read packet: $Z{2},{0:x},{1}#00".format(
854 address
, breakpoint_kind
, z_packet_type
856 # Verify the stub could set it.
857 "send packet: $OK#00",
863 self
.test_sequence
.add_log_lines(
864 [ # Continue the inferior.
865 "read packet: $c#63",
866 # Expect a breakpoint stop report.
869 "regex": r
"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
870 "capture": {1: "stop_signo", 2: "stop_thread_id"},
876 def add_remove_breakpoint_packets(
877 self
, address
, z_packet_type
=0, breakpoint_kind
=1
879 self
.test_sequence
.add_log_lines(
880 [ # Remove the breakpoint.
881 "read packet: $z{2},{0:x},{1}#00".format(
882 address
, breakpoint_kind
, z_packet_type
884 # Verify the stub could unset it.
885 "send packet: $OK#00",
890 def add_qSupported_packets(self
, client_features
=[]):
891 features
= "".join(";" + x
for x
in client_features
)
892 self
.test_sequence
.add_log_lines(
894 "read packet: $qSupported{}#00".format(features
),
897 "regex": r
"^\$(.*)#[0-9a-fA-F]{2}",
898 "capture": {1: "qSupported_response"},
904 _KNOWN_QSUPPORTED_STUB_FEATURES
= [
905 "augmented-libraries-svr4-read",
908 "QThreadSuffixSupported",
909 "QListThreadsInStopReply",
911 "qXfer:libraries:read",
912 "qXfer:libraries-svr4:read",
913 "qXfer:features:read",
914 "qXfer:siginfo:read",
926 def parse_qSupported_response(self
, context
):
927 self
.assertIsNotNone(context
)
929 raw_response
= context
.get("qSupported_response")
930 self
.assertIsNotNone(raw_response
)
932 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the
933 # +,-,? is stripped from the key and set as the value.
935 for match
in re
.finditer(r
";?([^=;]+)(=([^;]+))?", raw_response
):
939 # key=val: store as is
940 if val
and len(val
) > 0:
941 supported_dict
[key
] = val
945 "singular stub feature is too short: must be stub_feature{+,-,?}"
947 supported_type
= key
[-1]
949 if not supported_type
in ["+", "-", "?"]:
951 "malformed stub feature: final character {} not in expected set (+,-,?)".format(
955 supported_dict
[key
] = supported_type
956 # Ensure we know the supported element
957 if key
not in self
._KNOWN
_QSUPPORTED
_STUB
_FEATURES
:
958 raise Exception("unknown qSupported stub feature reported: %s" % key
)
960 return supported_dict
962 def continue_process_and_wait_for_stop(self
):
963 self
.test_sequence
.add_log_lines(
965 "read packet: $vCont;c#a8",
968 "regex": r
"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
969 "capture": {1: "stop_signo", 2: "stop_key_val_text"},
974 context
= self
.expect_gdbremote_sequence()
975 self
.assertIsNotNone(context
)
976 return self
.parse_interrupt_packets(context
)
978 def select_modifiable_register(self
, reg_infos
):
979 """Find a register that can be read/written freely."""
980 PREFERRED_REGISTER_NAMES
= set(
986 # First check for the first register from the preferred register name
988 alternative_register_index
= None
990 self
.assertIsNotNone(reg_infos
)
991 for reg_info
in reg_infos
:
992 if ("name" in reg_info
) and (reg_info
["name"] in PREFERRED_REGISTER_NAMES
):
993 # We found a preferred register. Use it.
994 return reg_info
["lldb_register_index"]
995 if ("generic" in reg_info
) and (
996 reg_info
["generic"] == "fp" or reg_info
["generic"] == "arg1"
998 # A frame pointer or first arg register will do as a
999 # register to modify temporarily.
1000 alternative_register_index
= reg_info
["lldb_register_index"]
1002 # We didn't find a preferred register. Return whatever alternative register
1004 return alternative_register_index
1006 def extract_registers_from_stop_notification(self
, stop_key_vals_text
):
1007 self
.assertIsNotNone(stop_key_vals_text
)
1008 kv_dict
= self
.parse_key_val_dict(stop_key_vals_text
)
1011 for key
, val
in list(kv_dict
.items()):
1012 if re
.match(r
"^[0-9a-fA-F]+$", key
):
1013 registers
[int(key
, 16)] = val
1016 def gather_register_infos(self
):
1017 self
.reset_test_sequence()
1018 self
.add_register_info_collection_packets()
1020 context
= self
.expect_gdbremote_sequence()
1021 self
.assertIsNotNone(context
)
1023 reg_infos
= self
.parse_register_info_packets(context
)
1024 self
.assertIsNotNone(reg_infos
)
1025 self
.add_lldb_register_index(reg_infos
)
1029 def find_generic_register_with_name(self
, reg_infos
, generic_name
):
1030 self
.assertIsNotNone(reg_infos
)
1031 for reg_info
in reg_infos
:
1032 if ("generic" in reg_info
) and (reg_info
["generic"] == generic_name
):
1036 def find_register_with_name_and_dwarf_regnum(self
, reg_infos
, name
, dwarf_num
):
1037 self
.assertIsNotNone(reg_infos
)
1038 for reg_info
in reg_infos
:
1039 if (reg_info
["name"] == name
) and (reg_info
["dwarf"] == dwarf_num
):
1043 def decode_gdbremote_binary(self
, encoded_bytes
):
1046 while i
< len(encoded_bytes
):
1047 if encoded_bytes
[i
] == "}":
1048 # Handle escaped char.
1049 self
.assertTrue(i
+ 1 < len(encoded_bytes
))
1050 decoded_bytes
+= chr(ord(encoded_bytes
[i
+ 1]) ^
0x20)
1052 elif encoded_bytes
[i
] == "*":
1053 # Handle run length encoding.
1054 self
.assertTrue(len(decoded_bytes
) > 0)
1055 self
.assertTrue(i
+ 1 < len(encoded_bytes
))
1056 repeat_count
= ord(encoded_bytes
[i
+ 1]) - 29
1057 decoded_bytes
+= decoded_bytes
[-1] * repeat_count
1060 decoded_bytes
+= encoded_bytes
[i
]
1062 return decoded_bytes
1064 def build_auxv_dict(self
, endian
, word_size
, auxv_data
):
1065 self
.assertIsNotNone(endian
)
1066 self
.assertIsNotNone(word_size
)
1067 self
.assertIsNotNone(auxv_data
)
1071 # PowerPC64le's auxvec has a special key that must be ignored.
1072 # This special key may be used multiple times, resulting in
1073 # multiple key/value pairs with the same key, which would otherwise
1074 # break this test check for repeated keys.
1077 ignored_keys_for_arch
= {"powerpc64le": [22]}
1078 arch
= self
.getArchitecture()
1080 if arch
in ignored_keys_for_arch
:
1081 ignore_keys
= ignored_keys_for_arch
[arch
]
1083 while len(auxv_data
) > 0:
1085 raw_key
= auxv_data
[:word_size
]
1086 auxv_data
= auxv_data
[word_size
:]
1089 raw_value
= auxv_data
[:word_size
]
1090 auxv_data
= auxv_data
[word_size
:]
1092 # Convert raw text from target endian.
1093 key
= unpack_endian_binary_string(endian
, raw_key
)
1094 value
= unpack_endian_binary_string(endian
, raw_value
)
1096 if ignore_keys
and key
in ignore_keys
:
1099 # Handle ending entry.
1101 self
.assertEqual(value
, 0)
1104 # The key should not already be present.
1105 self
.assertFalse(key
in auxv_dict
)
1106 auxv_dict
[key
] = value
1109 "should not reach here - implies required double zero entry not found"
1113 def read_binary_data_in_chunks(self
, command_prefix
, chunk_length
):
1114 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
1120 # Grab the next iteration of data.
1121 self
.reset_test_sequence()
1122 self
.test_sequence
.add_log_lines(
1124 "read packet: ${}{:x},{:x}:#00".format(
1125 command_prefix
, offset
, chunk_length
1128 "direction": "send",
1129 "regex": re
.compile(
1130 r
"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re
.MULTILINE | re
.DOTALL
1132 "capture": {1: "response_type", 2: "content_raw"},
1138 context
= self
.expect_gdbremote_sequence()
1139 self
.assertIsNotNone(context
)
1141 response_type
= context
.get("response_type")
1142 self
.assertIsNotNone(response_type
)
1143 self
.assertTrue(response_type
in ["l", "m"])
1145 # Move offset along.
1146 offset
+= chunk_length
1148 # Figure out if we're done. We're done if the response type is l.
1149 done
= response_type
== "l"
1151 # Decode binary data.
1152 content_raw
= context
.get("content_raw")
1153 if content_raw
and len(content_raw
) > 0:
1154 self
.assertIsNotNone(content_raw
)
1155 decoded_data
+= self
.decode_gdbremote_binary(content_raw
)
1158 def add_interrupt_packets(self
):
1159 self
.test_sequence
.add_log_lines(
1161 # Send the intterupt.
1162 "read packet: {}".format(chr(3)),
1163 # And wait for the stop notification.
1165 "direction": "send",
1166 "regex": r
"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1167 "capture": {1: "stop_signo", 2: "stop_key_val_text"},
1173 def parse_interrupt_packets(self
, context
):
1174 self
.assertIsNotNone(context
.get("stop_signo"))
1175 self
.assertIsNotNone(context
.get("stop_key_val_text"))
1177 int(context
["stop_signo"], 16),
1178 self
.parse_key_val_dict(context
["stop_key_val_text"]),
1181 def add_QSaveRegisterState_packets(self
, thread_id
):
1183 # Use the thread suffix form.
1184 request
= "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1188 request
= "read packet: $QSaveRegisterState#00"
1190 self
.test_sequence
.add_log_lines(
1194 "direction": "send",
1195 "regex": r
"^\$(E?.*)#[0-9a-fA-F]{2}$",
1196 "capture": {1: "save_response"},
1202 def parse_QSaveRegisterState_response(self
, context
):
1203 self
.assertIsNotNone(context
)
1205 save_response
= context
.get("save_response")
1206 self
.assertIsNotNone(save_response
)
1208 if len(save_response
) < 1 or save_response
[0] == "E":
1210 return (False, None)
1212 return (True, int(save_response
))
def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
    """Queue a QRestoreRegisterState request, expecting an $OK reply.

    save_id is the token returned by a prior QSaveRegisterState. When
    thread_id is truthy, the thread-suffix form of the packet is used.
    """
    if not thread_id:
        packet = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
    else:
        # Use the thread suffix form.
        packet = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
            save_id, thread_id
        )

    self.test_sequence.add_log_lines([packet, "send packet: $OK#00"], True)
def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
    """Read, bit-flip, write back, and verify each register in reg_infos.

    For every register info dict: reads the current value with $p, xors it
    with all ones, writes it back with $P, and (when the write reported OK)
    reads it back to confirm the flipped value stuck. Uses the thread-suffix
    packet forms when thread_id is truthy.

    Returns a (successful_writes, failed_writes) tuple. A write that reports
    OK but reads back a different value is moved from the success count to
    the failure count, since some registers (flags, masks, etc.) legally
    permute what is written.
    """
    self.assertIsNotNone(reg_infos)

    successful_writes = 0
    failed_writes = 0

    for reg_info in reg_infos:
        # Use the lldb register index added to the reg info. We're not necessarily
        # working off a full set of register infos, so an inferred register
        # index could be wrong.
        reg_index = reg_info["lldb_register_index"]
        self.assertIsNotNone(reg_index)

        reg_byte_size = int(reg_info["bitsize"]) // 8
        self.assertTrue(reg_byte_size > 0)

        # Handle thread suffix.
        if thread_id:
            p_request = "read packet: $p{:x};thread:{:x}#00".format(
                reg_index, thread_id
            )
        else:
            p_request = "read packet: $p{:x}#00".format(reg_index)

        # Read the existing value.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                p_request,
                {
                    "direction": "send",
                    "regex": r"^\$([0-9a-fA-F]+)#",
                    "capture": {1: "p_response"},
                },
            ],
            True,
        )

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify the response length.
        p_response = context.get("p_response")
        self.assertIsNotNone(p_response)
        initial_reg_value = unpack_register_hex_unsigned(endian, p_response)

        # Flip the value by xoring with all 1s
        all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
        flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
        # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

        # Handle thread suffix for P.
        if thread_id:
            P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                reg_index,
                pack_register_hex(
                    endian, flipped_bits_int, byte_size=reg_byte_size
                ),
                thread_id,
            )
        else:
            P_request = "read packet: $P{:x}={}#00".format(
                reg_index,
                pack_register_hex(
                    endian, flipped_bits_int, byte_size=reg_byte_size
                ),
            )

        # Write the flipped value to the register.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                P_request,
                {
                    "direction": "send",
                    "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                    "capture": {1: "P_response"},
                },
            ],
            True,
        )

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail
        # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them
        # all flipping perfectly.
        P_response = context.get("P_response")
        self.assertIsNotNone(P_response)
        if P_response == "OK":
            successful_writes += 1
        else:
            failed_writes += 1
            # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

        # Read back the register value, ensure it matches the flipped
        # value.
        if P_response == "OK":
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    p_request,
                    {
                        "direction": "send",
                        "regex": r"^\$([0-9a-fA-F]+)#",
                        "capture": {1: "p_response"},
                    },
                ],
                True,
            )

            # Run the packet stream.
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            verify_p_response_raw = context.get("p_response")
            self.assertIsNotNone(verify_p_response_raw)
            verify_bits = unpack_register_hex_unsigned(
                endian, verify_p_response_raw
            )

            if verify_bits != flipped_bits_int:
                # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts.
                # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                successful_writes -= 1
                failed_writes += 1

    return (successful_writes, failed_writes)
def is_bit_flippable_register(self, reg_info):
    """Return True when reg_info describes a register that the bit-flip
    read/modify/write tests may safely exercise.

    Excludes non-GPR sets, registers contained within another register,
    two-letter "*s" names (segment-register style), and the ARM psr/cpsr
    status registers.
    """
    # No register info at all -> not flippable.
    if not reg_info:
        return False

    # Only general-purpose registers are candidates (missing "set" key
    # also fails this comparison).
    if reg_info.get("set") != "General Purpose Registers":
        return False

    # Don't try to bit flip registers contained in another register.
    if reg_info.get("container-regs"):
        return False

    name = reg_info["name"]
    # 2-letter names ending in "s" look like segment registers; psr/cpsr
    # are ARM program status registers. Neither should be flipped.
    if re.match("^.s$", name) or re.match("^(c|)psr$", name):
        return False

    # Okay, this looks fine-enough.
    return True
def read_register_values(self, reg_infos, endian, thread_id=None):
    """Read the current value of every register described in reg_infos.

    Issues a $p packet per register (thread-suffix form when thread_id is
    truthy), asserting that each reply is a non-empty, non-error hex
    payload. Returns a dict mapping lldb register index -> integer value
    decoded with the given target endianness.
    """
    self.assertIsNotNone(reg_infos)
    values = {}

    for reg_info in reg_infos:
        # We append a register index when load reg infos so we can work
        # with subsets.
        reg_index = reg_info.get("lldb_register_index")
        self.assertIsNotNone(reg_index)

        # Handle thread suffix.
        if thread_id:
            p_request = "read packet: $p{:x};thread:{:x}#00".format(
                reg_index, thread_id
            )
        else:
            p_request = "read packet: $p{:x}#00".format(reg_index)

        # Build the packet stream.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                p_request,
                {
                    "direction": "send",
                    "regex": r"^\$([0-9a-fA-F]+)#",
                    "capture": {1: "p_response"},
                },
            ],
            True,
        )

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Convert value from target endian to integral.
        p_response = context.get("p_response")
        self.assertIsNotNone(p_response)
        self.assertTrue(len(p_response) > 0)
        self.assertFalse(p_response[0] == "E")

        values[reg_index] = unpack_register_hex_unsigned(endian, p_response)

    return values
def add_vCont_query_packets(self):
    """Queue a $vCont? query and capture the stub's supported-modes reply.

    The semicolon-separated mode list (e.g. ";c;C;s;S") is captured as
    "vCont_query_response"; an empty/absent capture means the stub does
    not support vCont at all.
    """
    self.test_sequence.add_log_lines(
        [
            "read packet: $vCont?#49",
            {
                "direction": "send",
                "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
                "capture": {2: "vCont_query_response"},
            },
        ],
        True,
    )
def parse_vCont_query_response(self, context):
    """Decode the vCont? reply captured by add_vCont_query_packets().

    Returns a dict mapping each supported vCont mode (e.g. "c", "s", "C")
    to 1, or an empty dict when the stub reported no vCont support.
    """
    self.assertIsNotNone(context)
    reply = context.get("vCont_query_response")

    # Handle case of no vCont support at all - in which case the capture
    # group will be none or zero length.
    if not reply:
        return {}

    modes = [mode for mode in reply.split(";") if mode]
    return dict.fromkeys(modes, 1)
def count_single_steps_until_true(
    self,
    thread_id,
    predicate,
    args,
    max_step_count=100,
    use_Hc_packet=True,
    step_instruction="s",
):
    """Single-step thread_id until predicate(args) becomes true.

    Used by single step test that appears in a few different contexts.
    Issues up to max_step_count single-step packets (step_instruction with
    any "{thread}" placeholder replaced by the hex thread id), expecting a
    SIGTRAP stop after each. When use_Hc_packet is true, an $Hc packet
    selects the continue thread before stepping.

    Returns (predicate_became_true, steps_taken).
    """
    single_step_count = 0

    while single_step_count < max_step_count:
        self.assertIsNotNone(thread_id)

        # Build the packet for the single step instruction. We replace
        # {thread}, if present, with the thread_id.
        step_packet = "read packet: ${}#00".format(
            re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)
        )
        # print("\nstep_packet created: {}\n".format(step_packet))

        # Single step.
        self.reset_test_sequence()
        if use_Hc_packet:
            self.test_sequence.add_log_lines(
                [  # Set the continue thread.
                    "read packet: $Hc{0:x}#00".format(thread_id),
                    "send packet: $OK#00",
                ],
                True,
            )
        self.test_sequence.add_log_lines(
            [
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {
                    "direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                    "capture": {1: "stop_signo", 2: "stop_thread_id"},
                },
            ],
            True,
        )
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        self.assertIsNotNone(context.get("stop_signo"))
        self.assertEqual(
            int(context.get("stop_signo"), 16),
            lldbutil.get_signal_number("SIGTRAP"),
        )

        single_step_count += 1

        # See if the predicate is true. If so, we're done.
        if predicate(args):
            return (True, single_step_count)

    # The predicate didn't return true within the runaway step count.
    return (False, single_step_count)
def g_c1_c2_contents_are(self, args):
    """Check inferior globals g_c1/g_c2 against expected one-char values.

    Used by single step test that appears in a few different contexts.
    args must supply "g_c1_address"/"g_c2_address" (int addresses) and
    "expected_g_c1"/"expected_g_c2" (single-character strings). Reads one
    byte at each address with $m packets and returns True only when both
    bytes match the expected characters.
    """
    g_c1_address = args["g_c1_address"]
    g_c2_address = args["g_c2_address"]
    expected_g_c1 = args["expected_g_c1"]
    expected_g_c2 = args["expected_g_c2"]

    # Read g_c1 and g_c2 contents.
    self.reset_test_sequence()
    self.test_sequence.add_log_lines(
        [
            "read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
            {
                "direction": "send",
                "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
                "capture": {1: "g_c1_contents"},
            },
            "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
            {
                "direction": "send",
                "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
                "capture": {1: "g_c2_contents"},
            },
        ],
        True,
    )

    # Run the packet stream.
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Check if what we read from inferior memory is what we are expecting.
    self.assertIsNotNone(context.get("g_c1_contents"))
    self.assertIsNotNone(context.get("g_c2_contents"))

    return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
        seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2
    )
def single_step_only_steps_one_instruction(
    self, use_Hc_packet=True, step_instruction="s"
):
    """Verify that a single-step packet advances exactly one instruction.

    Used by single step test that appears in a few different contexts.
    Launches an inferior that prints the addresses of swap_chars() and the
    g_c1/g_c2 globals, breaks at the function, then single-steps through
    the character-swap sequence, checking after each state transition that
    the step count matches the per-architecture expected instruction count.
    step_instruction may contain "{thread}", which is substituted with the
    stopped thread id; use_Hc_packet selects $Hc-based thread selection.
    """
    # Start up the inferior.
    procs = self.prep_debug_monitor_and_inferior(
        inferior_args=[
            "get-code-address-hex:swap_chars",
            "get-data-address-hex:g_c1",
            "get-data-address-hex:g_c2",
            "sleep:1",
            "call-function:swap_chars",
            "sleep:5",
        ]
    )

    # Run the process
    self.test_sequence.add_log_lines(
        [  # Start running after initial stop.
            "read packet: $c#63",
            # Match output line that prints the memory address of the function call entry point.
            # Note we require launch-only testing so we can get inferior output.
            {
                "type": "output_match",
                "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
                "capture": {
                    1: "function_address",
                    2: "g_c1_address",
                    3: "g_c2_address",
                },
            },
            # Now stop the inferior.
            "read packet: {}".format(chr(3)),
            # And wait for the stop notification.
            {
                "direction": "send",
                "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                "capture": {1: "stop_signo", 2: "stop_thread_id"},
            },
        ],
        True,
    )

    # Run the packet stream.
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Grab the main thread id.
    self.assertIsNotNone(context.get("stop_thread_id"))
    main_thread_id = int(context.get("stop_thread_id"), 16)

    # Grab the function address.
    self.assertIsNotNone(context.get("function_address"))
    function_address = int(context.get("function_address"), 16)

    # Grab the data addresses.
    self.assertIsNotNone(context.get("g_c1_address"))
    g_c1_address = int(context.get("g_c1_address"), 16)

    self.assertIsNotNone(context.get("g_c2_address"))
    g_c2_address = int(context.get("g_c2_address"), 16)

    # Set a breakpoint at the given address.
    if self.getArchitecture().startswith("arm"):
        # TODO: Handle case when setting breakpoint in thumb code
        BREAKPOINT_KIND = 4
    else:
        BREAKPOINT_KIND = 1
    self.reset_test_sequence()
    self.add_set_breakpoint_packets(
        function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND
    )
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Remove the breakpoint.
    self.reset_test_sequence()
    self.add_remove_breakpoint_packets(
        function_address, breakpoint_kind=BREAKPOINT_KIND
    )
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Verify g_c1 and g_c2 match expected initial state.
    args = {}
    args["g_c1_address"] = g_c1_address
    args["g_c2_address"] = g_c2_address
    args["expected_g_c1"] = "0"
    args["expected_g_c2"] = "1"

    self.assertTrue(self.g_c1_c2_contents_are(args))

    # Verify we take only a small number of steps to hit the first state.
    # Might need to work through function entry prologue code.
    args["expected_g_c1"] = "1"
    args["expected_g_c2"] = "1"
    (state_reached, step_count) = self.count_single_steps_until_true(
        main_thread_id,
        self.g_c1_c2_contents_are,
        args,
        max_step_count=25,
        use_Hc_packet=use_Hc_packet,
        step_instruction=step_instruction,
    )
    self.assertTrue(state_reached)

    # Verify we hit the next state.
    args["expected_g_c1"] = "1"
    args["expected_g_c2"] = "0"
    (state_reached, step_count) = self.count_single_steps_until_true(
        main_thread_id,
        self.g_c1_c2_contents_are,
        args,
        max_step_count=5,
        use_Hc_packet=use_Hc_packet,
        step_instruction=step_instruction,
    )
    self.assertTrue(state_reached)
    expected_step_count = 1
    arch = self.getArchitecture()

    # MIPS requires "3" (ADDIU, SB, LD) machine instructions to update
    # the variable value.
    if re.match("mips", arch):
        expected_step_count = 3
    # S390X requires "2" (LARL, MVI) machine instructions to update the
    # variable value.
    if re.match("s390x", arch):
        expected_step_count = 2
    # ARM64 requires "4" instructions: 2 to compute the address (adrp,
    # add), one to materialize the constant (mov) and the store. Once
    # addresses and constants are materialized, only one instruction is
    # needed.
    if re.match("arm64", arch):
        before_materialization_step_count = 4
        after_matrialization_step_count = 1
        self.assertIn(
            step_count,
            [before_materialization_step_count, after_matrialization_step_count],
        )
        expected_step_count = after_matrialization_step_count
    else:
        self.assertEqual(step_count, expected_step_count)

    # Verify we hit the next state.
    args["expected_g_c1"] = "0"
    args["expected_g_c2"] = "0"
    (state_reached, step_count) = self.count_single_steps_until_true(
        main_thread_id,
        self.g_c1_c2_contents_are,
        args,
        max_step_count=5,
        use_Hc_packet=use_Hc_packet,
        step_instruction=step_instruction,
    )
    self.assertTrue(state_reached)
    self.assertEqual(step_count, expected_step_count)

    # Verify we hit the next state.
    args["expected_g_c1"] = "0"
    args["expected_g_c2"] = "1"
    (state_reached, step_count) = self.count_single_steps_until_true(
        main_thread_id,
        self.g_c1_c2_contents_are,
        args,
        max_step_count=5,
        use_Hc_packet=use_Hc_packet,
        step_instruction=step_instruction,
    )
    self.assertTrue(state_reached)
    self.assertEqual(step_count, expected_step_count)
def maybe_strict_output_regex(self, regex):
    """Return a regex for matching inferior output.

    On platforms with chatty stderr the pattern is loosened to match
    anywhere in the output (".*regex.*"); otherwise it is anchored to
    the whole line ("^regex$").
    """
    return (
        ".*" + regex + ".*"
        if lldbplatformutil.hasChattyStderr(self)
        else "^" + regex + "$"
    )
def install_and_create_launch_args(self):
    """Build the launch-argument list for the test inferior.

    Locally, returns [path to the built a.out]. When running against a
    remote platform, first installs the binary into the remote process
    working directory and returns [remote path] instead.

    Raises:
        Exception: if installing the binary on the remote platform fails.
    """
    exe_path = self.getBuildArtifact("a.out")
    if not lldb.remote_platform:
        return [exe_path]
    remote_path = lldbutil.append_to_process_working_directory(
        self, os.path.basename(exe_path)
    )
    remote_file_spec = lldb.SBFileSpec(remote_path, False)
    err = lldb.remote_platform.Install(
        lldb.SBFileSpec(exe_path, True), remote_file_spec
    )
    if err.Fail():
        raise Exception(
            "remote_platform.Install('%s', '%s') failed: %s"
            % (exe_path, remote_path, err)
        )
    return [remote_path]