test framework: bulk reformat
[scons.git] / testing / framework / TestCmd.py
blob8e799b995dd8dfbe1a6d2e7793c7f7524a2f7b63
1 # Copyright 2000-2024 Steven Knight
3 # This module is free software, and you may redistribute it and/or modify
4 # it under the same terms as Python itself, so long as this copyright message
5 # and disclaimer are retained in their original form.
7 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
8 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
9 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
10 # DAMAGE.
12 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
13 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
14 # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
15 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
18 # Python License: https://docs.python.org/3/license.html#psf-license
20 """
21 A testing framework for commands and scripts.
23 The TestCmd module provides a framework for portable automated testing
24 of executable commands and scripts (in any language, not just Python),
25 especially commands and scripts that require file system interaction.
27 In addition to running tests and evaluating conditions, the TestCmd
28 module manages and cleans up one or more temporary workspace
29 directories, and provides methods for creating files and directories in
30 those workspace directories from in-line data, here-documents), allowing
31 tests to be completely self-contained.
33 A TestCmd environment object is created via the usual invocation:
35 import TestCmd
36 test = TestCmd.TestCmd()
38 There are a bunch of keyword arguments available at instantiation:
40 test = TestCmd.TestCmd(
41 description='string',
42 program='program_or_script_to_test',
43 interpreter='script_interpreter',
44 workdir='prefix',
45 subdir='subdir',
46 verbose=int, # verbosity level
47 match=default_match_function,
48 match_stdout=default_match_stdout_function,
49 match_stderr=default_match_stderr_function,
50 diff=default_diff_stderr_function,
51 diff_stdout=default_diff_stdout_function,
52 diff_stderr=default_diff_stderr_function,
53 combine=Boolean,
56 There are a bunch of methods that let you do different things:
58 test.verbose_set(1)
60 test.description_set('string')
62 test.program_set('program_or_script_to_test')
64 test.interpreter_set('script_interpreter')
65 test.interpreter_set(['script_interpreter', 'arg'])
67 test.workdir_set('prefix')
68 test.workdir_set('')
70 test.workpath('file')
71 test.workpath('subdir', 'file')
73 test.subdir('subdir', ...)
75 test.rmdir('subdir', ...)
77 test.write('file', "contents\n")
78 test.write(['subdir', 'file'], "contents\n")
80 test.read('file')
81 test.read(['subdir', 'file'])
82 test.read('file', mode)
83 test.read(['subdir', 'file'], mode)
85 test.writable('dir', 1)
86 test.writable('dir', None)
88 test.preserve(condition, ...)
90 test.cleanup(condition)
92 test.command_args(
93 program='program_or_script_to_run',
94 interpreter='script_interpreter',
95 arguments='arguments to pass to program',
98 test.run(
99 program='program_or_script_to_run',
100 interpreter='script_interpreter',
101 arguments='arguments to pass to program',
102 chdir='directory_to_chdir_to',
103 stdin='input to feed to the program\n',
104 universal_newlines=True,
107 p = test.start(
108 program='program_or_script_to_run',
109 interpreter='script_interpreter',
110 arguments='arguments to pass to program',
111 universal_newlines=None,
114 test.finish(self, p)
116 test.pass_test()
117 test.pass_test(condition)
118 test.pass_test(condition, function)
120 test.fail_test()
121 test.fail_test(condition)
122 test.fail_test(condition, function)
123 test.fail_test(condition, function, skip)
124 test.fail_test(condition, function, skip, message)
126 test.no_result()
127 test.no_result(condition)
128 test.no_result(condition, function)
129 test.no_result(condition, function, skip)
131 test.stdout()
132 test.stdout(run)
134 test.stderr()
135 test.stderr(run)
137 test.symlink(target, link)
139 test.banner(string)
140 test.banner(string, width)
142 test.diff(actual, expected)
144 test.diff_stderr(actual, expected)
146 test.diff_stdout(actual, expected)
148 test.match(actual, expected)
150 test.match_stderr(actual, expected)
152 test.match_stdout(actual, expected)
154 test.set_match_function(match, stdout, stderr)
156 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
157 test.match_exact(["actual 1\n", "actual 2\n"],
158 ["expected 1\n", "expected 2\n"])
159 test.match_caseinsensitive("Actual 1\nACTUAL 2\n", "expected 1\nEXPECTED 2\n")
161 test.match_re("actual 1\nactual 2\n", regex_string)
162 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
164 test.match_re_dotall("actual 1\nactual 2\n", regex_string)
165 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
167 test.tempdir()
168 test.tempdir('temporary-directory')
170 test.sleep()
171 test.sleep(seconds)
173 test.where_is('foo')
174 test.where_is('foo', 'PATH1:PATH2')
175 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
177 test.unlink('file')
178 test.unlink('subdir', 'file')
180 The TestCmd module provides pass_test(), fail_test(), and no_result()
181 unbound functions that report test results for use with the Aegis change
182 management system. These methods terminate the test immediately,
183 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
184 status 0 (success), 1 or 2 respectively. This allows for a distinction
185 between an actual failed test and a test that could not be properly
186 evaluated because of an external condition (such as a full file system
187 or incorrect permissions).
189 import TestCmd
191 TestCmd.pass_test()
192 TestCmd.pass_test(condition)
193 TestCmd.pass_test(condition, function)
195 TestCmd.fail_test()
196 TestCmd.fail_test(condition)
197 TestCmd.fail_test(condition, function)
198 TestCmd.fail_test(condition, function, skip)
199 TestCmd.fail_test(condition, function, skip, message)
201 TestCmd.no_result()
202 TestCmd.no_result(condition)
203 TestCmd.no_result(condition, function)
204 TestCmd.no_result(condition, function, skip)
206 The TestCmd module also provides unbound global functions that handle
207 matching in the same way as the match_*() methods described above.
209 import TestCmd
211 test = TestCmd.TestCmd(match=TestCmd.match_exact)
213 test = TestCmd.TestCmd(match=TestCmd.match_caseinsensitive)
215 test = TestCmd.TestCmd(match=TestCmd.match_re)
217 test = TestCmd.TestCmd(match=TestCmd.match_re_dotall)
219 These functions are also available as static methods:
221 import TestCmd
223 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_exact)
225 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_caseinsensitive)
227 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_re)
229 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_re_dotall)
231 These static methods can be accessed by a string naming the method:
233 import TestCmd
235 test = TestCmd.TestCmd(match='match_exact')
237 test = TestCmd.TestCmd(match='match_caseinsensitive')
239 test = TestCmd.TestCmd(match='match_re')
241 test = TestCmd.TestCmd(match='match_re_dotall')
243 The TestCmd module provides unbound global functions that can be used
244 for the "diff" argument to TestCmd.TestCmd instantiation:
246 import TestCmd
248 test = TestCmd.TestCmd(match=TestCmd.match_re, diff=TestCmd.diff_re)
250 test = TestCmd.TestCmd(diff=TestCmd.simple_diff)
252 test = TestCmd.TestCmd(diff=TestCmd.context_diff)
254 test = TestCmd.TestCmd(diff=TestCmd.unified_diff)
256 These functions are also available as static methods:
258 import TestCmd
260 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_re, diff=TestCmd.TestCmd.diff_re)
262 test = TestCmd.TestCmd(diff=TestCmd.TestCmd.simple_diff)
264 test = TestCmd.TestCmd(diff=TestCmd.TestCmd.context_diff)
266 test = TestCmd.TestCmd(diff=TestCmd.TestCmd.unified_diff)
268 These static methods can be accessed by a string naming the method:
270 import TestCmd
272 test = TestCmd.TestCmd(match='match_re', diff='diff_re')
274 test = TestCmd.TestCmd(diff='simple_diff')
276 test = TestCmd.TestCmd(diff='context_diff')
278 test = TestCmd.TestCmd(diff='unified_diff')
280 The "diff" argument can also be used with standard difflib functions:
282 import difflib
284 test = TestCmd.TestCmd(diff=difflib.context_diff)
286 test = TestCmd.TestCmd(diff=difflib.unified_diff)
288 Lastly, the where_is() method also exists in an unbound function
289 version.
291 import TestCmd
293 TestCmd.where_is('foo')
294 TestCmd.where_is('foo', 'PATH1:PATH2')
295 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
298 from __future__ import annotations
300 __author__ = "Steven Knight <knight at baldmt dot com>"
301 __revision__ = "TestCmd.py 1.3.D001 2010/06/03 12:58:27 knight"
302 __version__ = "1.3"
304 import atexit
305 import difflib
306 import errno
307 import hashlib
308 import os
309 import re
311 try:
312 import psutil
313 except ImportError:
314 HAVE_PSUTIL = False
315 else:
316 HAVE_PSUTIL = True
317 import shutil
318 import signal
319 import stat
320 import subprocess
321 import sys
322 import tempfile
323 import time
324 import traceback
325 from collections import UserList, UserString
326 from pathlib import Path
327 from subprocess import PIPE, STDOUT
328 from typing import Callable
330 IS_WINDOWS = sys.platform == 'win32'
331 IS_MACOS = sys.platform == 'darwin'
332 IS_64_BIT = sys.maxsize > 2**32
333 IS_PYPY = hasattr(sys, 'pypy_translation_info')
334 try:
335 IS_ROOT = os.geteuid() == 0
336 except AttributeError:
337 IS_ROOT = False
338 NEED_HELPER = os.environ.get('SCONS_NO_DIRECT_SCRIPT')
341 # sentinel for cases where None won't do
342 _Null = object()
344 __all__ = [
345 'diff_re',
346 'fail_test',
347 'no_result',
348 'pass_test',
349 'match_exact',
350 'match_caseinsensitive',
351 'match_re',
352 'match_re_dotall',
353 'python',
354 '_python_',
355 'TestCmd',
356 'to_bytes',
357 'to_str',
361 def is_List(e):
362 return isinstance(e, (list, UserList))
365 def to_bytes(s):
366 if isinstance(s, bytes):
367 return s
368 return bytes(s, 'utf-8')
371 def to_str(s):
372 if is_String(s):
373 return s
374 return str(s, 'utf-8')
377 def is_String(e):
378 return isinstance(e, (str, UserString))
381 testprefix = 'testcmd.'
382 if os.name in ('posix', 'nt'):
383 testprefix += f"{os.getpid()}."
385 re_space = re.compile(r'\s')
388 def _caller(tblist, skip):
389 string = ""
390 arr = []
391 for file, line, name, text in tblist:
392 if file[-10:] == "TestCmd.py":
393 break
394 arr = [(file, line, name, text)] + arr
395 atfrom = "at"
396 for file, line, name, text in arr[skip:]:
397 if name in ("?", "<module>"):
398 name = ""
399 else:
400 name = f" ({name})"
401 string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
402 atfrom = "\tfrom"
403 return string
406 def clean_up_ninja_daemon(self, result_type) -> None:
408 Kill any running scons daemon started by ninja and clean up
410 Working directory and temp files are removed.
411 Skipped if this platform doesn't have psutil (e.g. msys2 on Windows)
413 if not self:
414 return
416 for path in Path(self.workdir).rglob('.ninja'):
417 daemon_dir = Path(tempfile.gettempdir()) / (
418 f"scons_daemon_{str(hashlib.md5(str(path.resolve()).encode()).hexdigest())}"
420 pidfiles = [daemon_dir / 'pidfile', path / 'scons_daemon_dirty']
421 for pidfile in pidfiles:
422 if pidfile.exists():
423 with open(pidfile) as f:
424 try:
425 pid = int(f.read())
426 os.kill(pid, signal.SIGINT)
427 except OSError:
428 pass
430 while HAVE_PSUTIL:
431 if pid not in [proc.pid for proc in psutil.process_iter()]:
432 break
433 else:
434 time.sleep(0.1)
436 if not self._preserve[result_type]:
437 if daemon_dir.exists():
438 shutil.rmtree(daemon_dir)
441 def fail_test(
442 self=None,
443 condition: bool = True,
444 function: Callable | None = None,
445 skip: int = 0,
446 message: str = "",
447 ) -> None:
448 """Causes a test to exit with a fail.
450 Reports that the test FAILED and exits with a status of 1, unless
451 a condition argument is supplied; if so the completion processing
452 takes place only if the condition is true.
454 Args:
455 self: a test class instance. Must be passed in explicitly
456 by the caller since this is an unbound method.
457 condition (optional): if false, return to let test continue.
458 function (optional): function to call before completion processing.
459 skip (optional): how many lines at the top of the traceback to skip.
460 message (optional): additional text to include in the fail message.
462 if not condition:
463 return
464 if function is not None:
465 function()
466 clean_up_ninja_daemon(self, 'fail_test')
467 of = ""
468 desc = ""
469 sep = " "
470 if self is not None:
471 if self.program:
472 of = f" of {self.program}"
473 sep = "\n\t"
474 if self.description:
475 desc = f" [{self.description}]"
476 sep = "\n\t"
478 at = _caller(traceback.extract_stack(), skip)
479 if message:
480 msg = f"\t{message}\n"
481 else:
482 msg = ""
483 sys.stderr.write(f"FAILED test{of}{desc}{sep}{at}{msg}")
485 sys.exit(1)
488 def no_result(self=None, condition: bool = True, function=None, skip: int = 0) -> None:
489 """Causes a test to exit with a no result.
491 In testing parlance NO RESULT means the test could not be completed
492 for reasons that imply neither success nor failure - for example a
493 component needed to run the test could be found. However, at this
494 point we still have an "outcome", so record the information and exit
495 with a status code of 2, unless a condition argument is supplied;
496 if so the completion processing takes place only if the condition is true.
498 The different exit code and message allows other logic to distinguish
499 from a fail and decide how to treat NO RESULT tests.
501 Args:
502 self: a test class instance. Must be passed in explicitly
503 by the caller since this is an unbound method.
504 condition (optional): if false, return to let test continue.
505 function (optional): function to call before completion processing.
506 skip (optional): how many lines at the top of the traceback to skip.
508 if not condition:
509 return
510 if function is not None:
511 function()
512 clean_up_ninja_daemon(self, 'no_result')
513 of = ""
514 desc = ""
515 sep = " "
516 if self is not None:
517 if self.program:
518 of = f" of {self.program}"
519 sep = "\n\t"
520 if self.description:
521 desc = f" [{self.description}]"
522 sep = "\n\t"
524 at = _caller(traceback.extract_stack(), skip)
525 sys.stderr.write(f"NO RESULT for test{of}{desc}{sep}{at}")
527 sys.exit(2)
530 def pass_test(self=None, condition: bool = True, function=None) -> None:
531 """Causes a test to exit with a pass.
533 Reports that the test PASSED and exits with a status of 0, unless
534 a condition argument is supplied; if so the completion processing
535 takes place only if the condition is true.
537 the test passes only if the condition is true.
539 Args:
540 self: a test class instance. Must be passed in explicitly
541 by the caller since this is an unbound method.
542 condition (optional): if false, return to let test continue.
543 function (optional): function to call before completion processing.
545 if not condition:
546 return
547 if function is not None:
548 function()
549 clean_up_ninja_daemon(self, 'pass_test')
550 sys.stderr.write("PASSED\n")
551 sys.exit(0)
554 def match_exact(lines=None, matches=None, newline=os.sep):
555 """Match function using exact match.
557 :param lines: data lines
558 :type lines: str or list[str]
559 :param matches: expected lines to match
560 :type matches: str or list[str]
561 :param newline: line separator
562 :returns: None on failure, 1 on success.
565 if isinstance(lines, bytes):
566 newline = to_bytes(newline)
568 if not is_List(lines):
569 lines = lines.split(newline)
570 if not is_List(matches):
571 matches = matches.split(newline)
572 if len(lines) != len(matches):
573 return None
574 for line, match in zip(lines, matches):
575 if line != match:
576 return None
577 return 1
580 def match_caseinsensitive(lines=None, matches=None):
581 """Match function using case-insensitive matching.
583 Only a simplistic comparison is done, based on casefolding
584 the strings. This may still fail but is the suggestion of
585 the Unicode Standard.
587 :param lines: data lines
588 :type lines: str or list[str]
589 :param matches: expected lines to match
590 :type matches: str or list[str]
591 :returns: None on failure, 1 on success.
594 if not is_List(lines):
595 lines = lines.split("\n")
596 if not is_List(matches):
597 matches = matches.split("\n")
598 if len(lines) != len(matches):
599 return None
600 for line, match in zip(lines, matches):
601 if line.casefold() != match.casefold():
602 return None
603 return 1
606 def match_re(lines=None, res=None):
607 """Match function using line-by-line regular expression match.
609 :param lines: data lines
610 :type lines: str or list[str]
611 :param res: regular expression(s) for matching
612 :type res: str or list[str]
613 :returns: None on failure, 1 on success.
616 if not is_List(lines):
617 # CRs mess up matching (Windows) so split carefully
618 lines = re.split('\r?\n', lines)
619 if not is_List(res):
620 res = res.split("\n")
621 if len(lines) != len(res):
622 print(f"match_re: expected {len(res)} lines, found {len(lines)}")
623 return None
624 for i, (line, regex) in enumerate(zip(lines, res)):
625 s = rf"^{regex}$"
626 try:
627 expr = re.compile(s)
628 except re.error as e:
629 msg = "Regular expression error in %s: %s"
630 raise re.error(msg % (repr(s), e.args[0]))
631 if not expr.search(line):
632 miss_tmpl = "match_re: mismatch at line {}:\n search re='{}'\n line='{}'"
633 print(miss_tmpl.format(i, s, line))
634 return None
635 return 1
638 def match_re_dotall(lines=None, res=None):
639 """Match function using regular expression match.
641 Unlike match_re, the arguments are converted to strings (if necessary)
642 and must match exactly.
644 :param lines: data lines
645 :type lines: str or list[str]
646 :param res: regular expression(s) for matching
647 :type res: str or list[str]
648 :returns: a match object on match, else None, like re.match
651 if not isinstance(lines, str):
652 lines = "\n".join(lines)
653 if not isinstance(res, str):
654 res = "\n".join(res)
655 s = rf"^{res}$"
656 try:
657 expr = re.compile(s, re.DOTALL)
658 except re.error as e:
659 msg = "Regular expression error in %s: %s"
660 raise re.error(msg % (repr(s), e.args[0]))
661 return expr.match(lines)
664 def simple_diff(
667 fromfile: str = '',
668 tofile: str = '',
669 fromfiledate: str = '',
670 tofiledate: str = '',
671 n: int = 0,
672 lineterm: str = '',
674 r"""Compare two sequences of lines; generate the delta as a simple diff.
676 Similar to difflib.context_diff and difflib.unified_diff but
677 output is like from the "diff" command without arguments. The function
678 keeps the same signature as the difflib ones so they will be
679 interchangeable, but except for lineterm, the arguments beyond the
680 two sequences are ignored in this version. By default, the
681 diff is not created with trailing newlines, set the lineterm
682 argument to '\n' to do so.
684 Example:
686 >>> print(''.join(simple_diff('one\ntwo\nthree\nfour\n'.splitlines(True),
687 ... 'zero\none\ntree\nfour\n'.splitlines(True), lineterm='\n')))
689 > zero
690 2,3c3
691 < two
692 < three
694 > tree
697 a = [to_str(q) for q in a]
698 b = [to_str(q) for q in b]
699 sm = difflib.SequenceMatcher(None, a, b)
701 def comma(x1, x2):
702 return x1 + 1 == x2 and str(x2) or f'{x1 + 1},{x2}'
704 for op, a1, a2, b1, b2 in sm.get_opcodes():
705 if op == 'delete':
706 yield f"{comma(a1, a2)}d{b1}{lineterm}"
707 for l in a[a1:a2]:
708 yield f"< {l}"
709 elif op == 'insert':
710 yield f"{a1}a{comma(b1, b2)}{lineterm}"
711 for l in b[b1:b2]:
712 yield f"> {l}"
713 elif op == 'replace':
714 yield f"{comma(a1, a2)}c{comma(b1, b2)}{lineterm}"
715 for l in a[a1:a2]:
716 yield f"< {l}"
717 yield f'---{lineterm}'
718 for l in b[b1:b2]:
719 yield f"> {l}"
722 def diff_re(
725 fromfile: str = '',
726 tofile: str = '',
727 fromfiledate: str = '',
728 tofiledate: str = '',
729 n: int = 3,
730 lineterm: str = '\n',
732 """Compare a and b (lists of strings) where a are regular expressions.
734 A simple "diff" of two sets of lines when the expected lines
735 are regular expressions. This is a really dumb thing that
736 just compares each line in turn, so it doesn't look for
737 chunks of matching lines and the like--but at least it lets
738 you know exactly which line first didn't compare correctl...
740 Raises:
741 re.error: if a regex fails to compile
743 result = []
744 diff = len(a) - len(b)
745 if diff < 0:
746 a = a + [''] * (-diff)
747 elif diff > 0:
748 b = b + [''] * diff
749 for i, (aline, bline) in enumerate(zip(a, b)):
750 s = rf"^{aline}$"
751 try:
752 expr = re.compile(s)
753 except re.error as e:
754 msg = "Regular expression error in %s: %s"
755 raise re.error(msg % (repr(s), e.args[0]))
756 if not expr.search(bline):
757 result.append(f"{i + 1}c{i + 1}")
758 result.append(f"< {a[i]!r}")
759 result.append('---')
760 result.append(f"> {b[i]!r}")
761 return result
764 if os.name == 'posix':
766 def escape(arg):
767 """escape shell special characters"""
768 slash = '\\'
769 special = '"$'
770 arg = arg.replace(slash, slash + slash)
771 for c in special:
772 arg = arg.replace(c, slash + c)
773 if re_space.search(arg):
774 arg = f"\"{arg}\""
775 return arg
776 else:
777 # Windows does not allow special characters in file names
778 # anyway, so no need for an escape function, we will just quote
779 # the arg.
780 def escape(arg):
781 if re_space.search(arg):
782 arg = f"\"{arg}\""
783 return arg
786 if os.name == 'java':
787 python = os.path.join(sys.prefix, 'jython')
788 else:
789 python = os.environ.get('python_executable', sys.executable)
790 _python_ = escape(python)
792 if sys.platform == 'win32':
793 default_sleep_seconds = 2
795 def where_is(file, path=None, pathext=None):
796 if path is None:
797 path = os.environ['PATH']
798 if is_String(path):
799 path = path.split(os.pathsep)
800 if pathext is None:
801 pathext = os.environ['PATHEXT']
802 if is_String(pathext):
803 pathext = pathext.split(os.pathsep)
804 for ext in pathext:
805 if ext.casefold() == file[-len(ext) :].casefold():
806 pathext = ['']
807 break
808 for dir in path:
809 f = os.path.join(dir, file)
810 for ext in pathext:
811 fext = f + ext
812 if os.path.isfile(fext):
813 return fext
814 return None
816 else:
818 def where_is(file, path=None, pathext=None):
819 if path is None:
820 path = os.environ['PATH']
821 if is_String(path):
822 path = path.split(os.pathsep)
823 for dir in path:
824 f = os.path.join(dir, file)
825 if os.path.isfile(f):
826 try:
827 st = os.stat(f)
828 except OSError:
829 continue
830 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
831 return f
832 return None
834 default_sleep_seconds = 1
837 # From Josiah Carlson,
838 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
839 # https://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
841 if sys.platform == 'win32': # and subprocess.mswindows:
842 try:
843 from win32file import ReadFile, WriteFile
844 from win32pipe import PeekNamedPipe
845 except ImportError:
846 # If PyWin32 is not available, try ctypes instead
847 # XXX These replicate _just_enough_ PyWin32 behaviour for our purposes
848 import ctypes
849 from ctypes.wintypes import DWORD
851 def ReadFile(hFile, bufSize, ol=None):
852 assert ol is None
853 lpBuffer = ctypes.create_string_buffer(bufSize)
854 bytesRead = DWORD()
855 bErr = ctypes.windll.kernel32.ReadFile(
856 hFile, lpBuffer, bufSize, ctypes.byref(bytesRead), ol
858 if not bErr:
859 raise ctypes.WinError()
860 return (0, ctypes.string_at(lpBuffer, bytesRead.value))
862 def WriteFile(hFile, data, ol=None):
863 assert ol is None
864 bytesWritten = DWORD()
865 bErr = ctypes.windll.kernel32.WriteFile(
866 hFile, data, len(data), ctypes.byref(bytesWritten), ol
868 if not bErr:
869 raise ctypes.WinError()
870 return (0, bytesWritten.value)
872 def PeekNamedPipe(hPipe, size):
873 assert size == 0
874 bytesAvail = DWORD()
875 bErr = ctypes.windll.kernel32.PeekNamedPipe(
876 hPipe, None, size, None, ctypes.byref(bytesAvail), None
878 if not bErr:
879 raise ctypes.WinError()
880 return ("", bytesAvail.value, None)
882 import msvcrt
883 else:
884 import select
885 import fcntl
887 try:
888 fcntl.F_GETFL
889 except AttributeError:
890 fcntl.F_GETFL = 3
892 try:
893 fcntl.F_SETFL
894 except AttributeError:
895 fcntl.F_SETFL = 4
898 class Popen(subprocess.Popen):
899 def recv(self, maxsize=None):
900 return self._recv('stdout', maxsize)
902 def recv_err(self, maxsize=None):
903 return self._recv('stderr', maxsize)
905 def send_recv(self, input: str = '', maxsize=None):
906 return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
908 def get_conn_maxsize(self, which, maxsize):
909 if maxsize is None:
910 maxsize = 1024
911 elif maxsize < 1:
912 maxsize = 1
913 return getattr(self, which), maxsize
915 def _close(self, which) -> None:
916 getattr(self, which).close()
917 setattr(self, which, None)
919 if sys.platform == 'win32': # and subprocess.mswindows:
921 def send(self, input):
922 input = to_bytes(input)
923 if not self.stdin:
924 return None
926 try:
927 x = msvcrt.get_osfhandle(self.stdin.fileno())
928 (errCode, written) = WriteFile(x, input)
929 except ValueError:
930 return self._close('stdin')
931 except (subprocess.pywintypes.error, Exception) as why:
932 if why.args[0] in (109, errno.ESHUTDOWN):
933 return self._close('stdin')
934 raise
936 return written
938 def _recv(self, which, maxsize):
939 conn, maxsize = self.get_conn_maxsize(which, maxsize)
940 if conn is None:
941 return None
943 try:
944 x = msvcrt.get_osfhandle(conn.fileno())
945 (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
946 if maxsize < nAvail:
947 nAvail = maxsize
948 if nAvail > 0:
949 (errCode, read) = ReadFile(x, nAvail, None)
950 except ValueError:
951 return self._close(which)
952 except (subprocess.pywintypes.error, Exception) as why:
953 if why.args[0] in (109, errno.ESHUTDOWN):
954 return self._close(which)
955 raise
957 # if self.universal_newlines:
958 # read = self._translate_newlines(read)
959 return read
961 else:
963 def send(self, input):
964 if not self.stdin:
965 return None
967 if not select.select([], [self.stdin], [], 0)[1]:
968 return 0
970 try:
971 written = os.write(self.stdin.fileno(), bytearray(input, 'utf-8'))
972 except OSError as why:
973 if why.args[0] == errno.EPIPE: # broken pipe
974 return self._close('stdin')
975 raise
977 return written
979 def _recv(self, which, maxsize):
980 conn, maxsize = self.get_conn_maxsize(which, maxsize)
981 if conn is None:
982 return None
984 try:
985 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
986 except TypeError:
987 flags = None
988 else:
989 if not conn.closed:
990 fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
992 try:
993 if not select.select([conn], [], [], 0)[0]:
994 return ''
996 r = conn.read(maxsize)
997 if not r:
998 return self._close(which)
1000 # if self.universal_newlines:
1001 # r = self._translate_newlines(r)
1002 return r
1003 finally:
1004 if not conn.closed and flags is not None:
1005 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
1008 disconnect_message = "Other end disconnected!"
1011 def recv_some(p, t: float = 0.1, e: int = 1, tr: int = 5, stderr: int = 0):
1012 if tr < 1:
1013 tr = 1
1014 x = time.time() + t
1015 y = []
1016 r = ''
1017 pr = p.recv
1018 if stderr:
1019 pr = p.recv_err
1020 while time.time() < x or r:
1021 r = pr()
1022 if r is None:
1023 if e:
1024 raise Exception(disconnect_message)
1025 else:
1026 break
1027 elif r:
1028 y.append(r)
1029 else:
1030 time.sleep(max((x - time.time()) / tr, 0))
1031 return ''.join(y)
1034 def send_all(p, data):
1035 while len(data):
1036 sent = p.send(data)
1037 if sent is None:
1038 raise Exception(disconnect_message)
1039 data = memoryview(data)[sent:]
1042 _Cleanup = []
1045 @atexit.register
1046 def _clean() -> None:
1047 global _Cleanup
1048 cleanlist = [c for c in _Cleanup if c]
1049 del _Cleanup[:]
1050 cleanlist.reverse()
1051 for test in cleanlist:
1052 test.cleanup()
1055 class TestCmd:
1056 """Class TestCmd"""
1058 def __init__(
1059 self,
1060 description=None,
1061 program=None,
1062 interpreter=None,
1063 workdir=None,
1064 subdir=None,
1065 verbose: int = -1,
1066 match=None,
1067 match_stdout=None,
1068 match_stderr=None,
1069 diff=None,
1070 diff_stdout=None,
1071 diff_stderr=None,
1072 combine: bool = False,
1073 universal_newlines: bool | None = True,
1074 timeout: float | None = None,
1075 ) -> None:
1076 self.external = os.environ.get('SCONS_EXTERNAL_TEST', 0)
1077 self._cwd = os.getcwd()
1078 self.description_set(description)
1079 self.program_set(program)
1080 self.interpreter_set(interpreter)
1081 if verbose == -1:
1082 try:
1083 verbose = max(0, int(os.environ.get('TESTCMD_VERBOSE', 0)))
1084 except ValueError:
1085 verbose = 0
1086 self.verbose_set(verbose)
1087 self.combine = combine
1088 self.universal_newlines = universal_newlines
1089 self.process: Popen | None = None
1090 # Two layers of timeout: one at the test class instance level,
1091 # one set on an individual start() call (usually via a run() call)
1092 self.timeout = timeout
1093 self.start_timeout = None
1094 self.set_match_function(match, match_stdout, match_stderr)
1095 self.set_diff_function(diff, diff_stdout, diff_stderr)
1096 self._dirlist = []
1097 self._preserve: dict[str, str | bool] = {
1098 'pass_test': False,
1099 'fail_test': False,
1100 'no_result': False,
1102 preserve_value = os.environ.get('PRESERVE', False)
1103 if preserve_value not in [0, '0', 'False']:
1104 self._preserve['pass_test'] = os.environ['PRESERVE']
1105 self._preserve['fail_test'] = os.environ['PRESERVE']
1106 self._preserve['no_result'] = os.environ['PRESERVE']
1107 else:
1108 self._preserve['pass_test'] = os.environ.get('PRESERVE_PASS', False)
1109 self._preserve['fail_test'] = os.environ.get('PRESERVE_FAIL', False)
1110 self._preserve['no_result'] = os.environ.get('PRESERVE_NO_RESULT', False)
1111 self._stdout = []
1112 self._stderr = []
1113 self.status: int | None = None
1114 self.condition = 'no_result'
1115 self.workdir: str | None
1116 self.workdir_set(workdir)
1117 self.subdir(subdir)
1119 try:
1120 self.fixture_dirs = (os.environ['FIXTURE_DIRS']).split(os.pathsep)
1121 except KeyError:
1122 self.fixture_dirs = []
1124 def __del__(self) -> None:
1125 self.cleanup()
1127 def __repr__(self) -> str:
1128 return f"{id(self):x}"
1130 banner_char = '='
1131 banner_width = 80
1133 def banner(self, s, width=None) -> str:
1134 if width is None:
1135 width = self.banner_width
1136 return f"{s:{self.banner_char}<{width}}"
1138 escape = staticmethod(escape)
1140 def canonicalize(self, path):
1141 if is_List(path):
1142 path = os.path.join(*path)
1143 if not os.path.isabs(path):
1144 path = os.path.join(self.workdir, path)
1145 return path
1147 def chmod(self, path, mode) -> None:
1148 """Changes permissions on the specified file or directory."""
1149 path = self.canonicalize(path)
1150 os.chmod(path, mode)
1152 def cleanup(self, condition=None) -> None:
1153 """Removes any temporary working directories.
1155 Cleans the TestCmd instance. If the environment variable PRESERVE was
1156 set when the TestCmd environment was created, temporary working
1157 directories are not removed. If any of the environment variables
1158 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
1159 when the TestCmd environment was created, then temporary working
1160 directories are not removed if the test passed, failed, or had
1161 no result, respectively. Temporary working directories are also
1162 preserved for conditions specified via the preserve method.
1164 Typically, this method is not called directly, but is used when
1165 the script exits to clean up temporary working directories as
1166 appropriate for the exit status.
1168 if not self._dirlist:
1169 return
1170 os.chdir(self._cwd)
1171 self.workdir = None
1172 if condition is None:
1173 condition = self.condition
1174 if self._preserve[condition]:
1175 for dir in self._dirlist:
1176 print(f"Preserved directory {dir}")
1177 else:
1178 list = self._dirlist[:]
1179 list.reverse()
1180 for dir in list:
1181 self.writable(dir, True)
1182 shutil.rmtree(dir, ignore_errors=True)
1183 self._dirlist = []
1185 global _Cleanup
1186 if self in _Cleanup:
1187 _Cleanup.remove(self)
1189 def command_args(self, program=None, interpreter=None, arguments=None):
1190 if not self.external:
1191 if program:
1192 if isinstance(program, str) and not os.path.isabs(program):
1193 program = os.path.join(self._cwd, program)
1194 else:
1195 program = self.program
1196 if not interpreter:
1197 interpreter = self.interpreter
1198 else:
1199 if not program:
1200 program = self.program
1201 if not interpreter:
1202 interpreter = self.interpreter
1203 if not isinstance(program, (list, tuple)):
1204 program = [program]
1205 cmd = list(program)
1206 if interpreter:
1207 if not isinstance(interpreter, (list, tuple)):
1208 interpreter = [interpreter]
1209 cmd = list(interpreter) + cmd
1210 if arguments:
1211 if isinstance(arguments, dict):
1212 cmd.extend([f"{k}={v}" for k, v in arguments.items()])
1213 return cmd
1214 if isinstance(arguments, str):
1215 # Split into a list for passing to SCons. This *will*
1216 # break if the string has embedded spaces as part of a substing -
1217 # use a # list to pass those to avoid the problem.
1218 arguments = arguments.split()
1219 cmd.extend(arguments)
1220 return cmd
1222 def description_set(self, description) -> None:
1223 """Set the description of the functionality being tested."""
1224 self.description = description
1226 def set_diff_function(self, diff=_Null, stdout=_Null, stderr=_Null) -> None:
1227 """Sets the specified diff functions."""
1228 if diff is not _Null:
1229 self._diff_function = diff
1230 if stdout is not _Null:
1231 self._diff_stdout_function = stdout
1232 if stderr is not _Null:
1233 self._diff_stderr_function = stderr
1235 def diff(self, a, b, name=None, diff_function=None, *args, **kw) -> None:
1236 if diff_function is None:
1237 try:
1238 diff_function = getattr(self, self._diff_function)
1239 except TypeError:
1240 diff_function = self._diff_function
1241 if diff_function is None:
1242 diff_function = self.simple_diff
1243 if name is not None:
1244 print(self.banner(name))
1246 if not is_List(a):
1247 a = a.splitlines()
1248 if not is_List(b):
1249 b = b.splitlines()
1251 args = (a, b) + args
1252 for line in diff_function(*args, **kw):
1253 print(line)
1255 def diff_stderr(self, a, b, *args, **kw):
1256 """Compare actual and expected file contents."""
1257 try:
1258 diff_stderr_function = getattr(self, self._diff_stderr_function)
1259 except TypeError:
1260 diff_stderr_function = self._diff_stderr_function
1261 return self.diff(a, b, diff_function=diff_stderr_function, *args, **kw)
1263 def diff_stdout(self, a, b, *args, **kw):
1264 """Compare actual and expected file contents."""
1265 try:
1266 diff_stdout_function = getattr(self, self._diff_stdout_function)
1267 except TypeError:
1268 diff_stdout_function = self._diff_stdout_function
1269 return self.diff(a, b, diff_function=diff_stdout_function, *args, **kw)
1271 simple_diff = staticmethod(simple_diff)
1273 diff_re = staticmethod(diff_re)
1275 context_diff = staticmethod(difflib.context_diff)
1277 unified_diff = staticmethod(difflib.unified_diff)
1279 def fail_test(
1280 self,
1281 condition: bool = True,
1282 function: Callable | None = None,
1283 skip: int = 0,
1284 message: str = "",
1285 ) -> None:
1286 """Cause the test to fail."""
1287 if not condition:
1288 return
1289 self.condition = 'fail_test'
1290 fail_test(
1291 self=self,
1292 condition=condition,
1293 function=function,
1294 skip=skip,
1295 message=message,
1298 def interpreter_set(self, interpreter) -> None:
1299 """Set the program to be used to interpret the program
1300 under test as a script.
1302 self.interpreter = interpreter
1304 def set_match_function(self, match=_Null, stdout=_Null, stderr=_Null) -> None:
1305 """Sets the specified match functions."""
1306 if match is not _Null:
1307 self._match_function = match
1308 if stdout is not _Null:
1309 self._match_stdout_function = stdout
1310 if stderr is not _Null:
1311 self._match_stderr_function = stderr
1313 def match(self, lines, matches):
1314 """Compare actual and expected file contents."""
1315 try:
1316 match_function = getattr(self, self._match_function)
1317 except TypeError:
1318 match_function = self._match_function
1319 if match_function is None:
1320 # Default is regular expression matches.
1321 match_function = self.match_re
1322 return match_function(lines, matches)
1324 def match_stderr(self, lines, matches):
1325 """Compare actual and expected file contents."""
1326 try:
1327 match_stderr_function = getattr(self, self._match_stderr_function)
1328 except TypeError:
1329 match_stderr_function = self._match_stderr_function
1330 if match_stderr_function is None:
1331 # Default is to use whatever match= is set to.
1332 match_stderr_function = self.match
1333 return match_stderr_function(lines, matches)
1335 def match_stdout(self, lines, matches):
1336 """Compare actual and expected file contents."""
1337 try:
1338 match_stdout_function = getattr(self, self._match_stdout_function)
1339 except TypeError:
1340 match_stdout_function = self._match_stdout_function
1341 if match_stdout_function is None:
1342 # Default is to use whatever match= is set to.
1343 match_stdout_function = self.match
1344 return match_stdout_function(lines, matches)
1346 match_exact = staticmethod(match_exact)
1348 match_caseinsensitive = staticmethod(match_caseinsensitive)
1350 match_re = staticmethod(match_re)
1352 match_re_dotall = staticmethod(match_re_dotall)
1354 def no_result(self, condition: bool = True, function=None, skip: int = 0) -> None:
1355 """Report that the test could not be run."""
1356 if not condition:
1357 return
1358 self.condition = 'no_result'
1359 no_result(self=self, condition=condition, function=function, skip=skip)
1361 def pass_test(self, condition: bool = True, function=None) -> None:
1362 """Cause the test to pass."""
1363 if not condition:
1364 return
1365 self.condition = 'pass_test'
1366 pass_test(self=self, condition=condition, function=function)
1368 def preserve(self, *conditions) -> None:
1369 """Preserves temporary working directories.
1371 Arrange for the temporary working directories for the
1372 specified TestCmd environment to be preserved for one or more
1373 conditions. If no conditions are specified, arranges for
1374 the temporary working directories to be preserved for all
1375 conditions.
1377 if not conditions:
1378 conditions = ('pass_test', 'fail_test', 'no_result')
1379 for cond in conditions:
1380 self._preserve[cond] = True
1382 def program_set(self, program) -> None:
1383 """Sets the executable program or script to be tested."""
1384 if not self.external:
1385 if program and not os.path.isabs(program):
1386 program = os.path.join(self._cwd, program)
1387 self.program = program
1389 def read(self, file, mode: str = 'rb', newline=None):
1390 """Reads and returns the contents of the specified file name.
1392 The file name may be a list, in which case the elements are
1393 concatenated with the os.path.join() method. The file is
1394 assumed to be under the temporary working directory unless it
1395 is an absolute path name. The I/O mode for the file may
1396 be specified; it must begin with an 'r'. The default is
1397 'rb' (binary read).
1399 file = self.canonicalize(file)
1400 if mode[0] != 'r':
1401 raise ValueError("mode must begin with 'r'")
1402 if 'b' not in mode:
1403 with open(file, mode, newline=newline) as f:
1404 return f.read()
1405 else:
1406 with open(file, mode) as f:
1407 return f.read()
1409 def rmdir(self, dir) -> None:
1410 """Removes the specified dir name.
1412 The dir name may be a list, in which case the elements are
1413 concatenated with the os.path.join() method. The dir is
1414 assumed to be under the temporary working directory unless it
1415 is an absolute path name.
1416 The dir must be empty.
1418 dir = self.canonicalize(dir)
1419 os.rmdir(dir)
1421 def parse_path(self, path, suppress_current: bool = False):
1422 """Return a list with the single path components of path."""
1423 head, tail = os.path.split(path)
1424 result = []
1425 if not tail:
1426 if head == path:
1427 return [head]
1428 else:
1429 result.append(tail)
1430 head, tail = os.path.split(head)
1431 while head and tail:
1432 result.append(tail)
1433 head, tail = os.path.split(head)
1434 result.append(head or tail)
1435 result.reverse()
1437 return result
1439 def dir_fixture(self, srcdir, dstdir=None) -> None:
1440 """Copies the contents of the fixture directory to the test directory.
1442 If srcdir is an absolute path, it is tried directly, else
1443 the fixture_dirs are searched in order to find the named fixture
1444 directory. To tightly control the search order, the harness may
1445 be called with FIXTURE_DIRS set including the test source directory
1446 in the desired position, else it will be tried last.
1448 If dstdir not an absolute path, it is taken as a destination under
1449 the working dir (if omitted of the default None indicates '.',
1450 aka the test dir). dstdir is created automatically if needed.
1452 srcdir or dstdir may be a list, in which case the elements are first
1453 joined into a pathname.
1455 if is_List(srcdir):
1456 srcdir = os.path.join(*srcdir)
1457 spath = srcdir
1458 if srcdir and self.fixture_dirs and not os.path.isabs(srcdir):
1459 for dir in self.fixture_dirs:
1460 spath = os.path.join(dir, srcdir)
1461 if os.path.isdir(spath):
1462 break
1463 else:
1464 spath = srcdir
1466 if not dstdir or dstdir == '.':
1467 dstdir = self.workdir
1468 else:
1469 if is_List(dstdir):
1470 dstdir = os.path.join(*dstdir)
1471 if os.path.isabs(dstdir):
1472 os.makedirs(dstdir, exist_ok=True)
1473 else:
1474 dstlist = self.parse_path(dstdir)
1475 if dstlist and dstlist[0] == ".":
1476 dstdir = os.path.join(dstlist[1:])
1477 self.subdir(dstdir)
1479 for entry in os.listdir(spath):
1480 epath = os.path.join(spath, entry)
1481 dpath = os.path.join(dstdir, entry)
1482 if os.path.isdir(epath):
1483 # Copy the subfolder
1484 shutil.copytree(epath, dpath)
1485 else:
1486 shutil.copy(epath, dpath)
1488 def file_fixture(self, srcfile, dstfile=None) -> None:
1489 """Copies a fixture file to the test directory, optionally renaming.
1491 If srcfile is an absolute path, it is tried directly, else
1492 the fixture_dirs are searched in order to find the named fixture
1493 file. To tightly control the search order, the harness may
1494 be called with FIXTURE_DIRS also including the test source directory
1495 in the desired place, it will otherwise be tried last.
1497 dstfile is the name to give the copied file; if the argument
1498 is omitted the basename of srcfile is used. If dstfile is not
1499 an absolute path name. Any directory components of dstfile are
1500 created automatically if needed.
1502 srcfile or dstfile may be a list, in which case the elements are first
1503 joined into a pathname.
1505 if is_List(srcfile):
1506 srcfile = os.path.join(*srcfile)
1508 srcpath, srctail = os.path.split(srcfile)
1509 spath = srcfile
1510 if srcfile and self.fixture_dirs and not os.path.isabs(srcfile):
1511 for dir in self.fixture_dirs:
1512 spath = os.path.join(dir, srcfile)
1513 if os.path.isfile(spath):
1514 break
1515 else:
1516 spath = srcfile
1518 if not dstfile:
1519 if srctail:
1520 dpath = os.path.join(self.workdir, srctail)
1521 else:
1522 return
1523 else:
1524 dstdir, dsttail = os.path.split(dstfile)
1525 if dstdir:
1526 # if dstfile has a dir part, and is not abspath, create
1527 if os.path.isabs(dstdir):
1528 os.makedirs(dstdir, exist_ok=True)
1529 dpath = dstfile
1530 else:
1531 dstlist = self.parse_path(dstdir)
1532 if dstlist and dstlist[0] == ".":
1533 # strip leading ./ if present
1534 dstdir = os.path.join(dstlist[1:])
1535 self.subdir(dstdir)
1536 dpath = os.path.join(self.workdir, dstfile)
1537 else:
1538 dpath = os.path.join(self.workdir, dstfile)
1540 shutil.copy(spath, dpath)
1542 def start(
1543 self,
1544 program=None,
1545 interpreter=None,
1546 arguments=None,
1547 universal_newlines=None,
1548 timeout=None,
1549 **kw,
1550 ) -> Popen:
1551 """Starts a program or script for the test environment.
1553 The specified program will have the original directory
1554 prepended unless it is enclosed in a [list].
1556 cmd = self.command_args(program, interpreter, arguments)
1557 if self.verbose:
1558 cmd_string = ' '.join([self.escape(c) for c in cmd])
1559 sys.stderr.write(cmd_string + "\n")
1560 if universal_newlines is None:
1561 universal_newlines = self.universal_newlines
1563 # On Windows, if we make stdin a pipe when we plan to send
1564 # no input, and the test program exits before
1565 # Popen calls msvcrt.open_osfhandle, that call will fail.
1566 # So don't use a pipe for stdin if we don't need one.
1567 stdin = kw.get('stdin', None)
1568 if stdin is not None:
1569 stdin = PIPE
1571 combine = kw.get('combine', self.combine)
1572 if combine:
1573 stderr_value = STDOUT
1574 else:
1575 stderr_value = PIPE
1577 if timeout:
1578 self.start_timeout = timeout
1580 if sys.platform == 'win32':
1581 # Set this otherwist stdout/stderr pipes default to
1582 # windows default locale cp1252 which will throw exception
1583 # if using non-ascii characters.
1584 # For example test/Install/non-ascii-name.py
1585 os.environ['PYTHONIOENCODING'] = 'utf-8'
1587 # It seems that all pythons up to py3.6 still set text mode if you set encoding.
1588 # TODO: File enhancement request on python to propagate universal_newlines even
1589 # if encoding is set.hg c
1590 p = Popen(
1591 cmd,
1592 stdin=stdin,
1593 stdout=PIPE,
1594 stderr=stderr_value,
1595 env=os.environ,
1596 universal_newlines=False,
1599 self.process = p
1600 return p
1602 @staticmethod
1603 def fix_binary_stream(stream):
1604 """Handle stream from popen when universal_newline is not enabled.
1606 This will read from the pipes in binary mode, will not decode the
1607 output, and will not convert line endings to \n. We do this because
1608 in py3 (3.5) with universal_newlines=True, it will choose the default
1609 system locale to decode the output, and this breaks unicode output.
1610 Specifically test/option--tree.py which outputs a unicode char.
1612 py 3.6 allows us to pass an encoding param to Popen thus not requiring
1613 the decode nor end of line handling, because we propagate
1614 universal_newlines as specified.
1616 TODO: Do we need to pass universal newlines into this function?
1618 if not stream:
1619 return stream
1620 # It seems that py3.6 still sets text mode if you set encoding.
1621 stream = stream.decode('utf-8', errors='replace')
1622 return stream.replace('\r\n', '\n')
1624 def finish(self, popen=None, **kw) -> None:
1625 """Finishes and waits for the process.
1627 Process being run under control of the specified popen argument
1628 is waited for, recording the exit status, output and error output.
1630 if popen is None:
1631 popen = self.process
1632 if self.start_timeout:
1633 timeout = self.start_timeout
1634 # we're using a timeout from start, now reset it to default
1635 self.start_timeout = None
1636 else:
1637 timeout = self.timeout
1638 try:
1639 stdout, stderr = popen.communicate(timeout=timeout)
1640 except subprocess.TimeoutExpired:
1641 popen.terminate()
1642 stdout, stderr = popen.communicate()
1644 # this is instead of using Popen as a context manager:
1645 if popen.stdout:
1646 popen.stdout.close()
1647 if popen.stderr:
1648 popen.stderr.close()
1649 try:
1650 if popen.stdin:
1651 popen.stdin.close()
1652 finally:
1653 popen.wait()
1655 stdout = self.fix_binary_stream(stdout)
1656 stderr = self.fix_binary_stream(stderr)
1658 self.status = popen.returncode
1659 self.process = None
1660 self._stdout.append(stdout or '')
1661 self._stderr.append(stderr or '')
1663 def run(
1664 self,
1665 program=None,
1666 interpreter=None,
1667 arguments=None,
1668 chdir=None,
1669 stdin=None,
1670 universal_newlines=None,
1671 timeout=None,
1672 ) -> None:
1673 """Runs a test of the program or script for the test environment.
1675 Output and error output are saved for future retrieval via
1676 the stdout() and stderr() methods.
1678 The specified *program* will have the original directory
1679 prepended unless it is enclosed in a [list].
1681 If *arguments* is a dict then will create arguments with KEY+VALUE
1682 for each entry in the dict.
1684 if self.external:
1685 if not program:
1686 program = self.program
1687 if not interpreter:
1688 interpreter = self.interpreter
1690 if universal_newlines is None:
1691 universal_newlines = self.universal_newlines
1693 if chdir:
1694 oldcwd = os.getcwd()
1695 if not os.path.isabs(chdir):
1696 chdir = os.path.join(self.workpath(chdir))
1697 if self.verbose:
1698 sys.stderr.write(f"chdir({chdir})\n")
1699 os.chdir(chdir)
1700 if not timeout:
1701 timeout = self.timeout
1702 p = self.start(
1703 program=program,
1704 interpreter=interpreter,
1705 arguments=arguments,
1706 universal_newlines=universal_newlines,
1707 timeout=timeout,
1708 stdin=stdin,
1710 if is_List(stdin):
1711 stdin = ''.join(stdin)
1713 if stdin:
1714 stdin = to_bytes(stdin)
1716 # TODO(sgk): figure out how to re-use the logic in the .finish()
1717 # method above. Just calling it from here causes problems with
1718 # subclasses that redefine .finish(). We could abstract this
1719 # into Yet Another common method called both here and by .finish(),
1720 # but that seems ill-thought-out.
1721 try:
1722 stdout, stderr = p.communicate(input=stdin, timeout=timeout)
1723 except subprocess.TimeoutExpired:
1724 p.terminate()
1725 stdout, stderr = p.communicate()
1727 # this is instead of using Popen as a context manager:
1728 if p.stdout:
1729 p.stdout.close()
1730 if p.stderr:
1731 p.stderr.close()
1732 try:
1733 if p.stdin:
1734 p.stdin.close()
1735 finally:
1736 p.wait()
1738 self.status = p.returncode
1739 self.process = None
1741 stdout = self.fix_binary_stream(stdout)
1742 stderr = self.fix_binary_stream(stderr)
1744 self._stdout.append(stdout or '')
1745 self._stderr.append(stderr or '')
1747 if chdir:
1748 os.chdir(oldcwd)
1749 if self.verbose >= 2:
1750 write = sys.stdout.write
1751 write('============ STATUS: %d\n' % self.status)
1752 out = self.stdout() or ""
1753 if out or self.verbose >= 3:
1754 write(f'============ BEGIN STDOUT (len={len(out)}):\n')
1755 write(out)
1756 write('============ END STDOUT\n')
1757 err = self.stderr() or ""
1758 if err or self.verbose >= 3:
1759 write(f'============ BEGIN STDERR (len={len(err)})\n')
1760 write(err)
1761 write('============ END STDERR\n')
1763 def sleep(self, seconds=default_sleep_seconds) -> None:
1764 """Sleeps at least the specified number of seconds.
1766 If no number is specified, sleeps at least the minimum number of
1767 seconds necessary to advance file time stamps on the current
1768 system. Sleeping more seconds is all right.
1770 time.sleep(seconds)
1772 def stderr(self, run=None) -> str | None:
1773 """Returns the stored standard error output from a given run.
1775 Args:
1776 run: run number to select. If run number is omitted,
1777 return the standard error of the most recent run.
1778 If negative, use as a relative offset, e.g. -2
1779 means the run two prior to the most recent.
1781 Returns:
1782 selected sterr string or None if there are no stored runs.
1784 if not run:
1785 run = len(self._stderr)
1786 elif run < 0:
1787 run = len(self._stderr) + run
1788 run -= 1
1789 try:
1790 return self._stderr[run]
1791 except IndexError:
1792 return None
1794 def stdout(self, run=None) -> str | None:
1795 """Returns the stored standard output from a given run.
1797 Args:
1798 run: run number to select. If run number is omitted,
1799 return the standard output of the most recent run.
1800 If negative, use as a relative offset, e.g. -2
1801 means the run two prior to the most recent.
1803 Returns:
1804 selected stdout string or None if there are no stored runs.
1806 if not run:
1807 run = len(self._stdout)
1808 elif run < 0:
1809 run = len(self._stdout) + run
1810 run -= 1
1811 try:
1812 return self._stdout[run]
1813 except IndexError:
1814 return None
1816 def subdir(self, *subdirs):
1817 """Creates new subdirectories under the temporary working directory.
1819 Creates a subdir for each argument. An argument may be a list,
1820 in which case the list elements are joined into a path.
1822 Returns the number of directories created, not including
1823 intermediate directories, for historical reasons. A directory
1824 which already existed is counted as "created".
1826 count = 0
1827 for sub in subdirs:
1828 if sub is None:
1829 continue
1830 if is_List(sub):
1831 sub = os.path.join(*sub)
1832 new = os.path.join(self.workdir, sub)
1833 try:
1834 # okay to exist, we just do this for counting
1835 os.makedirs(new, exist_ok=True)
1836 count = count + 1
1837 except OSError as e:
1838 pass
1840 return count
1842 def symlink(self, target, link) -> None:
1843 """Creates a symlink to the specified target.
1845 The link name may be a list, in which case the elements are
1846 concatenated with the os.path.join() method. The link is
1847 assumed to be under the temporary working directory unless it
1848 is an absolute path name. The target is *not* assumed to be
1849 under the temporary working directory.
1851 if sys.platform == 'win32':
1852 # Skip this on windows as we're not enabling it due to
1853 # it requiring user permissions which aren't always present
1854 # and we don't have a good way to detect those permissions yet.
1855 return
1856 link = self.canonicalize(link)
1857 try:
1858 os.symlink(target, link)
1859 except AttributeError:
1860 pass # Windows has no symlink
1862 def tempdir(self, path=None):
1863 """Creates a temporary directory.
1865 A unique directory name is generated if no path name is specified.
1866 The directory is created, and will be removed when the TestCmd
1867 object is destroyed.
1869 if path is None:
1870 try:
1871 # put tests in a subdir of the default, so antivirus
1872 # can be given that directory as an "ignore".
1873 testdir = Path(tempfile.gettempdir()) / "scons"
1874 testdir.mkdir(exist_ok=True)
1875 path = tempfile.mkdtemp(prefix=testprefix, dir=testdir)
1876 except TypeError:
1877 path = tempfile.mkdtemp()
1878 else:
1879 os.mkdir(path)
1881 # Symlinks in the path will report things
1882 # differently from os.getcwd(), so chdir there
1883 # and back to fetch the canonical path.
1884 cwd = os.getcwd()
1885 try:
1886 os.chdir(path)
1887 path = os.getcwd()
1888 finally:
1889 os.chdir(cwd)
1891 # Uppercase the drive letter since the case of drive
1892 # letters is pretty much random on win32:
1893 drive, rest = os.path.splitdrive(path)
1894 if drive:
1895 path = drive.upper() + rest
1898 self._dirlist.append(path)
1900 global _Cleanup
1901 if self not in _Cleanup:
1902 _Cleanup.append(self)
1904 return path
1906 def touch(self, path, mtime=None) -> None:
1907 """Updates the modification time on the specified file or directory.
1909 The default is to update to the
1910 current time if no explicit modification time is specified.
1912 path = self.canonicalize(path)
1913 atime = os.path.getatime(path)
1914 if mtime is None:
1915 mtime = time.time()
1916 os.utime(path, (atime, mtime))
1918 def unlink(self, file) -> None:
1919 """Unlinks the specified file name.
1921 The file name may be a list, in which case the elements are
1922 concatenated with the os.path.join() method. The file is
1923 assumed to be under the temporary working directory unless it
1924 is an absolute path name.
1926 file = self.canonicalize(file)
1927 os.unlink(file)
1929 def unlink_files(self, dirpath, files):
1930 """Unlinks a list of file names from the specified directory.
1932 The directory path may be a list, in which case the elements are
1933 concatenated with the os.path.join() method.
1935 A file name may be a list, in which case the elements are
1936 concatenated with the os.path.join() method.
1938 The directory path and file name are concatenated with the
1939 os.path.join() method. The resulting file path is assumed to be
1940 under the temporary working directory unless it is an absolute path
1941 name. An attempt to unlink the resulting file is made only when the
1942 file exists otherwise the file path is ignored.
1944 if is_List(dirpath):
1945 dirpath = os.path.join(*dirpath)
1946 for file in files:
1947 if is_List(file):
1948 file = os.path.join(*file)
1949 filepath = os.path.join(dirpath, file)
1950 filepath = self.canonicalize(filepath)
1951 if os.path.exists(filepath):
1952 self.unlink(filepath)
1954 def verbose_set(self, verbose) -> None:
1955 """Sets the verbose level."""
1956 self.verbose = verbose
1958 def where_is(self, file, path=None, pathext=None):
1959 """Finds an executable file."""
1960 if is_List(file):
1961 file = os.path.join(*file)
1962 if not os.path.isabs(file):
1963 file = where_is(file, path, pathext)
1964 return file
1966 def workdir_set(self, path) -> None:
1967 """Creates a temporary working directory with the specified path name.
1969 If the path is a null string (''), a unique directory name is created.
1971 if path is not None:
1972 if path == '':
1973 path = None
1974 path = self.tempdir(path)
1975 self.workdir = path
1977 def workpath(self, *args):
1978 """Returns the absolute path name to a subdirectory or file within the current temporary working directory.
1980 Concatenates the temporary working directory name with the specified
1981 arguments using the os.path.join() method.
1983 return os.path.join(self.workdir, *args)
1985 def readable(self, top, read: bool = True) -> None:
1986 """Makes the specified directory tree readable or unreadable.
1988 Tree is made readable if `read` evaluates True (the default),
1989 else it is made not readable.
1991 This method has no effect on Windows systems, which use a
1992 completely different mechanism to control file readability.
1995 if sys.platform == 'win32':
1996 return
1998 if read:
2000 def do_chmod(fname) -> None:
2001 try:
2002 st = os.stat(fname)
2003 except OSError:
2004 pass
2005 else:
2006 os.chmod(fname, stat.S_IMODE(st.st_mode | stat.S_IREAD))
2007 else:
2009 def do_chmod(fname) -> None:
2010 try:
2011 st = os.stat(fname)
2012 except OSError:
2013 pass
2014 else:
2015 os.chmod(fname, stat.S_IMODE(st.st_mode & ~stat.S_IREAD))
2017 if os.path.isfile(top):
2018 # If it's a file, that's easy, just chmod it.
2019 do_chmod(top)
2020 elif read:
2021 # It's a directory and we're trying to turn on read
2022 # permission, so it's also pretty easy, just chmod the
2023 # directory and then chmod every entry on our walk down the
2024 # tree.
2025 do_chmod(top)
2026 for dirpath, dirnames, filenames in os.walk(top):
2027 for name in dirnames + filenames:
2028 do_chmod(os.path.join(dirpath, name))
2029 else:
2030 # It's a directory and we're trying to turn off read
2031 # permission, which means we have to chmod the directories
2032 # in the tree bottom-up, lest disabling read permission from
2033 # the top down get in the way of being able to get at lower
2034 # parts of the tree.
2035 for dirpath, dirnames, filenames in os.walk(top, topdown=False):
2036 for name in dirnames + filenames:
2037 do_chmod(os.path.join(dirpath, name))
2038 do_chmod(top)
2040 def writable(self, top, write: bool = True) -> None:
2041 """Make the specified directory tree writable or unwritable.
2043 Tree is made writable if `write` evaluates True (the default),
2044 else it is made not writable.
2046 Note on Windows the only thing we can do is and/remove the
2047 "readable" setting without resorting to PyWin32 - and that,
2048 only as Administrator, so this is kind of pointless there.
2051 if sys.platform == 'win32':
2052 if write:
2054 def do_chmod(fname) -> None:
2055 try:
2056 os.chmod(fname, stat.S_IWRITE)
2057 except OSError:
2058 pass
2059 else:
2061 def do_chmod(fname) -> None:
2062 try:
2063 os.chmod(fname, stat.S_IREAD)
2064 except OSError:
2065 pass
2067 else:
2068 if write:
2070 def do_chmod(fname) -> None:
2071 try:
2072 st = os.stat(fname)
2073 except OSError:
2074 pass
2075 else:
2076 os.chmod(fname, stat.S_IMODE(st.st_mode | stat.S_IWRITE))
2077 else:
2079 def do_chmod(fname) -> None:
2080 try:
2081 st = os.stat(fname)
2082 except OSError:
2083 pass
2084 else:
2085 os.chmod(fname, stat.S_IMODE(st.st_mode & ~stat.S_IWRITE))
2087 if os.path.isfile(top):
2088 do_chmod(top)
2089 else:
2090 do_chmod(top)
2091 for dirpath, dirnames, filenames in os.walk(top, topdown=False):
2092 for name in dirnames + filenames:
2093 do_chmod(os.path.join(dirpath, name))
2095 def executable(self, top, execute: bool = True) -> None:
2096 """Make the specified directory tree executable or not executable.
2098 Tree is made executable if `execute` evaluates True (the default),
2099 else it is made not executable.
2101 This method has no effect on Windows systems, which use a
2102 completely different mechanism to control file executability.
2105 if sys.platform == 'win32':
2106 return
2108 if execute:
2110 def do_chmod(fname) -> None:
2111 try:
2112 st = os.stat(fname)
2113 except OSError:
2114 pass
2115 else:
2116 os.chmod(fname, stat.S_IMODE(st.st_mode | stat.S_IEXEC))
2117 else:
2119 def do_chmod(fname) -> None:
2120 try:
2121 st = os.stat(fname)
2122 except OSError:
2123 pass
2124 else:
2125 os.chmod(fname, stat.S_IMODE(st.st_mode & ~stat.S_IEXEC))
2127 if os.path.isfile(top):
2128 # If it's a file, that's easy, just chmod it.
2129 do_chmod(top)
2130 elif execute:
2131 # It's a directory and we're trying to turn on execute
2132 # permission, so it's also pretty easy, just chmod the
2133 # directory and then chmod every entry on our walk down the
2134 # tree.
2135 do_chmod(top)
2136 for dirpath, dirnames, filenames in os.walk(top):
2137 for name in dirnames + filenames:
2138 do_chmod(os.path.join(dirpath, name))
2139 else:
2140 # It's a directory and we're trying to turn off execute
2141 # permission, which means we have to chmod the directories
2142 # in the tree bottom-up, lest disabling execute permission from
2143 # the top down get in the way of being able to get at lower
2144 # parts of the tree.
2145 for dirpath, dirnames, filenames in os.walk(top, topdown=False):
2146 for name in dirnames + filenames:
2147 do_chmod(os.path.join(dirpath, name))
2148 do_chmod(top)
2150 def write(self, file, content, mode: str = 'wb'):
2151 """Writes data to file.
2153 The file is created under the temporary working directory.
2154 Any subdirectories in the path must already exist. The
2155 write is converted to the required type rather than failing
2156 if there is a str/bytes mistmatch.
2158 :param file: name of file to write to. If a list, treated
2159 as components of a path and concatenated into a path.
2160 :type file: str or list(str)
2161 :param content: data to write.
2162 :type content: str or bytes
2163 :param mode: file mode, default is binary.
2164 :type mode: str
2166 file = self.canonicalize(file)
2167 if mode[0] != 'w':
2168 raise ValueError("mode must begin with 'w'")
2169 with open(file, mode) as f:
2170 try:
2171 f.write(content)
2172 except TypeError as e:
2173 f.write(bytes(content, 'utf-8'))
2176 # Local Variables:
2177 # tab-width:4
2178 # indent-tabs-mode:nil
2179 # End:
2180 # vim: set expandtab tabstop=4 shiftwidth=4: