1 import os
, json
, struct
, signal
, uuid
, tempfile
3 from typing
import Any
, Dict
6 from lldb
.plugins
.scripted_process
import ScriptedProcess
7 from lldb
.plugins
.scripted_process
import ScriptedThread
9 from lldb
.macosx
.crashlog
import CrashLog
, CrashLogParser
class CrashLogScriptedProcess(ScriptedProcess):
    """Scripted process whose state is populated from a parsed crash log.

    The process is created by LLDB through ``__init__`` and later receives
    the parsed crash log object through ``set_crashlog``, which copies the
    relevant fields (pid, address mask, crashed thread index, exception,
    images and threads) onto the process.
    """

    def set_crashlog(self, crashlog):
        """Attach ``crashlog`` and populate the process state from it.

        Args:
            crashlog: Parsed crash log object. This method reads its
                ``process_id``, ``addr_mask``, ``crashed_thread_idx``,
                ``exception``, ``images`` and ``threads`` attributes, and the
                optional ``asi`` / ``asb`` attributes when present.
        """
        self.crashlog = crashlog

        process_id = self.crashlog.process_id
        if process_id:
            if isinstance(process_id, int):
                self.pid = process_id
            elif isinstance(process_id, str):
                # Base 0 lets int() honour a "0x"/"0o"/"0b" prefix.
                self.pid = int(process_id, 0)
            else:
                self.pid = super().get_process_id()

        self.addr_mask = self.crashlog.addr_mask
        self.crashed_thread_idx = self.crashlog.crashed_thread_idx
        self.loaded_images = []
        self.exception = self.crashlog.exception
        self.app_specific_thread = None
        if hasattr(self.crashlog, "asi"):
            self.metadata["asi"] = self.crashlog.asi
        if hasattr(self.crashlog, "asb"):
            self.extended_thread_info = self.crashlog.asb

        # Select which images get resolved: every image when requested,
        # otherwise only the images referenced by crashed threads.
        # NOTE(review): the loop bodies were not visible in the reviewed
        # text; flagging the image via `resolve = True` is presumed -- confirm.
        if self.load_all_images:
            for image in self.crashlog.images:
                image.resolve = True
        else:
            for thread in self.crashlog.threads:
                if not thread.did_crash():
                    continue
                for ident in thread.idents:
                    for image in self.crashlog.find_images_with_identifier(ident):
                        image.resolve = True

        # Loop-invariant: images carrying the all-zero UUID are skipped.
        null_uuid = uuid.UUID(int=0)
        with tempfile.TemporaryDirectory() as obj_dir:
            for image in self.crashlog.images:
                if image in self.loaded_images:
                    continue
                if image.uuid == null_uuid:
                    continue
                err = image.add_module(self.target, obj_dir)
                # NOTE(review): error branch presumed to print the message
                # and skip the image -- confirm.
                if err:
                    # Append to SBCommandReturnObject
                    print(err)
                else:
                    self.loaded_images.append(image)

        for thread in self.crashlog.threads:
            if getattr(thread, "app_specific_backtrace", False):
                # We don't want to include the Application Specific Backtrace
                # Thread into the Scripted Process' Thread list.
                # Instead, we will try to extract the stackframe pcs from the
                # backtrace and inject that as the extended thread info.
                self.app_specific_thread = thread
                continue

            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)

        if self.app_specific_thread:
            self.extended_thread_info = CrashLogScriptedThread.resolve_stackframes(
                self.app_specific_thread, self.addr_mask, self.target
            )

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Initialise the process from LLDB's structured-data arguments.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Structured data. The ``"file_path"`` key (string) gives the
                crash log path and the optional ``"load_all_images"`` key
                (boolean) requests that every image be loaded.

        Initialisation stops early, leaving the remaining attributes unset,
        when the target is invalid or no crash log path was supplied.
        """
        super().__init__(exe_ctx, args)

        if not self.target or not self.target.IsValid():
            # Return error
            return

        self.crashlog_path = None

        crashlog_path = args.GetValueForKey("file_path")
        if (
            crashlog_path
            and crashlog_path.IsValid()
            and crashlog_path.GetType() == lldb.eStructuredDataTypeString
        ):
            # 4096 is the maximum string length requested from LLDB.
            self.crashlog_path = crashlog_path.GetStringValue(4096)

        if not self.crashlog_path:
            # Return error
            return

        load_all_images = args.GetValueForKey("load_all_images")
        if (
            load_all_images
            and load_all_images.IsValid()
            and load_all_images.GetType() == lldb.eStructuredDataTypeBoolean
        ):
            self.load_all_images = load_all_images.GetBooleanValue()

        # The attribute is only assigned above when a boolean option was
        # supplied; getattr() avoids an AttributeError when it was not.
        if not getattr(self, "load_all_images", False):
            self.load_all_images = False

        self.pid = super().get_process_id()
        self.crashed_thread_idx = 0
        self.exception = None
        self.extended_thread_info = None

    def read_memory_at_address(
        self, addr: int, size: int, error: lldb.SBError
    ) -> lldb.SBData:
        """Return the memory at ``addr``; crash logs carry none.

        NOTE(review): the return statement was not visible in the reviewed
        text; an empty ``lldb.SBData`` is presumed -- confirm.
        """
        # NOTE: CrashLogs don't contain any memory.
        return lldb.SBData()

    def get_loaded_images(self):
        """Return the list of images recorded by ``set_crashlog``."""
        # TODO: Iterate over corefile_target modules and build a data structure
        # from it.
        return self.loaded_images

    def should_stop(self) -> bool:
        """Report whether the process should stop.

        NOTE(review): return value presumed to be ``True`` -- confirm.
        """
        return True

    def is_alive(self) -> bool:
        """Report whether the process is alive.

        NOTE(review): return value presumed to be ``True`` -- confirm.
        """
        return True

    def get_scripted_thread_plugin(self):
        """Return the dotted ``module.ClassName`` path of the thread plugin."""
        return f"{CrashLogScriptedThread.__module__}.{CrashLogScriptedThread.__name__}"

    def get_process_metadata(self):
        """Return the process metadata dictionary.

        NOTE(review): presumed to return ``self.metadata``, the mapping that
        ``set_crashlog`` stores the ``"asi"`` entry in -- confirm.
        """
        return self.metadata
class CrashLogScriptedThread(ScriptedThread):
    """Scripted thread backed by one thread entry of a parsed crash log."""

    def _zeroed_register_ctx(self):
        """Return a dict mapping every known register name to 0."""
        return dict.fromkeys(
            (reg["name"] for reg in self.register_info["registers"]), 0
        )

    def create_register_ctx(self):
        """Build the register context for this thread.

        Returns:
            A zero-filled mapping when the thread did not crash or the backing
            thread has no registers. Otherwise ``self.register_ctx``, filled
            with the backing thread's value for each known register and 0 for
            registers the crash log does not provide.
        """
        if (
            not self.has_crashed
            or not self.backing_thread
            or not self.backing_thread.registers
        ):
            return self._zeroed_register_ctx()

        registers = self.backing_thread.registers
        for reg in self.register_info["registers"]:
            reg_name = reg["name"]
            self.register_ctx[reg_name] = (
                registers[reg_name] if reg_name in registers else 0
            )

        return self.register_ctx

    @staticmethod
    def resolve_stackframes(thread, addr_mask, target):
        """Turn the frames of ``thread`` into ``{"idx", "pc"}`` dictionaries.

        Args:
            thread: Object exposing ``frames``; each frame has ``pc`` and
                ``index``.
            addr_mask: Mask AND-ed with every frame pc.
            target: Target used to resolve each pc as a load address.

        Returns:
            One dict per frame whose address resolved to a valid
            ``lldb.SBAddress``; frames that do not resolve are skipped.
        """
        frames = []
        for frame in thread.frames:
            frame_pc = frame.pc & addr_mask
            # Every frame except frame 0 (and null pcs) is moved back by one
            # byte before being resolved.
            pc = frame_pc if frame.index == 0 or frame_pc == 0 else frame_pc - 1
            sym_addr = lldb.SBAddress()
            sym_addr.SetLoadAddress(pc, target)
            if not sym_addr.IsValid():
                continue
            frames.append({"idx": frame.index, "pc": pc})
        return frames

    def create_stackframes(self):
        """Populate ``self.frames`` from the backing thread.

        Returns:
            ``None`` when frames are not wanted (the thread did not crash and
            the process does not load all images) or the backing thread has no
            frames; otherwise the resolved frame list.
        """
        if not (self.originating_process.load_all_images or self.has_crashed):
            return None

        if not self.backing_thread or not self.backing_thread.frames:
            return None

        self.frames = CrashLogScriptedThread.resolve_stackframes(
            self.backing_thread, self.originating_process.addr_mask, self.target
        )

        return self.frames

    def __init__(self, process, args, crashlog_thread):
        """Wrap ``crashlog_thread`` as a scripted thread of ``process``.

        Args:
            process: Owning scripted process, forwarded to the base class.
            args: Arguments forwarded to the base class.
            crashlog_thread: Crash log thread providing ``index``, ``id``,
                ``name``, ``queue``, ``frames`` and ``registers``.
        """
        super().__init__(process, args)

        self.backing_thread = crashlog_thread
        self.idx = self.backing_thread.index
        self.tid = self.backing_thread.id
        # getattr() matches the hasattr() guard the process uses on this same
        # attribute, so threads lacking it no longer raise AttributeError.
        if getattr(self.backing_thread, "app_specific_backtrace", False):
            self.name = "Application Specific Backtrace"
        else:
            self.name = self.backing_thread.name
        self.queue = self.backing_thread.queue
        self.has_crashed = self.originating_process.crashed_thread_idx == self.idx
        self.create_stackframes()

    # NOTE(review): this method's header was not visible in the reviewed text;
    # the name `get_state` is presumed from the scripted-thread API -- confirm.
    def get_state(self):
        """Return the crashed state for the crashed thread, else stopped."""
        if not self.has_crashed:
            return lldb.eStateStopped
        return lldb.eStateCrashed

    def get_stop_reason(self) -> Dict[str, Any]:
        """Return the stop reason dictionary for this thread.

        Threads that did not crash report ``eStopReasonNone``. The crashed
        thread reports ``eStopReasonException`` and, when the process has
        one, the exception under ``data["mach_exception"]``.
        """
        if not self.has_crashed:
            return {"type": lldb.eStopReasonNone}
        # TODO: Investigate what stop reason should be reported when crashed
        stop_reason = {"type": lldb.eStopReasonException, "data": {}}
        if self.originating_process.exception:
            stop_reason["data"]["mach_exception"] = self.originating_process.exception
        return stop_reason

    def get_register_context(self) -> str:
        """Return the register values packed as unsigned 64-bit integers.

        The register context is created lazily on first use. The values are
        packed with ``struct.pack`` in the dict's iteration order, so the
        result is a ``bytes`` object despite the ``str`` annotation, which is
        kept unchanged for interface compatibility.
        """
        if not self.register_ctx:
            self.register_ctx = self.create_register_ctx()

        return struct.pack(
            "{}Q".format(len(self.register_ctx)), *self.register_ctx.values()
        )

    def get_extended_info(self):
        """Return the extended info for this thread.

        NOTE(review): the guard limiting the copy of the process-level
        extended thread info to the crashed thread is presumed -- confirm.
        """
        if self.has_crashed:
            self.extended_info = self.originating_process.extended_thread_info
        return self.extended_info