# Source: lldb/packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
# (extracted from upstream llvm-project blob 3341b6e54a3bc74890f8a813d81281944bfce25a)
1 """
2 Base class for gdb-remote test cases.
3 """
5 import errno
6 import os
7 import os.path
8 import random
9 import re
10 import select
11 import socket
12 import subprocess
13 import sys
14 import tempfile
15 import time
16 from lldbsuite.test import configuration
17 from lldbsuite.test.lldbtest import *
18 from lldbsuite.support import seven
19 from lldbgdbserverutils import *
20 import logging
23 class _ConnectionRefused(IOError):
24 pass
class GdbRemoteTestCaseFactory(type):
    """Metaclass that fans each test* method out into per-server variants.

    For every attribute named test*, creates one wrapped copy per debug
    server category ("debugserver" / "llgs"), named test*_<category> and
    tagged so getDebugServer() can recover the category at run time. The
    original un-suffixed test* method is NOT kept in the class.
    """

    def __new__(cls, name, bases, attrs):
        newattrs = {}
        for attrname, attrvalue in attrs.items():
            # Non-test attributes pass through unchanged.
            if not attrname.startswith("test"):
                newattrs[attrname] = attrvalue
                continue

            # If any debug server categories were explicitly tagged, assume
            # that list to be authoritative. If none were specified, try
            # all of them.
            all_categories = set(["debugserver", "llgs"])
            categories = set(getattr(attrvalue, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:
                # NOTE: attrvalue is bound as a default argument on purpose —
                # a plain closure over the loop variable would late-bind and
                # make every generated method call the last test seen.
                @decorators.add_test_categories([cat])
                @wraps(attrvalue)
                def test_method(self, attrvalue=attrvalue):
                    return attrvalue(self)

                method_name = attrname + "_" + cat
                test_method.__name__ = method_name
                # Consumed by GdbRemoteTestCaseBase.getDebugServer().
                test_method.debug_server = cat
                newattrs[method_name] = test_method

        return super(GdbRemoteTestCaseFactory, cls).__new__(cls, name, bases, newattrs)
class GdbRemoteTestCaseBase(Base, metaclass=GdbRemoteTestCaseFactory):
    """Base class for gdb-remote protocol test cases."""

    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT = 20 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP = 5 * (2 if ("ASAN_OPTIONS" in os.environ) else 1)

    # gdb-remote 'k' (kill) packet, with its precomputed checksum suffix.
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test verbose file handler; installed by setUp() when the
    # "gdb-remote" log channel was requested, removed in tearDown().
    _verbose_log_handler = None
    _log_formatter = logging.Formatter(fmt="%(asctime)-15s %(levelname)-8s %(message)s")
88 def setUpBaseLogging(self):
89 self.logger = logging.getLogger(__name__)
91 if len(self.logger.handlers) > 0:
92 return # We have set up this handler already
94 self.logger.propagate = False
95 self.logger.setLevel(logging.DEBUG)
97 # log all warnings to stderr
98 handler = logging.StreamHandler()
99 handler.setLevel(logging.WARNING)
100 handler.setFormatter(self._log_formatter)
101 self.logger.addHandler(handler)
103 def isVerboseLoggingRequested(self):
104 # We will report our detailed logs if the user requested that the "gdb-remote" channel is
105 # logged.
106 return any(("gdb-remote" in channel) for channel in lldbtest_config.channels)
108 def getDebugServer(self):
109 method = getattr(self, self.testMethodName)
110 return getattr(method, "debug_server", None)
112 def setUp(self):
113 super(GdbRemoteTestCaseBase, self).setUp()
115 self.setUpBaseLogging()
116 self.debug_monitor_extra_args = []
118 if self.isVerboseLoggingRequested():
119 # If requested, full logs go to a log file
120 self._verbose_log_handler = logging.FileHandler(
121 self.getLogBasenameForCurrentTest() + "-host.log"
123 self._verbose_log_handler.setFormatter(self._log_formatter)
124 self._verbose_log_handler.setLevel(logging.DEBUG)
125 self.logger.addHandler(self._verbose_log_handler)
127 self.test_sequence = GdbRemoteTestSequence(self.logger)
128 self.set_inferior_startup_launch()
129 self.port = self.get_next_port()
130 self.stub_sends_two_stop_notifications_on_kill = False
131 if configuration.lldb_platform_url:
132 if configuration.lldb_platform_url.startswith("unix-"):
133 url_pattern = "(.+)://\[?(.+?)\]?/.*"
134 else:
135 url_pattern = "(.+)://(.+):\d+"
136 scheme, host = re.match(
137 url_pattern, configuration.lldb_platform_url
138 ).groups()
139 if (
140 configuration.lldb_platform_name == "remote-android"
141 and host != "localhost"
143 self.stub_device = host
144 self.stub_hostname = "localhost"
145 else:
146 self.stub_device = None
147 self.stub_hostname = host
148 else:
149 self.stub_hostname = "localhost"
151 debug_server = self.getDebugServer()
152 if debug_server == "debugserver":
153 self._init_debugserver_test()
154 else:
155 self._init_llgs_test()
    def tearDown(self):
        """Detach the per-test verbose log handler and run base teardown."""
        # removeHandler() is a no-op when the handler (possibly None) is not
        # registered, so this is safe even if setUp() installed nothing.
        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        # NOTE(review): calls TestBase.tearDown directly rather than super() —
        # presumably intentional given the Base/TestBase split; confirm
        # against the lldbtest class hierarchy before changing.
        TestBase.tearDown(self)
162 def getLocalServerLogFile(self):
163 return self.getLogBasenameForCurrentTest() + "-server.log"
165 def setUpServerLogging(self, is_llgs):
166 if len(lldbtest_config.channels) == 0:
167 return # No logging requested
169 if lldb.remote_platform:
170 log_file = lldbutil.join_remote_paths(
171 lldb.remote_platform.GetWorkingDirectory(), "server.log"
173 else:
174 log_file = self.getLocalServerLogFile()
176 if is_llgs:
177 self.debug_monitor_extra_args.append("--log-file=" + log_file)
178 self.debug_monitor_extra_args.append(
179 "--log-channels={}".format(":".join(lldbtest_config.channels))
181 else:
182 self.debug_monitor_extra_args = [
183 "--log-file=" + log_file,
184 "--log-flags=0x800000",
187 def get_next_port(self):
188 return 12000 + random.randint(0, 3999)
    def reset_test_sequence(self):
        """Discard any queued packet expectations and start a fresh sequence."""
        self.test_sequence = GdbRemoteTestSequence(self.logger)
193 def _init_llgs_test(self):
194 reverse_connect = True
195 if lldb.remote_platform:
196 # Reverse connections may be tricky due to firewalls/NATs.
197 reverse_connect = False
199 # FIXME: This is extremely linux-oriented
201 # Grab the ppid from /proc/[shell pid]/stat
202 err, retcode, shell_stat = self.run_platform_command("cat /proc/$$/stat")
203 self.assertTrue(
204 err.Success() and retcode == 0,
205 "Failed to read file /proc/$$/stat: %s, retcode: %d"
206 % (err.GetCString(), retcode),
209 # [pid] ([executable]) [state] [*ppid*]
210 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
211 err, retcode, ls_output = self.run_platform_command(
212 "ls -l /proc/%s/exe" % pid
214 self.assertTrue(
215 err.Success() and retcode == 0,
216 "Failed to read file /proc/%s/exe: %s, retcode: %d"
217 % (pid, err.GetCString(), retcode),
219 exe = ls_output.split()[-1]
221 # If the binary has been deleted, the link name has " (deleted)" appended.
222 # Remove if it's there.
223 self.debug_monitor_exe = re.sub(r" \(deleted\)$", "", exe)
224 else:
225 self.debug_monitor_exe = get_lldb_server_exe()
227 self.debug_monitor_extra_args = ["gdbserver"]
228 self.setUpServerLogging(is_llgs=True)
230 self.reverse_connect = reverse_connect
    def _init_debugserver_test(self):
        """Configure this test to run against Apple's debugserver stub."""
        self.debug_monitor_exe = get_debugserver_exe()
        self.setUpServerLogging(is_llgs=False)
        self.reverse_connect = True

        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
        # when the process truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True
241 def forward_adb_port(self, source, target, direction, device):
242 adb = ["adb"] + (["-s", device] if device else []) + [direction]
244 def remove_port_forward():
245 subprocess.call(adb + ["--remove", "tcp:%d" % source])
247 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
248 self.addTearDownHook(remove_port_forward)
250 def _verify_socket(self, sock):
251 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
252 # connect() attempt. However, due to the way how ADB forwarding works, on android targets
253 # the connect() will always be successful, but the connection will be immediately dropped
254 # if ADB could not connect on the remote side. This function tries to detect this
255 # situation, and report it as "connection refused" so that the upper layers attempt the
256 # connection again.
257 triple = self.dbg.GetSelectedPlatform().GetTriple()
258 if not re.match(".*-.*-.*-android", triple):
259 return # Not android.
260 can_read, _, _ = select.select([sock], [], [], 0.1)
261 if sock not in can_read:
262 return # Data is not available, but the connection is alive.
263 if len(sock.recv(1, socket.MSG_PEEK)) == 0:
264 raise _ConnectionRefused() # Got EOF, connection dropped.
266 def create_socket(self):
267 try:
268 sock = socket.socket(family=socket.AF_INET)
269 except OSError as e:
270 if e.errno != errno.EAFNOSUPPORT:
271 raise
272 sock = socket.socket(family=socket.AF_INET6)
274 logger = self.logger
276 triple = self.dbg.GetSelectedPlatform().GetTriple()
277 if re.match(".*-.*-.*-android", triple):
278 self.forward_adb_port(self.port, self.port, "forward", self.stub_device)
280 logger.info(
281 "Connecting to debug monitor on %s:%d", self.stub_hostname, self.port
283 connect_info = (self.stub_hostname, self.port)
284 try:
285 sock.connect(connect_info)
286 except socket.error as serr:
287 if serr.errno == errno.ECONNREFUSED:
288 raise _ConnectionRefused()
289 raise serr
291 def shutdown_socket():
292 if sock:
293 try:
294 # send the kill packet so lldb-server shuts down gracefully
295 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
296 except:
297 logger.warning(
298 "failed to send kill packet to debug monitor: {}; ignoring".format(
299 sys.exc_info()[0]
303 try:
304 sock.close()
305 except:
306 logger.warning(
307 "failed to close socket to debug monitor: {}; ignoring".format(
308 sys.exc_info()[0]
312 self.addTearDownHook(shutdown_socket)
314 self._verify_socket(sock)
316 return sock
    def set_inferior_startup_launch(self):
        """Select launch mode: the stub launches the inferior via an $A packet."""
        self._inferior_startup = self._STARTUP_LAUNCH
    def set_inferior_startup_attach(self):
        """Select attach mode: the stub attaches via --attach on its command line."""
        self._inferior_startup = self._STARTUP_ATTACH
    def set_inferior_startup_attach_manually(self):
        """Select manual-attach mode: the test attaches itself (e.g. $vAttach;pid)."""
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
327 def get_debug_monitor_command_line_args(self, attach_pid=None):
328 commandline_args = self.debug_monitor_extra_args
329 if attach_pid:
330 commandline_args += ["--attach=%d" % attach_pid]
331 if self.reverse_connect:
332 commandline_args += ["--reverse-connect", self.connect_address]
333 else:
334 if lldb.remote_platform:
335 commandline_args += ["*:{}".format(self.port)]
336 else:
337 commandline_args += ["localhost:{}".format(self.port)]
339 return commandline_args
    def get_target_byte_order(self):
        """Return the byte order of the built a.out test inferior."""
        inferior_exe_path = self.getBuildArtifact("a.out")
        target = self.dbg.CreateTarget(inferior_exe_path)
        return target.GetByteOrder()
346 def launch_debug_monitor(self, attach_pid=None, logfile=None):
347 if self.reverse_connect:
348 family, type, proto, _, addr = socket.getaddrinfo(
349 "localhost", 0, proto=socket.IPPROTO_TCP
350 )[0]
351 sock = socket.socket(family, type, proto)
352 sock.settimeout(self.DEFAULT_TIMEOUT)
354 sock.bind(addr)
355 sock.listen(1)
356 addr = sock.getsockname()
357 self.connect_address = "[{}]:{}".format(*addr)
359 # Create the command line.
360 commandline_args = self.get_debug_monitor_command_line_args(
361 attach_pid=attach_pid
364 # Start the server.
365 server = self.spawnSubprocess(
366 self.debug_monitor_exe, commandline_args, install_remote=False
368 self.assertIsNotNone(server)
370 if self.reverse_connect:
371 self.sock = sock.accept()[0]
372 self.sock.settimeout(self.DEFAULT_TIMEOUT)
374 return server
376 def connect_to_debug_monitor(self, attach_pid=None):
377 if self.reverse_connect:
378 # Create the stub.
379 server = self.launch_debug_monitor(attach_pid=attach_pid)
380 self.assertIsNotNone(server)
382 # Schedule debug monitor to be shut down during teardown.
383 logger = self.logger
385 self._server = Server(self.sock, server)
386 return server
388 # We're using a random port algorithm to try not to collide with other ports,
389 # and retry a max # times.
390 attempts = 0
391 MAX_ATTEMPTS = 20
393 while attempts < MAX_ATTEMPTS:
394 server = self.launch_debug_monitor(attach_pid=attach_pid)
396 # Schedule debug monitor to be shut down during teardown.
397 logger = self.logger
399 connect_attemps = 0
400 MAX_CONNECT_ATTEMPTS = 10
402 while connect_attemps < MAX_CONNECT_ATTEMPTS:
403 # Create a socket to talk to the server
404 try:
405 logger.info("Connect attempt %d", connect_attemps + 1)
406 self.sock = self.create_socket()
407 self._server = Server(self.sock, server)
408 return server
409 except _ConnectionRefused as serr:
410 # Ignore, and try again.
411 pass
412 time.sleep(0.5)
413 connect_attemps += 1
415 # We should close the server here to be safe.
416 server.terminate()
418 # Increment attempts.
419 print(
420 "connect to debug monitor on port %d failed, attempt #%d of %d"
421 % (self.port, attempts + 1, MAX_ATTEMPTS)
423 attempts += 1
425 # And wait a random length of time before next attempt, to avoid
426 # collisions.
427 time.sleep(random.randint(1, 5))
429 # Now grab a new port number.
430 self.port = self.get_next_port()
432 raise Exception(
433 "failed to create a socket to the launched debug monitor after %d tries"
434 % attempts
437 def launch_process_for_attach(
438 self, inferior_args=None, sleep_seconds=3, exe_path=None
440 # We're going to start a child process that the debug monitor stub can later attach to.
441 # This process needs to be started so that it just hangs around for a while. We'll
442 # have it sleep.
443 if not exe_path:
444 exe_path = self.getBuildArtifact("a.out")
446 args = []
447 if inferior_args:
448 args.extend(inferior_args)
449 if sleep_seconds:
450 args.append("sleep:%d" % sleep_seconds)
452 return self.spawnSubprocess(exe_path, args)
454 def prep_debug_monitor_and_inferior(
455 self,
456 inferior_args=None,
457 inferior_sleep_seconds=3,
458 inferior_exe_path=None,
459 inferior_env=None,
461 """Prep the debug monitor, the inferior, and the expected packet stream.
463 Handle the separate cases of using the debug monitor in attach-to-inferior mode
464 and in launch-inferior mode.
466 For attach-to-inferior mode, the inferior process is first started, then
467 the debug monitor is started in attach to pid mode (using --attach on the
468 stub command line), and the no-ack-mode setup is appended to the packet
469 stream. The packet stream is not yet executed, ready to have more expected
470 packet entries added to it.
472 For launch-inferior mode, the stub is first started, then no ack mode is
473 setup on the expected packet stream, then the verified launch packets are added
474 to the expected socket stream. The packet stream is not yet executed, ready
475 to have more expected packet entries added to it.
477 The return value is:
478 {inferior:<inferior>, server:<server>}
480 inferior = None
481 attach_pid = None
483 if (
484 self._inferior_startup == self._STARTUP_ATTACH
485 or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY
487 # Launch the process that we'll use as the inferior.
488 inferior = self.launch_process_for_attach(
489 inferior_args=inferior_args,
490 sleep_seconds=inferior_sleep_seconds,
491 exe_path=inferior_exe_path,
493 self.assertIsNotNone(inferior)
494 self.assertTrue(inferior.pid > 0)
495 if self._inferior_startup == self._STARTUP_ATTACH:
496 # In this case, we want the stub to attach via the command
497 # line, so set the command line attach pid here.
498 attach_pid = inferior.pid
500 if self._inferior_startup == self._STARTUP_LAUNCH:
501 # Build launch args
502 if not inferior_exe_path:
503 inferior_exe_path = self.getBuildArtifact("a.out")
505 if lldb.remote_platform:
506 remote_path = lldbutil.append_to_process_working_directory(
507 self, os.path.basename(inferior_exe_path)
509 remote_file_spec = lldb.SBFileSpec(remote_path, False)
510 err = lldb.remote_platform.Install(
511 lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec
513 if err.Fail():
514 raise Exception(
515 "remote_platform.Install('%s', '%s') failed: %s"
516 % (inferior_exe_path, remote_path, err)
518 inferior_exe_path = remote_path
520 launch_args = [inferior_exe_path]
521 if inferior_args:
522 launch_args.extend(inferior_args)
524 # Launch the debug monitor stub, attaching to the inferior.
525 server = self.connect_to_debug_monitor(attach_pid=attach_pid)
526 self.assertIsNotNone(server)
528 self.do_handshake()
530 # Build the expected protocol stream
531 if inferior_env:
532 for name, value in inferior_env.items():
533 self.add_set_environment_packets(name, value)
534 if self._inferior_startup == self._STARTUP_LAUNCH:
535 self.add_verified_launch_packets(launch_args)
537 return {"inferior": inferior, "server": server}
    def do_handshake(self):
        """Ack the stub's banner and switch the connection into no-ack mode."""
        server = self._server
        server.send_ack()
        server.send_packet(b"QStartNoAckMode")
        # The stub acks the request ('+') and then replies OK; ordering matters.
        self.assertEqual(server.get_normal_packet(), b"+")
        self.assertEqual(server.get_normal_packet(), b"OK")
        server.send_ack()
547 def add_verified_launch_packets(self, launch_args):
548 self.test_sequence.add_log_lines(
550 "read packet: %s" % build_gdbremote_A_packet(launch_args),
551 "send packet: $OK#00",
552 "read packet: $qLaunchSuccess#a5",
553 "send packet: $OK#00",
555 True,
558 def add_thread_suffix_request_packets(self):
559 self.test_sequence.add_log_lines(
561 "read packet: $QThreadSuffixSupported#e4",
562 "send packet: $OK#00",
564 True,
567 def add_process_info_collection_packets(self):
568 self.test_sequence.add_log_lines(
570 "read packet: $qProcessInfo#dc",
572 "direction": "send",
573 "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
574 "capture": {1: "process_info_raw"},
577 True,
580 def add_set_environment_packets(self, name, value):
581 self.test_sequence.add_log_lines(
583 "read packet: $QEnvironment:" + name + "=" + value + "#00",
584 "send packet: $OK#00",
586 True,
589 _KNOWN_PROCESS_INFO_KEYS = [
590 "pid",
591 "parent-pid",
592 "real-uid",
593 "real-gid",
594 "effective-uid",
595 "effective-gid",
596 "cputype",
597 "cpusubtype",
598 "ostype",
599 "triple",
600 "vendor",
601 "endian",
602 "elf_abi",
603 "ptrsize",
606 def parse_process_info_response(self, context):
607 # Ensure we have a process info response.
608 self.assertIsNotNone(context)
609 process_info_raw = context.get("process_info_raw")
610 self.assertIsNotNone(process_info_raw)
612 # Pull out key:value; pairs.
613 process_info_dict = {
614 match.group(1): match.group(2)
615 for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw)
618 # Validate keys are known.
619 for key, val in list(process_info_dict.items()):
620 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
621 self.assertIsNotNone(val)
623 return process_info_dict
625 def add_register_info_collection_packets(self):
626 self.test_sequence.add_log_lines(
629 "type": "multi_response",
630 "query": "qRegisterInfo",
631 "append_iteration_suffix": True,
632 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
633 "save_key": "reg_info_responses",
636 True,
639 def parse_register_info_packets(self, context):
640 """Return an array of register info dictionaries, one per register info."""
641 reg_info_responses = context.get("reg_info_responses")
642 self.assertIsNotNone(reg_info_responses)
644 # Parse register infos.
645 return [
646 parse_reg_info_response(reg_info_response)
647 for reg_info_response in reg_info_responses
650 def expect_gdbremote_sequence(self):
651 return expect_lldb_gdbserver_replay(
652 self,
653 self._server,
654 self.test_sequence,
655 self.DEFAULT_TIMEOUT * len(self.test_sequence),
656 self.logger,
659 _KNOWN_REGINFO_KEYS = [
660 "name",
661 "alt-name",
662 "bitsize",
663 "offset",
664 "encoding",
665 "format",
666 "set",
667 "gcc",
668 "ehframe",
669 "dwarf",
670 "generic",
671 "container-regs",
672 "invalidate-regs",
673 "dynamic_size_dwarf_expr_bytes",
674 "dynamic_size_dwarf_len",
677 def assert_valid_reg_info(self, reg_info):
678 # Assert we know about all the reginfo keys parsed.
679 for key in reg_info:
680 self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
682 # Check the bare-minimum expected set of register info keys.
683 self.assertTrue("name" in reg_info)
684 self.assertTrue("bitsize" in reg_info)
686 if not self.getArchitecture() == "aarch64":
687 self.assertTrue("offset" in reg_info)
689 self.assertTrue("encoding" in reg_info)
690 self.assertTrue("format" in reg_info)
692 def find_pc_reg_info(self, reg_infos):
693 lldb_reg_index = 0
694 for reg_info in reg_infos:
695 if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
696 return (lldb_reg_index, reg_info)
697 lldb_reg_index += 1
699 return (None, None)
701 def add_lldb_register_index(self, reg_infos):
702 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
704 We'll use this when we want to call packets like P/p with a register index but do so
705 on only a subset of the full register info set.
707 self.assertIsNotNone(reg_infos)
709 reg_index = 0
710 for reg_info in reg_infos:
711 reg_info["lldb_register_index"] = reg_index
712 reg_index += 1
714 def add_query_memory_region_packets(self, address):
715 self.test_sequence.add_log_lines(
717 "read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
719 "direction": "send",
720 "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
721 "capture": {1: "memory_region_response"},
724 True,
727 def parse_key_val_dict(self, key_val_text, allow_dupes=True):
728 self.assertIsNotNone(key_val_text)
729 kv_dict = {}
730 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
731 key = match.group(1)
732 val = match.group(2)
733 if key in kv_dict:
734 if allow_dupes:
735 if isinstance(kv_dict[key], list):
736 kv_dict[key].append(val)
737 else:
738 # Promote to list
739 kv_dict[key] = [kv_dict[key], val]
740 else:
741 self.fail(
742 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
743 key, val, key_val_text, kv_dict
746 else:
747 kv_dict[key] = val
748 return kv_dict
750 def parse_memory_region_packet(self, context):
751 # Ensure we have a context.
752 self.assertIsNotNone(context.get("memory_region_response"))
754 # Pull out key:value; pairs.
755 mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response"))
757 # Validate keys are known.
758 for key, val in list(mem_region_dict.items()):
759 self.assertIn(
760 key,
762 "start",
763 "size",
764 "permissions",
765 "flags",
766 "name",
767 "error",
768 "dirty-pages",
769 "type",
772 self.assertIsNotNone(val)
774 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
775 # Return the dictionary of key-value pairs for the memory region.
776 return mem_region_dict
778 def assert_address_within_memory_region(self, test_address, mem_region_dict):
779 self.assertIsNotNone(mem_region_dict)
780 self.assertTrue("start" in mem_region_dict)
781 self.assertTrue("size" in mem_region_dict)
783 range_start = int(mem_region_dict["start"], 16)
784 range_size = int(mem_region_dict["size"], 16)
785 range_end = range_start + range_size
787 if test_address < range_start:
788 self.fail(
789 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
790 test_address, range_start, range_end, range_size
793 elif test_address >= range_end:
794 self.fail(
795 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
796 test_address, range_start, range_end, range_size
800 def add_threadinfo_collection_packets(self):
801 self.test_sequence.add_log_lines(
804 "type": "multi_response",
805 "first_query": "qfThreadInfo",
806 "next_query": "qsThreadInfo",
807 "append_iteration_suffix": False,
808 "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
809 "save_key": "threadinfo_responses",
812 True,
815 def parse_threadinfo_packets(self, context):
816 """Return an array of thread ids (decimal ints), one per thread."""
817 threadinfo_responses = context.get("threadinfo_responses")
818 self.assertIsNotNone(threadinfo_responses)
820 thread_ids = []
821 for threadinfo_response in threadinfo_responses:
822 new_thread_infos = parse_threadinfo_response(threadinfo_response)
823 thread_ids.extend(new_thread_infos)
824 return thread_ids
826 def launch_with_threads(self, thread_count):
827 procs = self.prep_debug_monitor_and_inferior(
828 inferior_args=["thread:new"] * (thread_count - 1) + ["trap"]
831 self.test_sequence.add_log_lines(
833 "read packet: $c#00",
835 "direction": "send",
836 "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$",
837 "capture": {1: "stop_signo", 2: "stop_reply_kv"},
840 True,
842 self.add_threadinfo_collection_packets()
843 context = self.expect_gdbremote_sequence()
844 threads = self.parse_threadinfo_packets(context)
845 self.assertGreaterEqual(len(threads), thread_count)
846 return context, threads
848 def add_set_breakpoint_packets(
849 self, address, z_packet_type=0, do_continue=True, breakpoint_kind=1
851 self.test_sequence.add_log_lines(
852 [ # Set the breakpoint.
853 "read packet: $Z{2},{0:x},{1}#00".format(
854 address, breakpoint_kind, z_packet_type
856 # Verify the stub could set it.
857 "send packet: $OK#00",
859 True,
862 if do_continue:
863 self.test_sequence.add_log_lines(
864 [ # Continue the inferior.
865 "read packet: $c#63",
866 # Expect a breakpoint stop report.
868 "direction": "send",
869 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
870 "capture": {1: "stop_signo", 2: "stop_thread_id"},
873 True,
876 def add_remove_breakpoint_packets(
877 self, address, z_packet_type=0, breakpoint_kind=1
879 self.test_sequence.add_log_lines(
880 [ # Remove the breakpoint.
881 "read packet: $z{2},{0:x},{1}#00".format(
882 address, breakpoint_kind, z_packet_type
884 # Verify the stub could unset it.
885 "send packet: $OK#00",
887 True,
890 def add_qSupported_packets(self, client_features=[]):
891 features = "".join(";" + x for x in client_features)
892 self.test_sequence.add_log_lines(
894 "read packet: $qSupported{}#00".format(features),
896 "direction": "send",
897 "regex": r"^\$(.*)#[0-9a-fA-F]{2}",
898 "capture": {1: "qSupported_response"},
901 True,
904 _KNOWN_QSUPPORTED_STUB_FEATURES = [
905 "augmented-libraries-svr4-read",
906 "PacketSize",
907 "QStartNoAckMode",
908 "QThreadSuffixSupported",
909 "QListThreadsInStopReply",
910 "qXfer:auxv:read",
911 "qXfer:libraries:read",
912 "qXfer:libraries-svr4:read",
913 "qXfer:features:read",
914 "qXfer:siginfo:read",
915 "qEcho",
916 "QPassSignals",
917 "multiprocess",
918 "fork-events",
919 "vfork-events",
920 "memory-tagging",
921 "qSaveCore",
922 "native-signals",
923 "QNonStop",
926 def parse_qSupported_response(self, context):
927 self.assertIsNotNone(context)
929 raw_response = context.get("qSupported_response")
930 self.assertIsNotNone(raw_response)
932 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the
933 # +,-,? is stripped from the key and set as the value.
934 supported_dict = {}
935 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
936 key = match.group(1)
937 val = match.group(3)
939 # key=val: store as is
940 if val and len(val) > 0:
941 supported_dict[key] = val
942 else:
943 if len(key) < 2:
944 raise Exception(
945 "singular stub feature is too short: must be stub_feature{+,-,?}"
947 supported_type = key[-1]
948 key = key[:-1]
949 if not supported_type in ["+", "-", "?"]:
950 raise Exception(
951 "malformed stub feature: final character {} not in expected set (+,-,?)".format(
952 supported_type
955 supported_dict[key] = supported_type
956 # Ensure we know the supported element
957 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
958 raise Exception("unknown qSupported stub feature reported: %s" % key)
960 return supported_dict
962 def continue_process_and_wait_for_stop(self):
963 self.test_sequence.add_log_lines(
965 "read packet: $vCont;c#a8",
967 "direction": "send",
968 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
969 "capture": {1: "stop_signo", 2: "stop_key_val_text"},
972 True,
974 context = self.expect_gdbremote_sequence()
975 self.assertIsNotNone(context)
976 return self.parse_interrupt_packets(context)
978 def select_modifiable_register(self, reg_infos):
979 """Find a register that can be read/written freely."""
980 PREFERRED_REGISTER_NAMES = set(
982 "rax",
986 # First check for the first register from the preferred register name
987 # set.
988 alternative_register_index = None
990 self.assertIsNotNone(reg_infos)
991 for reg_info in reg_infos:
992 if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
993 # We found a preferred register. Use it.
994 return reg_info["lldb_register_index"]
995 if ("generic" in reg_info) and (
996 reg_info["generic"] == "fp" or reg_info["generic"] == "arg1"
998 # A frame pointer or first arg register will do as a
999 # register to modify temporarily.
1000 alternative_register_index = reg_info["lldb_register_index"]
1002 # We didn't find a preferred register. Return whatever alternative register
1003 # we found, if any.
1004 return alternative_register_index
1006 def extract_registers_from_stop_notification(self, stop_key_vals_text):
1007 self.assertIsNotNone(stop_key_vals_text)
1008 kv_dict = self.parse_key_val_dict(stop_key_vals_text)
1010 registers = {}
1011 for key, val in list(kv_dict.items()):
1012 if re.match(r"^[0-9a-fA-F]+$", key):
1013 registers[int(key, 16)] = val
1014 return registers
    def gather_register_infos(self):
        """Run a qRegisterInfo sweep and return the parsed, index-annotated
        register info dicts."""
        self.reset_test_sequence()
        self.add_register_info_collection_packets()

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        reg_infos = self.parse_register_info_packets(context)
        self.assertIsNotNone(reg_infos)
        # Annotate each entry with its 0-based lldb register index.
        self.add_lldb_register_index(reg_infos)

        return reg_infos
1029 def find_generic_register_with_name(self, reg_infos, generic_name):
1030 self.assertIsNotNone(reg_infos)
1031 for reg_info in reg_infos:
1032 if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
1033 return reg_info
1034 return None
1036 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
1037 self.assertIsNotNone(reg_infos)
1038 for reg_info in reg_infos:
1039 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
1040 return reg_info
1041 return None
1043 def decode_gdbremote_binary(self, encoded_bytes):
1044 decoded_bytes = ""
1045 i = 0
1046 while i < len(encoded_bytes):
1047 if encoded_bytes[i] == "}":
1048 # Handle escaped char.
1049 self.assertTrue(i + 1 < len(encoded_bytes))
1050 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
1051 i += 2
1052 elif encoded_bytes[i] == "*":
1053 # Handle run length encoding.
1054 self.assertTrue(len(decoded_bytes) > 0)
1055 self.assertTrue(i + 1 < len(encoded_bytes))
1056 repeat_count = ord(encoded_bytes[i + 1]) - 29
1057 decoded_bytes += decoded_bytes[-1] * repeat_count
1058 i += 2
1059 else:
1060 decoded_bytes += encoded_bytes[i]
1061 i += 1
1062 return decoded_bytes
1064 def build_auxv_dict(self, endian, word_size, auxv_data):
1065 self.assertIsNotNone(endian)
1066 self.assertIsNotNone(word_size)
1067 self.assertIsNotNone(auxv_data)
1069 auxv_dict = {}
1071 # PowerPC64le's auxvec has a special key that must be ignored.
1072 # This special key may be used multiple times, resulting in
1073 # multiple key/value pairs with the same key, which would otherwise
1074 # break this test check for repeated keys.
1076 # AT_IGNOREPPC = 22
1077 ignored_keys_for_arch = {"powerpc64le": [22]}
1078 arch = self.getArchitecture()
1079 ignore_keys = None
1080 if arch in ignored_keys_for_arch:
1081 ignore_keys = ignored_keys_for_arch[arch]
1083 while len(auxv_data) > 0:
1084 # Chop off key.
1085 raw_key = auxv_data[:word_size]
1086 auxv_data = auxv_data[word_size:]
1088 # Chop of value.
1089 raw_value = auxv_data[:word_size]
1090 auxv_data = auxv_data[word_size:]
1092 # Convert raw text from target endian.
1093 key = unpack_endian_binary_string(endian, raw_key)
1094 value = unpack_endian_binary_string(endian, raw_value)
1096 if ignore_keys and key in ignore_keys:
1097 continue
1099 # Handle ending entry.
1100 if key == 0:
1101 self.assertEqual(value, 0)
1102 return auxv_dict
1104 # The key should not already be present.
1105 self.assertFalse(key in auxv_dict)
1106 auxv_dict[key] = value
1108 self.fail(
1109 "should not reach here - implies required double zero entry not found"
1111 return auxv_dict
1113 def read_binary_data_in_chunks(self, command_prefix, chunk_length):
1114 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
1115 offset = 0
1116 done = False
1117 decoded_data = ""
1119 while not done:
1120 # Grab the next iteration of data.
1121 self.reset_test_sequence()
1122 self.test_sequence.add_log_lines(
1124 "read packet: ${}{:x},{:x}:#00".format(
1125 command_prefix, offset, chunk_length
1128 "direction": "send",
1129 "regex": re.compile(
1130 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE | re.DOTALL
1132 "capture": {1: "response_type", 2: "content_raw"},
1135 True,
1138 context = self.expect_gdbremote_sequence()
1139 self.assertIsNotNone(context)
1141 response_type = context.get("response_type")
1142 self.assertIsNotNone(response_type)
1143 self.assertTrue(response_type in ["l", "m"])
1145 # Move offset along.
1146 offset += chunk_length
1148 # Figure out if we're done. We're done if the response type is l.
1149 done = response_type == "l"
1151 # Decode binary data.
1152 content_raw = context.get("content_raw")
1153 if content_raw and len(content_raw) > 0:
1154 self.assertIsNotNone(content_raw)
1155 decoded_data += self.decode_gdbremote_binary(content_raw)
1156 return decoded_data
1158 def add_interrupt_packets(self):
1159 self.test_sequence.add_log_lines(
1161 # Send the intterupt.
1162 "read packet: {}".format(chr(3)),
1163 # And wait for the stop notification.
1165 "direction": "send",
1166 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1167 "capture": {1: "stop_signo", 2: "stop_key_val_text"},
1170 True,
1173 def parse_interrupt_packets(self, context):
1174 self.assertIsNotNone(context.get("stop_signo"))
1175 self.assertIsNotNone(context.get("stop_key_val_text"))
1176 return (
1177 int(context["stop_signo"], 16),
1178 self.parse_key_val_dict(context["stop_key_val_text"]),
1181 def add_QSaveRegisterState_packets(self, thread_id):
1182 if thread_id:
1183 # Use the thread suffix form.
1184 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1185 thread_id
1187 else:
1188 request = "read packet: $QSaveRegisterState#00"
1190 self.test_sequence.add_log_lines(
1192 request,
1194 "direction": "send",
1195 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1196 "capture": {1: "save_response"},
1199 True,
1202 def parse_QSaveRegisterState_response(self, context):
1203 self.assertIsNotNone(context)
1205 save_response = context.get("save_response")
1206 self.assertIsNotNone(save_response)
1208 if len(save_response) < 1 or save_response[0] == "E":
1209 # error received
1210 return (False, None)
1211 else:
1212 return (True, int(save_response))
1214 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1215 if thread_id:
1216 # Use the thread suffix form.
1217 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1218 save_id, thread_id
1220 else:
1221 request = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
1223 self.test_sequence.add_log_lines([request, "send packet: $OK#00"], True)
1225 def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
1226 self.assertIsNotNone(reg_infos)
1228 successful_writes = 0
1229 failed_writes = 0
1231 for reg_info in reg_infos:
1232 # Use the lldb register index added to the reg info. We're not necessarily
1233 # working off a full set of register infos, so an inferred register
1234 # index could be wrong.
1235 reg_index = reg_info["lldb_register_index"]
1236 self.assertIsNotNone(reg_index)
1238 reg_byte_size = int(reg_info["bitsize"]) // 8
1239 self.assertTrue(reg_byte_size > 0)
1241 # Handle thread suffix.
1242 if thread_id:
1243 p_request = "read packet: $p{:x};thread:{:x}#00".format(
1244 reg_index, thread_id
1246 else:
1247 p_request = "read packet: $p{:x}#00".format(reg_index)
1249 # Read the existing value.
1250 self.reset_test_sequence()
1251 self.test_sequence.add_log_lines(
1253 p_request,
1255 "direction": "send",
1256 "regex": r"^\$([0-9a-fA-F]+)#",
1257 "capture": {1: "p_response"},
1260 True,
1262 context = self.expect_gdbremote_sequence()
1263 self.assertIsNotNone(context)
1265 # Verify the response length.
1266 p_response = context.get("p_response")
1267 self.assertIsNotNone(p_response)
1268 initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
1270 # Flip the value by xoring with all 1s
1271 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
1272 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
1273 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))
1275 # Handle thread suffix for P.
1276 if thread_id:
1277 P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
1278 reg_index,
1279 pack_register_hex(
1280 endian, flipped_bits_int, byte_size=reg_byte_size
1282 thread_id,
1284 else:
1285 P_request = "read packet: $P{:x}={}#00".format(
1286 reg_index,
1287 pack_register_hex(
1288 endian, flipped_bits_int, byte_size=reg_byte_size
1292 # Write the flipped value to the register.
1293 self.reset_test_sequence()
1294 self.test_sequence.add_log_lines(
1296 P_request,
1298 "direction": "send",
1299 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
1300 "capture": {1: "P_response"},
1303 True,
1305 context = self.expect_gdbremote_sequence()
1306 self.assertIsNotNone(context)
1308 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail
1309 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them
1310 # all flipping perfectly.
1311 P_response = context.get("P_response")
1312 self.assertIsNotNone(P_response)
1313 if P_response == "OK":
1314 successful_writes += 1
1315 else:
1316 failed_writes += 1
1317 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
1319 # Read back the register value, ensure it matches the flipped
1320 # value.
1321 if P_response == "OK":
1322 self.reset_test_sequence()
1323 self.test_sequence.add_log_lines(
1325 p_request,
1327 "direction": "send",
1328 "regex": r"^\$([0-9a-fA-F]+)#",
1329 "capture": {1: "p_response"},
1332 True,
1334 context = self.expect_gdbremote_sequence()
1335 self.assertIsNotNone(context)
1337 verify_p_response_raw = context.get("p_response")
1338 self.assertIsNotNone(verify_p_response_raw)
1339 verify_bits = unpack_register_hex_unsigned(
1340 endian, verify_p_response_raw
1343 if verify_bits != flipped_bits_int:
1344 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts.
1345 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
1346 successful_writes -= 1
1347 failed_writes += 1
1349 return (successful_writes, failed_writes)
1351 def is_bit_flippable_register(self, reg_info):
1352 if not reg_info:
1353 return False
1354 if not "set" in reg_info:
1355 return False
1356 if reg_info["set"] != "General Purpose Registers":
1357 return False
1358 if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0):
1359 # Don't try to bit flip registers contained in another register.
1360 return False
1361 if re.match("^.s$", reg_info["name"]):
1362 # This is a 2-letter register name that ends in "s", like a segment register.
1363 # Don't try to bit flip these.
1364 return False
1365 if re.match("^(c|)psr$", reg_info["name"]):
1366 # This is an ARM program status register; don't flip it.
1367 return False
1368 # Okay, this looks fine-enough.
1369 return True
1371 def read_register_values(self, reg_infos, endian, thread_id=None):
1372 self.assertIsNotNone(reg_infos)
1373 values = {}
1375 for reg_info in reg_infos:
1376 # We append a register index when load reg infos so we can work
1377 # with subsets.
1378 reg_index = reg_info.get("lldb_register_index")
1379 self.assertIsNotNone(reg_index)
1381 # Handle thread suffix.
1382 if thread_id:
1383 p_request = "read packet: $p{:x};thread:{:x}#00".format(
1384 reg_index, thread_id
1386 else:
1387 p_request = "read packet: $p{:x}#00".format(reg_index)
1389 # Read it with p.
1390 self.reset_test_sequence()
1391 self.test_sequence.add_log_lines(
1393 p_request,
1395 "direction": "send",
1396 "regex": r"^\$([0-9a-fA-F]+)#",
1397 "capture": {1: "p_response"},
1400 True,
1402 context = self.expect_gdbremote_sequence()
1403 self.assertIsNotNone(context)
1405 # Convert value from target endian to integral.
1406 p_response = context.get("p_response")
1407 self.assertIsNotNone(p_response)
1408 self.assertTrue(len(p_response) > 0)
1409 self.assertFalse(p_response[0] == "E")
1411 values[reg_index] = unpack_register_hex_unsigned(endian, p_response)
1413 return values
1415 def add_vCont_query_packets(self):
1416 self.test_sequence.add_log_lines(
1418 "read packet: $vCont?#49",
1420 "direction": "send",
1421 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1422 "capture": {2: "vCont_query_response"},
1425 True,
1428 def parse_vCont_query_response(self, context):
1429 self.assertIsNotNone(context)
1430 vCont_query_response = context.get("vCont_query_response")
1432 # Handle case of no vCont support at all - in which case the capture
1433 # group will be none or zero length.
1434 if not vCont_query_response or len(vCont_query_response) == 0:
1435 return {}
1437 return {
1438 key: 1 for key in vCont_query_response.split(";") if key and len(key) > 0
1441 def count_single_steps_until_true(
1442 self,
1443 thread_id,
1444 predicate,
1445 args,
1446 max_step_count=100,
1447 use_Hc_packet=True,
1448 step_instruction="s",
1450 """Used by single step test that appears in a few different contexts."""
1451 single_step_count = 0
1453 while single_step_count < max_step_count:
1454 self.assertIsNotNone(thread_id)
1456 # Build the packet for the single step instruction. We replace
1457 # {thread}, if present, with the thread_id.
1458 step_packet = "read packet: ${}#00".format(
1459 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)
1461 # print("\nstep_packet created: {}\n".format(step_packet))
1463 # Single step.
1464 self.reset_test_sequence()
1465 if use_Hc_packet:
1466 self.test_sequence.add_log_lines(
1467 [ # Set the continue thread.
1468 "read packet: $Hc{0:x}#00".format(thread_id),
1469 "send packet: $OK#00",
1471 True,
1473 self.test_sequence.add_log_lines(
1475 # Single step.
1476 step_packet,
1477 # "read packet: $vCont;s:{0:x}#00".format(thread_id),
1478 # Expect a breakpoint stop report.
1480 "direction": "send",
1481 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
1482 "capture": {1: "stop_signo", 2: "stop_thread_id"},
1485 True,
1487 context = self.expect_gdbremote_sequence()
1488 self.assertIsNotNone(context)
1489 self.assertIsNotNone(context.get("stop_signo"))
1490 self.assertEqual(
1491 int(context.get("stop_signo"), 16),
1492 lldbutil.get_signal_number("SIGTRAP"),
1495 single_step_count += 1
1497 # See if the predicate is true. If so, we're done.
1498 if predicate(args):
1499 return (True, single_step_count)
1501 # The predicate didn't return true within the runaway step count.
1502 return (False, single_step_count)
1504 def g_c1_c2_contents_are(self, args):
1505 """Used by single step test that appears in a few different contexts."""
1506 g_c1_address = args["g_c1_address"]
1507 g_c2_address = args["g_c2_address"]
1508 expected_g_c1 = args["expected_g_c1"]
1509 expected_g_c2 = args["expected_g_c2"]
1511 # Read g_c1 and g_c2 contents.
1512 self.reset_test_sequence()
1513 self.test_sequence.add_log_lines(
1515 "read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1517 "direction": "send",
1518 "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
1519 "capture": {1: "g_c1_contents"},
1521 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1523 "direction": "send",
1524 "regex": r"^\$(.+)#[0-9a-fA-F]{2}$",
1525 "capture": {1: "g_c2_contents"},
1528 True,
1531 # Run the packet stream.
1532 context = self.expect_gdbremote_sequence()
1533 self.assertIsNotNone(context)
1535 # Check if what we read from inferior memory is what we are expecting.
1536 self.assertIsNotNone(context.get("g_c1_contents"))
1537 self.assertIsNotNone(context.get("g_c2_contents"))
1539 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
1540 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2
1543 def single_step_only_steps_one_instruction(
1544 self, use_Hc_packet=True, step_instruction="s"
1546 """Used by single step test that appears in a few different contexts."""
1547 # Start up the inferior.
1548 procs = self.prep_debug_monitor_and_inferior(
1549 inferior_args=[
1550 "get-code-address-hex:swap_chars",
1551 "get-data-address-hex:g_c1",
1552 "get-data-address-hex:g_c2",
1553 "sleep:1",
1554 "call-function:swap_chars",
1555 "sleep:5",
1559 # Run the process
1560 self.test_sequence.add_log_lines(
1561 [ # Start running after initial stop.
1562 "read packet: $c#63",
1563 # Match output line that prints the memory address of the function call entry point.
1564 # Note we require launch-only testing so we can get inferior otuput.
1566 "type": "output_match",
1567 "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1568 "capture": {
1569 1: "function_address",
1570 2: "g_c1_address",
1571 3: "g_c2_address",
1574 # Now stop the inferior.
1575 "read packet: {}".format(chr(3)),
1576 # And wait for the stop notification.
1578 "direction": "send",
1579 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
1580 "capture": {1: "stop_signo", 2: "stop_thread_id"},
1583 True,
1586 # Run the packet stream.
1587 context = self.expect_gdbremote_sequence()
1588 self.assertIsNotNone(context)
1590 # Grab the main thread id.
1591 self.assertIsNotNone(context.get("stop_thread_id"))
1592 main_thread_id = int(context.get("stop_thread_id"), 16)
1594 # Grab the function address.
1595 self.assertIsNotNone(context.get("function_address"))
1596 function_address = int(context.get("function_address"), 16)
1598 # Grab the data addresses.
1599 self.assertIsNotNone(context.get("g_c1_address"))
1600 g_c1_address = int(context.get("g_c1_address"), 16)
1602 self.assertIsNotNone(context.get("g_c2_address"))
1603 g_c2_address = int(context.get("g_c2_address"), 16)
1605 # Set a breakpoint at the given address.
1606 if self.getArchitecture().startswith("arm"):
1607 # TODO: Handle case when setting breakpoint in thumb code
1608 BREAKPOINT_KIND = 4
1609 else:
1610 BREAKPOINT_KIND = 1
1611 self.reset_test_sequence()
1612 self.add_set_breakpoint_packets(
1613 function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND
1615 context = self.expect_gdbremote_sequence()
1616 self.assertIsNotNone(context)
1618 # Remove the breakpoint.
1619 self.reset_test_sequence()
1620 self.add_remove_breakpoint_packets(
1621 function_address, breakpoint_kind=BREAKPOINT_KIND
1623 context = self.expect_gdbremote_sequence()
1624 self.assertIsNotNone(context)
1626 # Verify g_c1 and g_c2 match expected initial state.
1627 args = {}
1628 args["g_c1_address"] = g_c1_address
1629 args["g_c2_address"] = g_c2_address
1630 args["expected_g_c1"] = "0"
1631 args["expected_g_c2"] = "1"
1633 self.assertTrue(self.g_c1_c2_contents_are(args))
1635 # Verify we take only a small number of steps to hit the first state.
1636 # Might need to work through function entry prologue code.
1637 args["expected_g_c1"] = "1"
1638 args["expected_g_c2"] = "1"
1639 (state_reached, step_count) = self.count_single_steps_until_true(
1640 main_thread_id,
1641 self.g_c1_c2_contents_are,
1642 args,
1643 max_step_count=25,
1644 use_Hc_packet=use_Hc_packet,
1645 step_instruction=step_instruction,
1647 self.assertTrue(state_reached)
1649 # Verify we hit the next state.
1650 args["expected_g_c1"] = "1"
1651 args["expected_g_c2"] = "0"
1652 (state_reached, step_count) = self.count_single_steps_until_true(
1653 main_thread_id,
1654 self.g_c1_c2_contents_are,
1655 args,
1656 max_step_count=5,
1657 use_Hc_packet=use_Hc_packet,
1658 step_instruction=step_instruction,
1660 self.assertTrue(state_reached)
1661 expected_step_count = 1
1662 arch = self.getArchitecture()
1664 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1665 # of variable value
1666 if re.match("mips", arch):
1667 expected_step_count = 3
1668 # S390X requires "2" (LARL, MVI) machine instructions for updation of
1669 # variable value
1670 if re.match("s390x", arch):
1671 expected_step_count = 2
1672 # ARM64 requires "4" instructions: 2 to compute the address (adrp,
1673 # add), one to materialize the constant (mov) and the store. Once
1674 # addresses and constants are materialized, only one instruction is
1675 # needed.
1676 if re.match("arm64", arch):
1677 before_materialization_step_count = 4
1678 after_matrialization_step_count = 1
1679 self.assertIn(
1680 step_count,
1681 [before_materialization_step_count, after_matrialization_step_count],
1683 expected_step_count = after_matrialization_step_count
1684 else:
1685 self.assertEqual(step_count, expected_step_count)
1687 # Verify we hit the next state.
1688 args["expected_g_c1"] = "0"
1689 args["expected_g_c2"] = "0"
1690 (state_reached, step_count) = self.count_single_steps_until_true(
1691 main_thread_id,
1692 self.g_c1_c2_contents_are,
1693 args,
1694 max_step_count=5,
1695 use_Hc_packet=use_Hc_packet,
1696 step_instruction=step_instruction,
1698 self.assertTrue(state_reached)
1699 self.assertEqual(step_count, expected_step_count)
1701 # Verify we hit the next state.
1702 args["expected_g_c1"] = "0"
1703 args["expected_g_c2"] = "1"
1704 (state_reached, step_count) = self.count_single_steps_until_true(
1705 main_thread_id,
1706 self.g_c1_c2_contents_are,
1707 args,
1708 max_step_count=5,
1709 use_Hc_packet=use_Hc_packet,
1710 step_instruction=step_instruction,
1712 self.assertTrue(state_reached)
1713 self.assertEqual(step_count, expected_step_count)
1715 def maybe_strict_output_regex(self, regex):
1716 return (
1717 ".*" + regex + ".*"
1718 if lldbplatformutil.hasChattyStderr(self)
1719 else "^" + regex + "$"
1722 def install_and_create_launch_args(self):
1723 exe_path = self.getBuildArtifact("a.out")
1724 if not lldb.remote_platform:
1725 return [exe_path]
1726 remote_path = lldbutil.append_to_process_working_directory(
1727 self, os.path.basename(exe_path)
1729 remote_file_spec = lldb.SBFileSpec(remote_path, False)
1730 err = lldb.remote_platform.Install(
1731 lldb.SBFileSpec(exe_path, True), remote_file_spec
1733 if err.Fail():
1734 raise Exception(
1735 "remote_platform.Install('%s', '%s') failed: %s"
1736 % (exe_path, remote_path, err)
1738 return [remote_path]