Run DCE after a LoopFlatten test to reduce spurious output [nfc]
[llvm-project.git] / lldb / examples / python / crashlog_scripted_process.py
blobc69985b1a072d0907a10e0d329d573e91c13b3a7
1 import os, json, struct, signal, uuid, tempfile
3 from typing import Any, Dict
5 import lldb
6 from lldb.plugins.scripted_process import ScriptedProcess
7 from lldb.plugins.scripted_process import ScriptedThread
9 from lldb.macosx.crashlog import CrashLog, CrashLogParser
class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process whose threads and images come from a crash log.

    The process is constructed from structured-data arguments (see
    ``__init__``) and is later populated from a crash log object through
    ``set_crashlog``.
    """

    def set_crashlog(self, crashlog):
        """Attach ``crashlog`` and populate the process state from it.

        This copies the process id, address mask, crashed thread index and
        exception from the crash log, marks the images that should be
        resolved, adds those images to the target, and creates one
        ``CrashLogScriptedThread`` per crash log thread.

        Args:
            crashlog: The crash log object to read from. This method reads
                its ``process_id``, ``addr_mask``, ``crashed_thread_idx``,
                ``exception``, ``images`` and ``threads`` attributes, and
                the optional ``asi`` / ``asb`` attributes.
        """
        self.crashlog = crashlog

        # The process id may be an integer or a string. Anything else that
        # is truthy falls back to the base class' process id. A falsy value
        # leaves ``self.pid`` untouched.
        process_id = self.crashlog.process_id
        if process_id:
            if isinstance(process_id, int):
                self.pid = process_id
            elif isinstance(process_id, str):
                # Base 0 lets int() pick the radix from the string prefix.
                self.pid = int(process_id, 0)
            else:
                self.pid = super().get_process_id()

        self.addr_mask = self.crashlog.addr_mask
        self.crashed_thread_idx = self.crashlog.crashed_thread_idx
        self.loaded_images = []
        self.exception = self.crashlog.exception
        self.app_specific_thread = None

        # "asi" / "asb" are optional attributes of the crash log object.
        if hasattr(self.crashlog, "asi"):
            self.metadata["asi"] = self.crashlog.asi
        if hasattr(self.crashlog, "asb"):
            self.extended_thread_info = self.crashlog.asb

        # Flag the images to resolve: either every image, or only the ones
        # referenced by the identifiers of threads that crashed.
        if self.load_all_images:
            for image in self.crashlog.images:
                image.resolve = True
        else:
            for thread in self.crashlog.threads:
                if not thread.did_crash():
                    continue
                for ident in thread.idents:
                    for image in self.crashlog.find_images_with_identifier(ident):
                        image.resolve = True

        # The temporary directory only lives for the duration of the
        # add_module() calls.
        with tempfile.TemporaryDirectory() as obj_dir:
            null_uuid = uuid.UUID(int=0)
            for image in self.crashlog.images:
                if image in self.loaded_images:
                    continue
                # Images with an all-zero UUID are skipped.
                if image.uuid == null_uuid:
                    continue
                err = image.add_module(self.target, obj_dir)
                if err:
                    # Append to SBCommandReturnObject
                    print(err)
                else:
                    self.loaded_images.append(image)

        for thread in self.crashlog.threads:
            if (
                hasattr(thread, "app_specific_backtrace")
                and thread.app_specific_backtrace
            ):
                # We don't want to include the Application Specific Backtrace
                # Thread into the Scripted Process' Thread list.
                # Instead, we will try to extract the stackframe pcs from the
                # backtrace and inject that as the extended thread info.
                self.app_specific_thread = thread
                continue

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

        if self.app_specific_thread:
            self.extended_thread_info = CrashLogScriptedThread.resolve_stackframes(
                self.app_specific_thread, self.addr_mask, self.target
            )

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Initialize the process from the launch arguments.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Structured data that is queried for the ``"file_path"``
                (string) and ``"load_all_images"`` (boolean) keys.

        Initialization stops early, leaving the remaining attributes
        unset, when the target is missing or invalid, or when no crash log
        path could be read from ``args``.
        """
        super().__init__(exe_ctx, args)

        if not self.target or not self.target.IsValid():
            # Return error
            return

        self.crashlog_path = None

        crashlog_path = args.GetValueForKey("file_path")
        if (
            crashlog_path
            and crashlog_path.IsValid()
            and crashlog_path.GetType() == lldb.eStructuredDataTypeString
        ):
            self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            # Return error
            return

        load_all_images = args.GetValueForKey("load_all_images")
        if (
            load_all_images
            and load_all_images.IsValid()
            and load_all_images.GetType() == lldb.eStructuredDataTypeBoolean
        ):
            self.load_all_images = load_all_images.GetBooleanValue()

        # The attribute may not have been assigned above (key missing or
        # not a boolean); read it defensively so that case defaults to
        # False instead of raising AttributeError.
        if not getattr(self, "load_all_images", False):
            self.load_all_images = False

        self.pid = super().get_process_id()
        self.crashed_thread_idx = 0
        self.exception = None
        self.extended_thread_info = None

    def read_memory_at_address(
        self, addr: int, size: int, error: lldb.SBError
    ) -> lldb.SBData:
        """Return an empty ``lldb.SBData``; the arguments are not used."""
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the list of images collected by ``set_crashlog``."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def should_stop(self) -> bool:
        """Always report that the process should stop."""
        return True

    def is_alive(self) -> bool:
        """Always report the process as alive."""
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread class."""
        return ".".join(
            (CrashLogScriptedThread.__module__, CrashLogScriptedThread.__name__)
        )

    def get_process_metadata(self):
        """Return the process metadata dictionary."""
        return self.metadata
class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by a single thread of a crash log."""

    def create_register_ctx(self):
        """Build the mapping of register name to register value.

        Returns:
            A dict keyed by the names listed in
            ``self.register_info["registers"]``. Every value is 0 when the
            thread did not crash, has no backing thread, or the backing
            thread has no registers. Otherwise ``self.register_ctx`` is
            filled from the backing thread's registers (0 for names it
            does not provide) and returned.
        """
        register_descs = self.register_info["registers"]

        if (
            not self.has_crashed
            or not self.backing_thread
            or not len(self.backing_thread.registers)
        ):
            return dict.fromkeys([reg["name"] for reg in register_descs], 0)

        known_registers = self.backing_thread.registers
        for reg in register_descs:
            reg_name = reg["name"]
            self.register_ctx[reg_name] = (
                known_registers[reg_name] if reg_name in known_registers else 0
            )

        return self.register_ctx

    @staticmethod
    def resolve_stackframes(thread, addr_mask, target):
        """Convert the frames of ``thread`` into a list of frame dicts.

        Declared as a static method because it takes no instance and is
        invoked on the class itself.

        Args:
            thread: Object whose ``frames`` are iterated; each frame must
                provide ``pc`` and ``index``.
            addr_mask: Bit mask applied to every frame pc.
            target: Target used to set the load address of each pc.

        Returns:
            A list of ``{"idx": ..., "pc": ...}`` dicts, one per frame
            whose address is valid.
        """
        frames = []
        for frame in thread.frames:
            frame_pc = frame.pc & addr_mask
            # Frame 0 and null pcs are used as-is; every other pc is moved
            # back by one (presumably so a return address maps to the
            # calling instruction; confirm against the crash log format).
            pc = frame_pc if frame.index == 0 or frame_pc == 0 else frame_pc - 1
            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(pc, target)
            if not sym_addr.IsValid():
                continue
            frames.append({"idx": frame.index, "pc": pc})
        return frames

    def create_stackframes(self):
        """Populate and return ``self.frames`` for this thread.

        Returns:
            The resolved frame list, or None when the thread did not crash
            and the process is not loading all images, or when there is no
            backing thread or it has no frames.
        """
        if not (self.originating_process.load_all_images or self.has_crashed):
            return None

        if not self.backing_thread or not len(self.backing_thread.frames):
            return None

        self.frames = CrashLogScriptedThread.resolve_stackframes(
            self.backing_thread, self.originating_process.addr_mask, self.target
        )

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Initialize the thread from ``crashlog_thread``.

        Args:
            process: The owning scripted process, forwarded to the base class.
            args: Extra arguments forwarded to the base class.
            crashlog_thread: Crash log thread providing ``index``, ``id``,
                ``name``, ``queue`` and, optionally,
                ``app_specific_backtrace``.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        self.tid = self.backing_thread.id
        # ``app_specific_backtrace`` is treated as optional, matching the
        # hasattr() guard the process uses for the same attribute.
        if getattr(self.backing_thread, "app_specific_backtrace", False):
            self.name = "Application Specific Backtrace"
        else:
            self.name = self.backing_thread.name
        self.queue = self.backing_thread.queue
        self.has_crashed = self.originating_process.crashed_thread_idx == self.idx
        self.create_stackframes()

    def get_state(self):
        """Return ``eStateCrashed`` for the crashed thread, else ``eStateStopped``."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop reason dictionary for this thread.

        Threads that did not crash report ``eStopReasonNone``. The crashed
        thread reports ``eStopReasonException`` and, when the process has
        an exception, includes it under ``data["mach_exception"]``.
        """
        if not self.has_crashed:
            return {"type": lldb.eStopReasonNone}
        # TODO: Investigate what stop reason should be reported when crashed
        stop_reason = {"type": lldb.eStopReasonException, "data": {}}
        if self.originating_process.exception:
            stop_reason["data"]["mach_exception"] = self.originating_process.exception
        return stop_reason

    def get_register_context(self) -> str:
        """Return the register values packed as unsigned 64-bit integers.

        The register context is created on first use. Values are packed in
        the iteration order of ``self.register_ctx``.

        NOTE(review): ``struct.pack`` returns ``bytes`` although the
        annotation says ``str``; the annotation is left unchanged to keep
        the signature as it was.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack(
            "{}Q".format(len(self.register_ctx)), *self.register_ctx.values()
        )

    def get_extended_info(self):
        """Return the extended info for this thread.

        For the crashed thread, the process' ``extended_thread_info`` is
        stored in ``self.extended_info`` first. Other threads return
        whatever ``self.extended_info`` already holds.
        """
        if self.has_crashed:
            self.extended_info = self.originating_process.extended_thread_info
        return self.extended_info