1 # Class for actually running tests.
3 # Copyright (c) 2020-2021 Virtuozzo International GmbH
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from pathlib
import Path
29 from contextlib
import contextmanager
30 from typing
import List
, Optional
, Iterator
, Any
, Sequence
, Dict
, \
33 from testenv
import TestEnv
36 def silent_unlink(path
: Path
) -> None:
43 def file_diff(file1
: str, file2
: str) -> List
[str]:
44 with
open(file1
, encoding
="utf-8") as f1
, \
45 open(file2
, encoding
="utf-8") as f2
:
# We want to ignore spaces at line ends. There is a lot of mess with
# trailing whitespace in the current test outputs.
48 # TODO: fix all tests to not produce extra spaces, fix all .out files
49 # and use strict diff here!
50 seq1
= [line
.rstrip() for line
in f1
]
51 seq2
= [line
.rstrip() for line
in f2
]
53 for line
in difflib
.unified_diff(seq1
, seq2
, file1
, file2
)]
57 # We want to save current tty settings during test run,
58 # since an aborting qemu call may leave things screwed up.
60 def savetty() -> Iterator
[None]:
61 isterm
= sys
.stdin
.isatty()
63 fd
= sys
.stdin
.fileno()
64 attr
= termios
.tcgetattr(fd
)
70 termios
.tcsetattr(fd
, termios
.TCSADRAIN
, attr
)
73 class LastElapsedTime(ContextManager
['LastElapsedTime']):
74 """ Cache for elapsed time for tests, to show it during new test run
76 It is safe to use get() at any time. To use update(), you must either
77 use it inside with-block or use save() after update().
79 def __init__(self
, cache_file
: str, env
: TestEnv
) -> None:
81 self
.cache_file
= cache_file
82 self
.cache
: Dict
[str, Dict
[str, Dict
[str, float]]]
85 with
open(cache_file
, encoding
="utf-8") as f
:
86 self
.cache
= json
.load(f
)
87 except (OSError, ValueError):
90 def get(self
, test
: str,
91 default
: Optional
[float] = None) -> Optional
[float]:
92 if test
not in self
.cache
:
95 if self
.env
.imgproto
not in self
.cache
[test
]:
98 return self
.cache
[test
][self
.env
.imgproto
].get(self
.env
.imgfmt
,
def update(self, test: str, elapsed: float) -> None:
    """ Remember the elapsed time of a test for the current environment

    The value is stored in the in-memory cache under three nested keys:
    the test name, then self.env.imgproto, then self.env.imgfmt. Missing
    intermediate levels are created on demand. Only self.cache is
    modified here.
    """
    per_test = self.cache.setdefault(test, {})
    per_proto = per_test.setdefault(self.env.imgproto, {})
    per_proto[self.env.imgfmt] = elapsed
def save(self) -> None:
    """ Dump the in-memory cache to self.cache_file as JSON

    The file is opened in 'w' mode with utf-8 encoding, so previous
    contents of the file are replaced.
    """
    with open(self.cache_file, 'w', encoding="utf-8") as cache_out:
        json.dump(self.cache, fp=cache_out)
109 def __enter__(self
) -> 'LastElapsedTime':
112 def __exit__(self
, exc_type
: Any
, exc_value
: Any
, traceback
: Any
) -> None:
117 def __init__(self
, status
: str, description
: str = '',
118 elapsed
: Optional
[float] = None, diff
: Sequence
[str] = (),
119 casenotrun
: str = '', interrupted
: bool = False) -> None:
121 self
.description
= description
122 self
.elapsed
= elapsed
124 self
.casenotrun
= casenotrun
125 self
.interrupted
= interrupted
128 class TestRunner(ContextManager
['TestRunner']):
129 def __init__(self
, env
: TestEnv
, makecheck
: bool = False,
130 color
: str = 'auto') -> None:
132 self
.makecheck
= makecheck
133 self
.last_elapsed
= LastElapsedTime('.last-elapsed-cache', env
)
135 assert color
in ('auto', 'on', 'off')
136 self
.color
= (color
== 'on') or (color
== 'auto' and
139 self
._stack
: contextlib
.ExitStack
141 def __enter__(self
) -> 'TestRunner':
142 self
._stack
= contextlib
.ExitStack()
143 self
._stack
.enter_context(self
.env
)
144 self
._stack
.enter_context(self
.last_elapsed
)
145 self
._stack
.enter_context(savetty())
148 def __exit__(self
, exc_type
: Any
, exc_value
: Any
, traceback
: Any
) -> None:
151 def test_print_one_line(self
, test
: str, starttime
: str,
152 endtime
: Optional
[str] = None, status
: str = '...',
153 lasttime
: Optional
[float] = None,
154 thistime
: Optional
[float] = None,
155 description
: str = '',
156 test_field_width
: Optional
[int] = None,
157 end
: str = '\n') -> None:
158 """ Print short test info before/after test run """
159 test
= os
.path
.basename(test
)
161 if test_field_width
is None:
164 if self
.makecheck
and status
!= '...':
165 if status
and status
!= 'pass':
166 status
= f
' [{status}]'
170 print(f
' TEST iotest-{self.env.imgfmt}: {test}{status}')
174 lasttime_s
= f
' (last: {lasttime:.1f}s)'
178 thistime_s
= f
'{thistime:.1f}s'
183 endtime
= f
'[{endtime}]'
190 elif status
== 'fail':
191 col
= '\033[1m\033[31m'
192 elif status
== 'not run':
202 print(f
'{test:{test_field_width}} {col}{status:10}{col_end} '
203 f
'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} '
204 f
'{description}', end
=end
)
206 def find_reference(self
, test
: str) -> str:
207 if self
.env
.cachemode
== 'none':
208 ref
= f
'{test}.out.nocache'
209 if os
.path
.isfile(ref
):
212 ref
= f
'{test}.out.{self.env.imgfmt}'
213 if os
.path
.isfile(ref
):
216 ref
= f
'{test}.{self.env.qemu_default_machine}.out'
217 if os
.path
.isfile(ref
):
222 def do_run_test(self
, test
: str) -> TestResult
:
224 f_bad
= Path(f_test
.name
+ '.out.bad')
225 f_notrun
= Path(f_test
.name
+ '.notrun')
226 f_casenotrun
= Path(f_test
.name
+ '.casenotrun')
227 f_reference
= Path(self
.find_reference(test
))
229 if not f_test
.exists():
230 return TestResult(status
='fail',
231 description
=f
'No such test file: {f_test}')
233 if not os
.access(str(f_test
), os
.X_OK
):
234 sys
.exit(f
'Not executable: {f_test}')
236 if not f_reference
.exists():
237 return TestResult(status
='not run',
238 description
='No qualified output '
239 f
'(expected {f_reference})')
241 for p
in (f_bad
, f_notrun
, f_casenotrun
):
244 args
= [str(f_test
.resolve())]
245 env
= self
.env
.prepare_subprocess(args
)
248 with f_bad
.open('w', encoding
="utf-8") as f
:
249 with subprocess
.Popen(args
, cwd
=str(f_test
.parent
), env
=env
,
250 stdout
=f
, stderr
=subprocess
.STDOUT
) as proc
:
253 except KeyboardInterrupt:
256 return TestResult(status
='not run',
257 description
='Interrupted by user',
259 ret
= proc
.returncode
261 elapsed
= round(time
.time() - t0
, 1)
264 return TestResult(status
='fail', elapsed
=elapsed
,
265 description
=f
'failed, exit status {ret}',
266 diff
=file_diff(str(f_reference
), str(f_bad
)))
268 if f_notrun
.exists():
269 return TestResult(status
='not run',
270 description
=f_notrun
.read_text().strip())
273 if f_casenotrun
.exists():
274 casenotrun
= f_casenotrun
.read_text()
276 diff
= file_diff(str(f_reference
), str(f_bad
))
278 return TestResult(status
='fail', elapsed
=elapsed
,
279 description
=f
'output mismatch (see {f_bad})',
280 diff
=diff
, casenotrun
=casenotrun
)
283 self
.last_elapsed
.update(test
, elapsed
)
284 return TestResult(status
='pass', elapsed
=elapsed
,
285 casenotrun
=casenotrun
)
287 def run_test(self
, test
: str,
288 test_field_width
: Optional
[int] = None) -> TestResult
:
289 last_el
= self
.last_elapsed
.get(test
)
290 start
= datetime
.datetime
.now().strftime('%H:%M:%S')
292 if not self
.makecheck
:
293 self
.test_print_one_line(test
=test
, starttime
=start
,
294 lasttime
=last_el
, end
='\r',
295 test_field_width
=test_field_width
)
297 res
= self
.do_run_test(test
)
299 end
= datetime
.datetime
.now().strftime('%H:%M:%S')
300 self
.test_print_one_line(test
=test
, status
=res
.status
,
301 starttime
=start
, endtime
=end
,
302 lasttime
=last_el
, thistime
=res
.elapsed
,
303 description
=res
.description
,
304 test_field_width
=test_field_width
)
307 print(res
.casenotrun
)
311 def run_tests(self
, tests
: List
[str]) -> bool:
317 if not self
.makecheck
:
320 test_field_width
= max(len(os
.path
.basename(t
)) for t
in tests
) + 2
323 name
= os
.path
.basename(t
)
324 res
= self
.run_test(t
, test_field_width
=test_field_width
)
326 assert res
.status
in ('pass', 'fail', 'not run')
331 if res
.status
!= 'not run':
334 if res
.status
== 'fail':
339 print('\n'.join(res
.diff
))
340 elif res
.status
== 'not run':
347 print('Not run:', ' '.join(notrun
))
350 print('Some cases not run in:', ' '.join(casenotrun
))
353 print('Failures:', ' '.join(failed
))
354 print(f
'Failed {len(failed)} of {n_run} iotests')
357 print(f
'Passed all {n_run} iotests')