[RISCV] Rename a lambda to have plural nouns to reflect that it contains a loop. NFC
[llvm-project.git] / lldb / examples / python / crashlog_scripted_process.py
blobbe0ed49d35904a2d0929c6a0afe7b13b09be9e96
1 import os, json, struct, signal, uuid, tempfile
3 from typing import Any, Dict
5 import lldb
6 from lldb.plugins.scripted_process import ScriptedProcess
7 from lldb.plugins.scripted_process import ScriptedThread
9 from lldb.macosx.crashlog import CrashLog, CrashLogParser
class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process that presents the contents of a crash log.

    ``__init__`` only reads the launch arguments. Threads, loaded images,
    the exception and any extended thread info are filled in afterwards by
    ``set_crashlog``.
    """

    def set_crashlog(self, crashlog):
        """Populate this process from ``crashlog``.

        Copies the pid, address mask, crashed-thread index and exception from
        the crash log, asks the crash log to load its images into
        ``self.loaded_images``, and creates one ``CrashLogScriptedThread`` per
        crash log thread, keyed by the thread's index.

        Args:
            crashlog: Crash log object. This method reads its ``process_id``,
                ``addr_mask``, ``crashed_thread_idx``, ``exception`` and
                ``threads`` attributes, the optional ``asi`` / ``asb``
                attributes, and calls its ``load_images`` method.
        """
        self.crashlog = crashlog

        process_id = crashlog.process_id
        if process_id:
            if isinstance(process_id, int):
                self.pid = process_id
            elif isinstance(process_id, str):
                try:
                    # Base 0 lets int() accept decimal text as well as
                    # 0x/0o/0b-prefixed text.
                    self.pid = int(process_id, 0)
                except ValueError:
                    # Unparsable pid text: fall back to the default pid, the
                    # same way an unexpected pid type is handled below.
                    self.pid = super().get_process_id()
            else:
                self.pid = super().get_process_id()

        self.addr_mask = crashlog.addr_mask
        self.crashed_thread_idx = crashlog.crashed_thread_idx
        self.loaded_images = []
        self.exception = crashlog.exception
        self.app_specific_thread = None
        if hasattr(crashlog, "asi"):
            self.metadata["asi"] = crashlog.asi
        if hasattr(crashlog, "asb"):
            self.extended_thread_info = crashlog.asb

        crashlog.load_images(self.options, self.loaded_images)

        for thread in crashlog.threads:
            if getattr(thread, "app_specific_backtrace", None):
                # We don't want to include the Application Specific Backtrace
                # Thread into the Scripted Process' Thread list.
                # Instead, we will try to extract the stackframe pcs from the
                # backtrace and inject that as the extended thread info.
                self.app_specific_thread = thread
                continue

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

        if self.app_specific_thread:
            # Takes precedence over any ``asb`` value assigned above.
            self.extended_thread_info = CrashLogScriptedThread.resolve_stackframes(
                self.app_specific_thread, self.addr_mask, self.target
            )

    class CrashLogOptions:
        """Boolean options read from the launch arguments (defaults below)."""

        load_all_images = False
        crashed_only = True
        no_parallel_image_loading = False

    @staticmethod
    def _get_bool_arg(args, key, default):
        """Return the boolean stored under ``key`` in ``args``, else ``default``.

        ``default`` is returned when the key is missing, invalid, or does not
        hold a boolean.
        """
        value = args.GetValueForKey(key)
        if (
            value
            and value.IsValid()
            and value.GetType() == lldb.eStructuredDataTypeBoolean
        ):
            return value.GetBooleanValue()
        return default

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Read the crash log path and the options from ``args``.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Structured data. The keys read here are ``file_path``
                (string) and the booleans ``load_all_images``,
                ``crashed_only`` and ``no_parallel_image_loading``.
        """
        super().__init__(exe_ctx, args)

        # Assign every default up front so the early returns below still
        # leave an object whose attributes can be read by the other methods.
        self.crashlog_path = None
        self.options = self.CrashLogOptions()
        self.pid = super().get_process_id()
        self.crashed_thread_idx = 0
        self.exception = None
        self.extended_thread_info = None

        if not self.target or not self.target.IsValid():
            # No error can be returned from here; leave the defaults in place.
            return

        crashlog_path = args.GetValueForKey("file_path")
        if (
            crashlog_path
            and crashlog_path.IsValid()
            and crashlog_path.GetType() == lldb.eStructuredDataTypeString
        ):
            self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            # No error can be returned from here; leave the defaults in place.
            return

        for key in ("load_all_images", "crashed_only", "no_parallel_image_loading"):
            current = getattr(self.options, key)
            setattr(self.options, key, self._get_bool_arg(args, key, current))

    def read_memory_at_address(
        self, addr: int, size: int, error: lldb.SBError
    ) -> lldb.SBData:
        """Return an empty ``SBData``; ``addr``, ``size`` and ``error`` are unused."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the image list that ``set_crashlog`` had the crash log fill."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread class."""
        return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__

    def get_process_metadata(self):
        """Return ``self.metadata`` (``set_crashlog`` may add an ``asi`` entry)."""
        return self.metadata
class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by a single thread entry of a crash log."""

    def create_register_ctx(self):
        """Build the mapping of register name to value for this thread.

        Returns:
            A dict keyed by every register name in ``self.register_info``.
            All values are 0 unless this is the crashed thread and its
            backing thread recorded registers; in that case the recorded
            values are stored into ``self.register_ctx`` (0 for registers the
            crash log does not mention) and that dict is returned.
        """
        reg_names = [reg["name"] for reg in self.register_info["registers"]]

        has_recorded_values = (
            self.has_crashed
            and self.backing_thread
            and self.backing_thread.registers
        )
        if not has_recorded_values:
            return dict.fromkeys(reg_names, 0)

        recorded = self.backing_thread.registers
        for reg_name in reg_names:
            self.register_ctx[reg_name] = (
                recorded[reg_name] if reg_name in recorded else 0
            )

        return self.register_ctx

    @staticmethod
    def resolve_stackframes(thread, addr_mask, target):
        """Turn the frames of ``thread`` into ``{"idx": ..., "pc": ...}`` dicts.

        Args:
            thread: Object whose ``frames`` each expose ``pc`` and ``index``.
            addr_mask: Mask and-ed with every frame pc.
            target: Target used to set the load address of each pc.

        Returns:
            A list with one dict per frame whose address is valid, in the
            order of ``thread.frames``.
        """
        frames = []
        for frame in thread.frames:
            frame_pc = frame.pc & addr_mask
            # Every frame but the first has its non-zero pc moved back by one.
            # NOTE(review): presumably so the address lands inside the calling
            # instruction rather than on the return address -- confirm.
            pc = frame_pc if frame.index == 0 or frame_pc == 0 else frame_pc - 1
            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(pc, target)
            if not sym_addr.IsValid():
                continue
            frames.append({"idx": frame.index, "pc": pc})
        return frames

    def create_stackframes(self):
        """Resolve and cache this thread's frames in ``self.frames``.

        Returns:
            The resolved frame list, or ``None`` when the thread is neither
            the crashed one nor covered by the ``load_all_images`` option, or
            when the backing thread has no frames.
        """
        if not (self.originating_process.options.load_all_images or self.has_crashed):
            return None

        if not self.backing_thread or not self.backing_thread.frames:
            return None

        self.frames = CrashLogScriptedThread.resolve_stackframes(
            self.backing_thread, self.originating_process.addr_mask, self.target
        )

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Wrap ``crashlog_thread`` as a thread of ``process``.

        Args:
            process: Owning scripted process, forwarded to the base class.
            args: Forwarded to the base class.
            crashlog_thread: Crash log thread; its ``index``, ``id``, ``name``
                and ``queue`` are copied onto this object.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        self.tid = self.backing_thread.id
        self.name = self.backing_thread.name
        self.queue = self.backing_thread.queue
        self.has_crashed = self.originating_process.crashed_thread_idx == self.idx
        self.create_stackframes()

    def get_state(self):
        """Return ``eStateCrashed`` for the crashed thread, else ``eStateStopped``."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop reason dictionary for this thread.

        Threads that did not crash report ``eStopReasonNone``. The crashed
        thread reports ``eStopReasonException``, with the process exception
        stored under ``data["mach_exception"]`` when there is one.
        """
        if not self.has_crashed:
            return {"type": lldb.eStopReasonNone}
        # TODO: Investigate what stop reason should be reported when crashed
        stop_reason = {"type": lldb.eStopReasonException, "data": {}}
        if self.originating_process.exception:
            stop_reason["data"]["mach_exception"] = self.originating_process.exception
        return stop_reason

    def get_register_context(self) -> bytes:
        """Return the register values packed into a single bytes object.

        The register context is created on first use. Values are packed in
        the dict's iteration order, each with the ``struct`` ``Q`` format.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack(
            "{}Q".format(len(self.register_ctx)), *self.register_ctx.values()
        )

    def get_extended_info(self):
        """Return ``self.extended_info``.

        For the crashed thread it is first refreshed from the owning
        process' ``extended_thread_info``.
        """
        if self.has_crashed:
            self.extended_info = self.originating_process.extended_thread_info
        return self.extended_info