1 import os
, json
, struct
, signal
, uuid
, tempfile
3 from typing
import Any
, Dict
6 from lldb
.plugins
.scripted_process
import ScriptedProcess
7 from lldb
.plugins
.scripted_process
import ScriptedThread
9 from lldb
.macosx
.crashlog
import CrashLog
, CrashLogParser
12 class CrashLogScriptedProcess(ScriptedProcess
):
13 def set_crashlog(self
, crashlog
):
14 self
.crashlog
= crashlog
15 if self
.crashlog
.process_id
:
16 if type(self
.crashlog
.process_id
) is int:
17 self
.pid
= self
.crashlog
.process_id
18 elif type(self
.crashlog
.process_id
) is str:
19 self
.pid
= int(self
.crashlog
.process_id
, 0)
21 self
.pid
= super().get_process_id()
22 self
.addr_mask
= self
.crashlog
.addr_mask
23 self
.crashed_thread_idx
= self
.crashlog
.crashed_thread_idx
24 self
.loaded_images
= []
25 self
.exception
= self
.crashlog
.exception
26 self
.app_specific_thread
= None
27 if hasattr(self
.crashlog
, "asi"):
28 self
.metadata
["asi"] = self
.crashlog
.asi
29 if hasattr(self
.crashlog
, "asb"):
30 self
.extended_thread_info
= self
.crashlog
.asb
32 crashlog
.load_images(self
.options
, self
.loaded_images
)
34 for thread
in self
.crashlog
.threads
:
36 hasattr(thread
, "app_specific_backtrace")
37 and thread
.app_specific_backtrace
39 # We don't want to include the Application Specific Backtrace
40 # Thread into the Scripted Process' Thread list.
41 # Instead, we will try to extract the stackframe pcs from the
42 # backtrace and inject that as the extended thread info.
43 self
.app_specific_thread
= thread
46 self
.threads
[thread
.index
] = CrashLogScriptedThread(self
, None, thread
)
48 if self
.app_specific_thread
:
49 self
.extended_thread_info
= CrashLogScriptedThread
.resolve_stackframes(
50 self
.app_specific_thread
, self
.addr_mask
, self
.target
53 class CrashLogOptions
:
54 load_all_images
= False
56 no_parallel_image_loading
= False
58 def __init__(self
, exe_ctx
: lldb
.SBExecutionContext
, args
: lldb
.SBStructuredData
):
59 super().__init
__(exe_ctx
, args
)
61 if not self
.target
or not self
.target
.IsValid():
65 self
.crashlog_path
= None
67 crashlog_path
= args
.GetValueForKey("file_path")
68 if crashlog_path
and crashlog_path
.IsValid():
69 if crashlog_path
.GetType() == lldb
.eStructuredDataTypeString
:
70 self
.crashlog_path
= crashlog_path
.GetStringValue(4096)
72 if not self
.crashlog_path
:
76 self
.options
= self
.CrashLogOptions()
78 load_all_images
= args
.GetValueForKey("load_all_images")
79 if load_all_images
and load_all_images
.IsValid():
80 if load_all_images
.GetType() == lldb
.eStructuredDataTypeBoolean
:
81 self
.options
.load_all_images
= load_all_images
.GetBooleanValue()
83 crashed_only
= args
.GetValueForKey("crashed_only")
84 if crashed_only
and crashed_only
.IsValid():
85 if crashed_only
.GetType() == lldb
.eStructuredDataTypeBoolean
:
86 self
.options
.crashed_only
= crashed_only
.GetBooleanValue()
88 no_parallel_image_loading
= args
.GetValueForKey("no_parallel_image_loading")
89 if no_parallel_image_loading
and no_parallel_image_loading
.IsValid():
90 if no_parallel_image_loading
.GetType() == lldb
.eStructuredDataTypeBoolean
:
91 self
.options
.no_parallel_image_loading
= (
92 no_parallel_image_loading
.GetBooleanValue()
95 self
.pid
= super().get_process_id()
96 self
.crashed_thread_idx
= 0
98 self
.extended_thread_info
= None
100 def read_memory_at_address(
101 self
, addr
: int, size
: int, error
: lldb
.SBError
103 # NOTE: CrashLogs don't contain any memory.
106 def get_loaded_images(self
):
107 # TODO: Iterate over corefile_target modules and build a data structure
109 return self
.loaded_images
111 def should_stop(self
) -> bool:
114 def is_alive(self
) -> bool:
117 def get_scripted_thread_plugin(self
):
118 return CrashLogScriptedThread
.__module
__ + "." + CrashLogScriptedThread
.__name
__
120 def get_process_metadata(self
):
124 class CrashLogScriptedThread(ScriptedThread
):
125 def create_register_ctx(self
):
126 if not self
.has_crashed
:
127 return dict.fromkeys(
128 [*map(lambda reg
: reg
["name"], self
.register_info
["registers"])], 0
131 if not self
.backing_thread
or not len(self
.backing_thread
.registers
):
132 return dict.fromkeys(
133 [*map(lambda reg
: reg
["name"], self
.register_info
["registers"])], 0
136 for reg
in self
.register_info
["registers"]:
137 reg_name
= reg
["name"]
138 if reg_name
in self
.backing_thread
.registers
:
139 self
.register_ctx
[reg_name
] = self
.backing_thread
.registers
[reg_name
]
141 self
.register_ctx
[reg_name
] = 0
143 return self
.register_ctx
145 def resolve_stackframes(thread
, addr_mask
, target
):
147 for frame
in thread
.frames
:
148 frame_pc
= frame
.pc
& addr_mask
149 pc
= frame_pc
if frame
.index
== 0 or frame_pc
== 0 else frame_pc
- 1
150 sym_addr
= lldb
.SBAddress()
151 sym_addr
.SetLoadAddress(pc
, target
)
152 if not sym_addr
.IsValid():
154 frames
.append({"idx": frame
.index
, "pc": pc
})
157 def create_stackframes(self
):
158 if not (self
.originating_process
.options
.load_all_images
or self
.has_crashed
):
161 if not self
.backing_thread
or not len(self
.backing_thread
.frames
):
164 self
.frames
= CrashLogScriptedThread
.resolve_stackframes(
165 self
.backing_thread
, self
.originating_process
.addr_mask
, self
.target
170 def __init__(self
, process
, args
, crashlog_thread
):
171 super().__init
__(process
, args
)
173 self
.backing_thread
= crashlog_thread
174 self
.idx
= self
.backing_thread
.index
175 self
.tid
= self
.backing_thread
.id
176 self
.name
= self
.backing_thread
.name
177 self
.queue
= self
.backing_thread
.queue
178 self
.has_crashed
= self
.originating_process
.crashed_thread_idx
== self
.idx
179 self
.create_stackframes()
182 if not self
.has_crashed
:
183 return lldb
.eStateStopped
184 return lldb
.eStateCrashed
186 def get_stop_reason(self
) -> Dict
[str, Any
]:
187 if not self
.has_crashed
:
188 return {"type": lldb
.eStopReasonNone
}
189 # TODO: Investigate what stop reason should be reported when crashed
190 stop_reason
= {"type": lldb
.eStopReasonException
, "data": {}}
191 if self
.originating_process
.exception
:
192 stop_reason
["data"]["mach_exception"] = self
.originating_process
.exception
195 def get_register_context(self
) -> str:
196 if not self
.register_ctx
:
197 self
.register_ctx
= self
.create_register_ctx()
200 "{}Q".format(len(self
.register_ctx
)), *self
.register_ctx
.values()
203 def get_extended_info(self
):
205 self
.extended_info
= self
.originating_process
.extended_thread_info
206 return self
.extended_info