[sanitizer] Improve FreeBSD ASLR detection
[llvm-project.git] / lldb / third_party / Python / module / pexpect-4.6 / pexpect / expect.py
blob1c0275b4853a56350f8a7c8207800a2b2cfed875
1 import time
3 from .exceptions import EOF, TIMEOUT
class Expecter(object):
    '''Feed data read from a spawn object to a searcher and record the
    outcome on the spawn.

    The result of a search is published through the spawn attributes
    ``before``, ``after``, ``match`` and ``match_index``. The spawn's
    ``_buffer`` holds data that is still unmatched, and ``_before`` collects
    everything received since the last match.
    '''

    def __init__(self, spawn, searcher, searchwindowsize=-1):
        '''Bind the expecter to a spawn and a searcher.

        A 'searchwindowsize' of -1 (the default) means "not specified"; in
        that case the spawn's own searchwindowsize setting is used.
        '''
        self.spawn = spawn
        self.searcher = searcher
        if searchwindowsize == -1:
            searchwindowsize = spawn.searchwindowsize
        self.searchwindowsize = searchwindowsize

    def new_data(self, data):
        '''Append 'data' to the spawn's buffers and search for a match.

        Returns the index reported by the searcher when something matched,
        after updating before/after/match/match_index on the spawn. Returns
        None when nothing matched yet.
        '''
        spawn = self.spawn
        searcher = self.searcher

        pos = spawn._buffer.tell()
        spawn._buffer.write(data)
        spawn._before.write(data)

        # determine which chunk of data to search; if a windowsize is
        # specified, this is the *new* data + the preceding <windowsize> bytes
        if self.searchwindowsize:
            spawn._buffer.seek(max(0, pos - self.searchwindowsize))
            window = spawn._buffer.read(self.searchwindowsize + len(data))
        else:
            # otherwise, search the whole buffer (really slow for large
            # datasets)
            window = spawn.buffer

        index = searcher.search(window, len(data))
        if index >= 0:
            # Whatever follows the match stays buffered for the next expect.
            spawn._buffer = spawn.buffer_type()
            spawn._buffer.write(window[searcher.end:])
            # 'before' is everything received up to the start of the match.
            # Compute an explicit end index rather than slicing with a
            # negative offset: a zero-width match at the very end of the
            # window would otherwise produce [0:-0], i.e. an empty string.
            seen = spawn._before.getvalue()
            tail = len(window) - searcher.start
            spawn.before = seen[:max(0, len(seen) - tail)]
            spawn._before = spawn.buffer_type()
            spawn.after = window[searcher.start: searcher.end]
            spawn.match = searcher.match
            spawn.match_index = index
            # Found a match
            return index
        elif self.searchwindowsize:
            # No match: keep only the search window buffered.
            spawn._buffer = spawn.buffer_type()
            spawn._buffer.write(window)

    def eof(self, err=None):
        '''Record an end-of-file condition on the spawn.

        Returns the searcher's eof_index when EOF was one of the expected
        patterns (eof_index >= 0). Otherwise raises EOF with a message built
        from the spawn, the searcher and, if given, 'err'.
        '''
        spawn = self.spawn

        spawn.before = spawn.buffer
        spawn._buffer = spawn.buffer_type()
        spawn._before = spawn.buffer_type()
        spawn.after = EOF
        index = self.searcher.eof_index
        if index >= 0:
            spawn.match = EOF
            spawn.match_index = index
            return index
        else:
            spawn.match = None
            spawn.match_index = None
            msg = str(spawn)
            msg += '\nsearcher: %s' % self.searcher
            if err is not None:
                msg = str(err) + '\n' + msg
            raise EOF(msg)

    def timeout(self, err=None):
        '''Record a timeout condition on the spawn.

        Returns the searcher's timeout_index when TIMEOUT was one of the
        expected patterns (timeout_index >= 0). Otherwise raises TIMEOUT with
        a message built from the spawn, the searcher and, if given, 'err'.
        Unlike eof(), the spawn's buffers are left untouched.
        '''
        spawn = self.spawn

        spawn.before = spawn.buffer
        spawn.after = TIMEOUT
        index = self.searcher.timeout_index
        if index >= 0:
            spawn.match = TIMEOUT
            spawn.match_index = index
            return index
        else:
            spawn.match = None
            spawn.match_index = None
            msg = str(spawn)
            msg += '\nsearcher: %s' % self.searcher
            if err is not None:
                msg = str(err) + '\n' + msg
            raise TIMEOUT(msg)

    def errored(self):
        '''Reset the spawn's match state after an unexpected error; 'before'
        is set to the current buffer contents.'''
        spawn = self.spawn
        spawn.before = spawn.buffer
        spawn.after = None
        spawn.match = None
        spawn.match_index = None

    def expect_loop(self, timeout=-1):
        '''Blocking expect.

        Searches the data already buffered on the spawn, then keeps reading
        with spawn.read_nonblocking() until the searcher matches, EOF is
        reached or the time runs out. 'timeout' is in seconds; None disables
        the deadline, and a negative value times out right after the
        buffered data has been searched.

        Returns the index of the matched pattern. EOF and TIMEOUT are handed
        to eof() and timeout(), which return an index or raise. Any other
        exception is re-raised after errored() has reset the match state.
        '''
        spawn = self.spawn

        if timeout is not None:
            end_time = time.time() + timeout

        try:
            # Start by searching what is already buffered.
            incoming = spawn.buffer
            spawn._buffer = spawn.buffer_type()
            spawn._before = spawn.buffer_type()
            while True:
                idx = self.new_data(incoming)
                # Keep reading until exception or return.
                if idx is not None:
                    return idx
                # No match at this point
                if (timeout is not None) and (timeout < 0):
                    return self.timeout()
                # Still have time left, so read more data
                incoming = spawn.read_nonblocking(spawn.maxread, timeout)
                if self.spawn.delayafterread is not None:
                    time.sleep(self.spawn.delayafterread)
                if timeout is not None:
                    timeout = end_time - time.time()
        except EOF as e:
            return self.eof(e)
        except TIMEOUT as e:
            return self.timeout(e)
        except:
            # Deliberately catches everything (including KeyboardInterrupt)
            # so the match state is reset before the error propagates.
            self.errored()
            raise
class searcher_string(object):
    '''This is a plain string search helper for the spawn.expect_any() method.
    This helper class is for speed. For more powerful regex patterns
    see the helper class, searcher_re.

    Attributes:

        eof_index     - index of EOF, or -1
        timeout_index - index of TIMEOUT, or -1

    After a successful match by the search() method the following attributes
    are available:

        start - index into the buffer, first byte of match
        end   - index into the buffer, first byte after match
        match - the matching string itself

    '''

    def __init__(self, strings):
        '''This creates an instance of searcher_string. This argument 'strings'
        may be a list; a sequence of strings; or the EOF or TIMEOUT types. '''

        self.eof_index = -1
        self.timeout_index = -1
        # (position in 'strings', string) pairs for the real search strings;
        # EOF and TIMEOUT are remembered by position only.
        self._strings = []
        for n, s in enumerate(strings):
            if s is EOF:
                self.eof_index = n
                continue
            if s is TIMEOUT:
                self.timeout_index = n
                continue
            self._strings.append((n, s))

    def __str__(self):
        '''This returns a human-readable string that represents the state of
        the object.'''

        ss = [(ns[0], ' %d: %r' % ns) for ns in self._strings]
        # Key -1 makes the header sort ahead of every pattern index.
        ss.append((-1, 'searcher_string:'))
        if self.eof_index >= 0:
            ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
        if self.timeout_index >= 0:
            ss.append((self.timeout_index,
                       ' %d: TIMEOUT' % self.timeout_index))
        ss.sort()
        ss = list(zip(*ss))[1]
        return '\n'.join(ss)

    def search(self, buffer, freshlen, searchwindowsize=None):
        '''This searches 'buffer' for the first occurrence of one of the search
        strings. 'freshlen' must indicate the number of bytes at the end of
        'buffer' which have not been searched before. It helps to avoid
        searching the same, possibly big, buffer over and over again.

        See class spawn for the 'searchwindowsize' argument.

        If there is a match this returns the index of that string, and sets
        'start', 'end' and 'match'. Otherwise, this returns -1. When several
        strings match at the same position, the one listed first wins. '''

        first_match = None

        # 'freshlen' helps a lot here. Further optimizations could
        # possibly include:
        #
        # using something like the Boyer-Moore Fast String Searching
        # Algorithm; pre-compiling the search through a list of
        # strings into something that can scan the input once to
        # search for all N strings; realize that if we search for
        # ['bar', 'baz'] and the input is '...foo' we need not bother
        # rescanning until we've read three more bytes.
        #
        # Sadly, I don't know enough about this interesting topic. /grahn

        for index, s in self._strings:
            if searchwindowsize is None:
                # the match, if any, can only be in the fresh data,
                # or at the very end of the old data
                offset = -(freshlen + len(s))
            else:
                # better obey searchwindowsize
                offset = -searchwindowsize
            # A negative start counts from the end of the buffer.
            n = buffer.find(s, offset)
            if n >= 0 and (first_match is None or n < first_match):
                first_match = n
                best_index, best_match = index, s
        if first_match is None:
            return -1
        self.match = best_match
        self.start = first_match
        self.end = self.start + len(self.match)
        return best_index
class searcher_re(object):
    '''This is regular expression string search helper for the
    spawn.expect_any() method. This helper class is for powerful
    pattern matching. For speed, see the helper class, searcher_string.

    Attributes:

        eof_index     - index of EOF, or -1
        timeout_index - index of TIMEOUT, or -1

    After a successful match by the search() method the following attributes
    are available:

        start - index into the buffer, first byte of match
        end   - index into the buffer, first byte after match
        match - the re.match object returned by a successful re.search

    '''

    def __init__(self, patterns):
        '''This creates an instance that searches for 'patterns' Where
        'patterns' may be a list or other sequence of compiled regular
        expressions, or the EOF or TIMEOUT types.'''

        self.eof_index = -1
        self.timeout_index = -1
        # (position in 'patterns', compiled regex) pairs; EOF and TIMEOUT
        # are remembered by position only.
        self._searches = []
        # enumerate() mirrors searcher_string and does not need len().
        for n, s in enumerate(patterns):
            if s is EOF:
                self.eof_index = n
                continue
            if s is TIMEOUT:
                self.timeout_index = n
                continue
            self._searches.append((n, s))

    def __str__(self):
        '''This returns a human-readable string that represents the state of
        the object.'''

        ss = [(n, ' %d: re.compile(%r)' % (n, s.pattern))
              for n, s in self._searches]
        # Key -1 makes the header sort ahead of every pattern index.
        ss.append((-1, 'searcher_re:'))
        if self.eof_index >= 0:
            ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
        if self.timeout_index >= 0:
            ss.append((self.timeout_index, ' %d: TIMEOUT' %
                       self.timeout_index))
        ss.sort()
        ss = list(zip(*ss))[1]
        return '\n'.join(ss)

    def search(self, buffer, freshlen, searchwindowsize=None):
        '''This searches 'buffer' for the first occurrence of one of the regular
        expressions. 'freshlen' must indicate the number of bytes at the end of
        'buffer' which have not been searched before.

        See class spawn for the 'searchwindowsize' argument.

        If there is a match this returns the index of that string, and sets
        'start', 'end' and 'match'. Otherwise, returns -1. When several
        patterns match at the same position, the one listed first wins.'''

        first_match = None
        # 'freshlen' doesn't help here -- we cannot predict the
        # length of a match, and the re module provides no help.
        if searchwindowsize is None:
            searchstart = 0
        else:
            searchstart = max(0, len(buffer) - searchwindowsize)
        for index, s in self._searches:
            match = s.search(buffer, searchstart)
            if match is None:
                continue
            n = match.start()
            if first_match is None or n < first_match:
                first_match = n
                the_match = match
                best_index = index
        if first_match is None:
            return -1
        self.start = first_match
        self.match = the_match
        self.end = self.match.end()
        return best_index