1 # Copyright 2000-2024 Steven Knight
3 # This module is free software, and you may redistribute it and/or modify
4 # it under the same terms as Python itself, so long as this copyright message
5 # and disclaimer are retained in their original form.
7 # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
8 # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
9 # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
12 # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
13 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
14 # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
15 # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
16 # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
18 # Python License: https://docs.python.org/3/license.html#psf-license
21 A testing framework for commands and scripts.
23 The TestCmd module provides a framework for portable automated testing
24 of executable commands and scripts (in any language, not just Python),
25 especially commands and scripts that require file system interaction.
27 In addition to running tests and evaluating conditions, the TestCmd
28 module manages and cleans up one or more temporary workspace
29 directories, and provides methods for creating files and directories in
30 those workspace directories from in-line data (here-documents), allowing
31 tests to be completely self-contained.
33 A TestCmd environment object is created via the usual invocation:
36 test = TestCmd.TestCmd()
38 There are a bunch of keyword arguments available at instantiation:
40 test = TestCmd.TestCmd(
42 program='program_or_script_to_test',
43 interpreter='script_interpreter',
46 verbose=int, # verbosity level
47 match=default_match_function,
48 match_stdout=default_match_stdout_function,
49 match_stderr=default_match_stderr_function,
50 diff=default_diff_function,
51 diff_stdout=default_diff_stdout_function,
52 diff_stderr=default_diff_stderr_function,
56 There are a bunch of methods that let you do different things:
60 test.description_set('string')
62 test.program_set('program_or_script_to_test')
64 test.interpreter_set('script_interpreter')
65 test.interpreter_set(['script_interpreter', 'arg'])
67 test.workdir_set('prefix')
71 test.workpath('subdir', 'file')
73 test.subdir('subdir', ...)
75 test.rmdir('subdir', ...)
77 test.write('file', "contents\n")
78 test.write(['subdir', 'file'], "contents\n")
81 test.read(['subdir', 'file'])
82 test.read('file', mode)
83 test.read(['subdir', 'file'], mode)
85 test.writable('dir', 1)
86 test.writable('dir', None)
88 test.preserve(condition, ...)
90 test.cleanup(condition)
93 program='program_or_script_to_run',
94 interpreter='script_interpreter',
95 arguments='arguments to pass to program',
99 program='program_or_script_to_run',
100 interpreter='script_interpreter',
101 arguments='arguments to pass to program',
102 chdir='directory_to_chdir_to',
103 stdin='input to feed to the program\n',
104 universal_newlines=True,
108 program='program_or_script_to_run',
109 interpreter='script_interpreter',
110 arguments='arguments to pass to program',
111 universal_newlines=None,
117 test.pass_test(condition)
118 test.pass_test(condition, function)
121 test.fail_test(condition)
122 test.fail_test(condition, function)
123 test.fail_test(condition, function, skip)
124 test.fail_test(condition, function, skip, message)
127 test.no_result(condition)
128 test.no_result(condition, function)
129 test.no_result(condition, function, skip)
137 test.symlink(target, link)
140 test.banner(string, width)
142 test.diff(actual, expected)
144 test.diff_stderr(actual, expected)
146 test.diff_stdout(actual, expected)
148 test.match(actual, expected)
150 test.match_stderr(actual, expected)
152 test.match_stdout(actual, expected)
154 test.set_match_function(match, stdout, stderr)
156 test.match_exact("actual 1\nactual 2\n", "expected 1\nexpected 2\n")
157 test.match_exact(["actual 1\n", "actual 2\n"],
158 ["expected 1\n", "expected 2\n"])
159 test.match_caseinsensitive("Actual 1\nACTUAL 2\n", "expected 1\nEXPECTED 2\n")
161 test.match_re("actual 1\nactual 2\n", regex_string)
162 test.match_re(["actual 1\n", "actual 2\n"], list_of_regexes)
164 test.match_re_dotall("actual 1\nactual 2\n", regex_string)
165 test.match_re_dotall(["actual 1\n", "actual 2\n"], list_of_regexes)
168 test.tempdir('temporary-directory')
174 test.where_is('foo', 'PATH1:PATH2')
175 test.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
178 test.unlink('subdir', 'file')
180 The TestCmd module provides pass_test(), fail_test(), and no_result()
181 unbound functions that report test results for use with the Aegis change
182 management system. These methods terminate the test immediately,
183 reporting PASSED, FAILED, or NO RESULT respectively, and exiting with
184 status 0 (success), 1 or 2 respectively. This allows for a distinction
185 between an actual failed test and a test that could not be properly
186 evaluated because of an external condition (such as a full file system
187 or incorrect permissions).
192 TestCmd.pass_test(condition)
193 TestCmd.pass_test(condition, function)
196 TestCmd.fail_test(condition)
197 TestCmd.fail_test(condition, function)
198 TestCmd.fail_test(condition, function, skip)
199 TestCmd.fail_test(condition, function, skip, message)
202 TestCmd.no_result(condition)
203 TestCmd.no_result(condition, function)
204 TestCmd.no_result(condition, function, skip)
206 The TestCmd module also provides unbound global functions that handle
207 matching in the same way as the match_*() methods described above.
211 test = TestCmd.TestCmd(match=TestCmd.match_exact)
213 test = TestCmd.TestCmd(match=TestCmd.match_caseinsensitive)
215 test = TestCmd.TestCmd(match=TestCmd.match_re)
217 test = TestCmd.TestCmd(match=TestCmd.match_re_dotall)
219 These functions are also available as static methods:
223 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_exact)
225 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_caseinsensitive)
227 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_re)
229 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_re_dotall)
231 These static methods can be accessed by a string naming the method:
235 test = TestCmd.TestCmd(match='match_exact')
237 test = TestCmd.TestCmd(match='match_caseinsensitive')
239 test = TestCmd.TestCmd(match='match_re')
241 test = TestCmd.TestCmd(match='match_re_dotall')
243 The TestCmd module provides unbound global functions that can be used
244 for the "diff" argument to TestCmd.TestCmd instantiation:
248 test = TestCmd.TestCmd(match=TestCmd.match_re, diff=TestCmd.diff_re)
250 test = TestCmd.TestCmd(diff=TestCmd.simple_diff)
252 test = TestCmd.TestCmd(diff=TestCmd.context_diff)
254 test = TestCmd.TestCmd(diff=TestCmd.unified_diff)
256 These functions are also available as static methods:
260 test = TestCmd.TestCmd(match=TestCmd.TestCmd.match_re, diff=TestCmd.TestCmd.diff_re)
262 test = TestCmd.TestCmd(diff=TestCmd.TestCmd.simple_diff)
264 test = TestCmd.TestCmd(diff=TestCmd.TestCmd.context_diff)
266 test = TestCmd.TestCmd(diff=TestCmd.TestCmd.unified_diff)
268 These static methods can be accessed by a string naming the method:
272 test = TestCmd.TestCmd(match='match_re', diff='diff_re')
274 test = TestCmd.TestCmd(diff='simple_diff')
276 test = TestCmd.TestCmd(diff='context_diff')
278 test = TestCmd.TestCmd(diff='unified_diff')
280 The "diff" argument can also be used with standard difflib functions:
284 test = TestCmd.TestCmd(diff=difflib.context_diff)
286 test = TestCmd.TestCmd(diff=difflib.unified_diff)
288 Lastly, the where_is() method also exists in an unbound function
293 TestCmd.where_is('foo')
294 TestCmd.where_is('foo', 'PATH1:PATH2')
295 TestCmd.where_is('foo', 'PATH1;PATH2', '.suffix3;.suffix4')
298 from __future__
import annotations
300 __author__
= "Steven Knight <knight at baldmt dot com>"
301 __revision__
= "TestCmd.py 1.3.D001 2010/06/03 12:58:27 knight"
325 from collections
import UserList
, UserString
326 from pathlib
import Path
327 from subprocess
import PIPE
, STDOUT
328 from typing
import Callable
# Platform / interpreter flags, each evaluated once when the module loads.

# sys.platform reports 'win32'.
IS_WINDOWS = sys.platform == 'win32'

# sys.platform reports 'darwin'.
IS_MACOS = sys.platform == 'darwin'

# sys.maxsize is larger than 2**32.
IS_64_BIT = sys.maxsize > 2**32

# The sys module exposes a 'pypy_translation_info' attribute.
IS_PYPY = hasattr(sys, 'pypy_translation_info')
335 IS_ROOT
= os
.geteuid() == 0
336 except AttributeError:
338 NEED_HELPER
= os
.environ
.get('SCONS_NO_DIRECT_SCRIPT')
341 # sentinel for cases where None won't do
350 'match_caseinsensitive',
362 return isinstance(e
, (list, UserList
))
366 if isinstance(s
, bytes
):
368 return bytes(s
, 'utf-8')
374 return str(s
, 'utf-8')
378 return isinstance(e
, (str, UserString
))
381 testprefix
= 'testcmd.'
382 if os
.name
in ('posix', 'nt'):
383 testprefix
+= f
"{os.getpid()}."
385 re_space
= re
.compile(r
'\s')
388 def _caller(tblist
, skip
):
391 for file, line
, name
, text
in tblist
:
392 if file[-10:] == "TestCmd.py":
394 arr
= [(file, line
, name
, text
)] + arr
396 for file, line
, name
, text
in arr
[skip
:]:
397 if name
in ("?", "<module>"):
401 string
= string
+ ("%s line %d of %s%s\n" % (atfrom
, line
, file, name
))
406 def clean_up_ninja_daemon(self
, result_type
) -> None:
408 Kill any running scons daemon started by ninja and clean up
410 Working directory and temp files are removed.
411 Skipped if this platform doesn't have psutil (e.g. msys2 on Windows)
416 for path
in Path(self
.workdir
).rglob('.ninja'):
417 daemon_dir
= Path(tempfile
.gettempdir()) / (
418 f
"scons_daemon_{str(hashlib.md5(str(path.resolve()).encode()).hexdigest())}"
420 pidfiles
= [daemon_dir
/ 'pidfile', path
/ 'scons_daemon_dirty']
421 for pidfile
in pidfiles
:
423 with
open(pidfile
) as f
:
426 os
.kill(pid
, signal
.SIGINT
)
431 if pid
not in [proc
.pid
for proc
in psutil
.process_iter()]:
436 if not self
._preserve
[result_type
]:
437 if daemon_dir
.exists():
438 shutil
.rmtree(daemon_dir
)
443 condition
: bool = True,
444 function
: Callable |
None = None,
448 """Causes a test to exit with a fail.
450 Reports that the test FAILED and exits with a status of 1, unless
451 a condition argument is supplied; if so the completion processing
452 takes place only if the condition is true.
455 self: a test class instance. Must be passed in explicitly
456 by the caller since this is an unbound method.
457 condition (optional): if false, return to let test continue.
458 function (optional): function to call before completion processing.
459 skip (optional): how many lines at the top of the traceback to skip.
460 message (optional): additional text to include in the fail message.
464 if function
is not None:
466 clean_up_ninja_daemon(self
, 'fail_test')
472 of
= f
" of {self.program}"
475 desc
= f
" [{self.description}]"
478 at
= _caller(traceback
.extract_stack(), skip
)
480 msg
= f
"\t{message}\n"
483 sys
.stderr
.write(f
"FAILED test{of}{desc}{sep}{at}{msg}")
488 def no_result(self
=None, condition
: bool = True, function
=None, skip
: int = 0) -> None:
489 """Causes a test to exit with a no result.
491 In testing parlance NO RESULT means the test could not be completed
492 for reasons that imply neither success nor failure - for example a
493 component needed to run the test could not be found. However, at this
494 point we still have an "outcome", so record the information and exit
495 with a status code of 2, unless a condition argument is supplied;
496 if so the completion processing takes place only if the condition is true.
498 The different exit code and message allows other logic to distinguish
499 from a fail and decide how to treat NO RESULT tests.
502 self: a test class instance. Must be passed in explicitly
503 by the caller since this is an unbound method.
504 condition (optional): if false, return to let test continue.
505 function (optional): function to call before completion processing.
506 skip (optional): how many lines at the top of the traceback to skip.
510 if function
is not None:
512 clean_up_ninja_daemon(self
, 'no_result')
518 of
= f
" of {self.program}"
521 desc
= f
" [{self.description}]"
524 at
= _caller(traceback
.extract_stack(), skip
)
525 sys
.stderr
.write(f
"NO RESULT for test{of}{desc}{sep}{at}")
530 def pass_test(self
=None, condition
: bool = True, function
=None) -> None:
531 """Causes a test to exit with a pass.
533 Reports that the test PASSED and exits with a status of 0, unless
534 a condition argument is supplied; if so the completion processing
535 takes place only if the condition is true.
537 the test passes only if the condition is true.
540 self: a test class instance. Must be passed in explicitly
541 by the caller since this is an unbound method.
542 condition (optional): if false, return to let test continue.
543 function (optional): function to call before completion processing.
547 if function
is not None:
549 clean_up_ninja_daemon(self
, 'pass_test')
550 sys
.stderr
.write("PASSED\n")
554 def match_exact(lines
=None, matches
=None, newline
=os
.sep
):
555 """Match function using exact match.
557 :param lines: data lines
558 :type lines: str or list[str]
559 :param matches: expected lines to match
560 :type matches: str or list[str]
561 :param newline: line separator
562 :returns: None on failure, 1 on success.
565 if isinstance(lines
, bytes
):
566 newline
= to_bytes(newline
)
568 if not is_List(lines
):
569 lines
= lines
.split(newline
)
570 if not is_List(matches
):
571 matches
= matches
.split(newline
)
572 if len(lines
) != len(matches
):
574 for line
, match
in zip(lines
, matches
):
580 def match_caseinsensitive(lines
=None, matches
=None):
581 """Match function using case-insensitive matching.
583 Only a simplistic comparison is done, based on casefolding
584 the strings. This may still fail but is the suggestion of
585 the Unicode Standard.
587 :param lines: data lines
588 :type lines: str or list[str]
589 :param matches: expected lines to match
590 :type matches: str or list[str]
591 :returns: None on failure, 1 on success.
594 if not is_List(lines
):
595 lines
= lines
.split("\n")
596 if not is_List(matches
):
597 matches
= matches
.split("\n")
598 if len(lines
) != len(matches
):
600 for line
, match
in zip(lines
, matches
):
601 if line
.casefold() != match
.casefold():
606 def match_re(lines
=None, res
=None):
607 """Match function using line-by-line regular expression match.
609 :param lines: data lines
610 :type lines: str or list[str]
611 :param res: regular expression(s) for matching
612 :type res: str or list[str]
613 :returns: None on failure, 1 on success.
616 if not is_List(lines
):
617 # CRs mess up matching (Windows) so split carefully
618 lines
= re
.split('\r?\n', lines
)
620 res
= res
.split("\n")
621 if len(lines
) != len(res
):
622 print(f
"match_re: expected {len(res)} lines, found {len(lines)}")
624 for i
, (line
, regex
) in enumerate(zip(lines
, res
)):
628 except re
.error
as e
:
629 msg
= "Regular expression error in %s: %s"
630 raise re
.error(msg
% (repr(s
), e
.args
[0]))
631 if not expr
.search(line
):
632 miss_tmpl
= "match_re: mismatch at line {}:\n search re='{}'\n line='{}'"
633 print(miss_tmpl
.format(i
, s
, line
))
638 def match_re_dotall(lines
=None, res
=None):
639 """Match function using regular expression match.
641 Unlike match_re, the arguments are converted to strings (if necessary)
642 and must match exactly.
644 :param lines: data lines
645 :type lines: str or list[str]
646 :param res: regular expression(s) for matching
647 :type res: str or list[str]
648 :returns: a match object on match, else None, like re.match
651 if not isinstance(lines
, str):
652 lines
= "\n".join(lines
)
653 if not isinstance(res
, str):
657 expr
= re
.compile(s
, re
.DOTALL
)
658 except re
.error
as e
:
659 msg
= "Regular expression error in %s: %s"
660 raise re
.error(msg
% (repr(s
), e
.args
[0]))
661 return expr
.match(lines
)
669 fromfiledate
: str = '',
670 tofiledate
: str = '',
674 r
"""Compare two sequences of lines; generate the delta as a simple diff.
676 Similar to difflib.context_diff and difflib.unified_diff but
677 output is like from the "diff" command without arguments. The function
678 keeps the same signature as the difflib ones so they will be
679 interchangeable, but except for lineterm, the arguments beyond the
680 two sequences are ignored in this version. By default, the
681 diff is not created with trailing newlines, set the lineterm
682 argument to '\n' to do so.
686 >>> print(''.join(simple_diff('one\ntwo\nthree\nfour\n'.splitlines(True),
687 ... 'zero\none\ntree\nfour\n'.splitlines(True), lineterm='\n')))
697 a
= [to_str(q
) for q
in a
]
698 b
= [to_str(q
) for q
in b
]
699 sm
= difflib
.SequenceMatcher(None, a
, b
)
702 return x1
+ 1 == x2
and str(x2
) or f
'{x1 + 1},{x2}'
704 for op
, a1
, a2
, b1
, b2
in sm
.get_opcodes():
706 yield f
"{comma(a1, a2)}d{b1}{lineterm}"
710 yield f
"{a1}a{comma(b1, b2)}{lineterm}"
713 elif op
== 'replace':
714 yield f
"{comma(a1, a2)}c{comma(b1, b2)}{lineterm}"
717 yield f
'---{lineterm}'
727 fromfiledate
: str = '',
728 tofiledate
: str = '',
730 lineterm
: str = '\n',
732 """Compare a and b (lists of strings) where a are regular expressions.
734 A simple "diff" of two sets of lines when the expected lines
735 are regular expressions. This is a really dumb thing that
736 just compares each line in turn, so it doesn't look for
737 chunks of matching lines and the like--but at least it lets
738 you know exactly which line first didn't compare correctly.
741 re.error: if a regex fails to compile
744 diff
= len(a
) - len(b
)
746 a
= a
+ [''] * (-diff
)
749 for i
, (aline
, bline
) in enumerate(zip(a
, b
)):
753 except re
.error
as e
:
754 msg
= "Regular expression error in %s: %s"
755 raise re
.error(msg
% (repr(s
), e
.args
[0]))
756 if not expr
.search(bline
):
757 result
.append(f
"{i + 1}c{i + 1}")
758 result
.append(f
"< {a[i]!r}")
760 result
.append(f
"> {b[i]!r}")
764 if os
.name
== 'posix':
767 """escape shell special characters"""
770 arg
= arg
.replace(slash
, slash
+ slash
)
772 arg
= arg
.replace(c
, slash
+ c
)
773 if re_space
.search(arg
):
777 # Windows does not allow special characters in file names
778 # anyway, so no need for an escape function, we will just quote
781 if re_space
.search(arg
):
786 if os
.name
== 'java':
787 python
= os
.path
.join(sys
.prefix
, 'jython')
789 python
= os
.environ
.get('python_executable', sys
.executable
)
790 _python_
= escape(python
)
792 if sys
.platform
== 'win32':
793 default_sleep_seconds
= 2
795 def where_is(file, path
=None, pathext
=None):
797 path
= os
.environ
['PATH']
799 path
= path
.split(os
.pathsep
)
801 pathext
= os
.environ
['PATHEXT']
802 if is_String(pathext
):
803 pathext
= pathext
.split(os
.pathsep
)
805 if ext
.casefold() == file[-len(ext
) :].casefold():
809 f
= os
.path
.join(dir, file)
812 if os
.path
.isfile(fext
):
818 def where_is(file, path
=None, pathext
=None):
820 path
= os
.environ
['PATH']
822 path
= path
.split(os
.pathsep
)
824 f
= os
.path
.join(dir, file)
825 if os
.path
.isfile(f
):
830 if stat
.S_IMODE(st
.st_mode
) & stat
.S_IXUSR
:
834 default_sleep_seconds
= 1
837 # From Josiah Carlson,
838 # ASPN : Python Cookbook : Module to allow Asynchronous subprocess use on Windows and Posix platforms
839 # https://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
841 if sys
.platform
== 'win32': # and subprocess.mswindows:
843 from win32file
import ReadFile
, WriteFile
844 from win32pipe
import PeekNamedPipe
846 # If PyWin32 is not available, try ctypes instead
847 # XXX These replicate _just_enough_ PyWin32 behaviour for our purposes
849 from ctypes
.wintypes
import DWORD
851 def ReadFile(hFile
, bufSize
, ol
=None):
853 lpBuffer
= ctypes
.create_string_buffer(bufSize
)
855 bErr
= ctypes
.windll
.kernel32
.ReadFile(
856 hFile
, lpBuffer
, bufSize
, ctypes
.byref(bytesRead
), ol
859 raise ctypes
.WinError()
860 return (0, ctypes
.string_at(lpBuffer
, bytesRead
.value
))
862 def WriteFile(hFile
, data
, ol
=None):
864 bytesWritten
= DWORD()
865 bErr
= ctypes
.windll
.kernel32
.WriteFile(
866 hFile
, data
, len(data
), ctypes
.byref(bytesWritten
), ol
869 raise ctypes
.WinError()
870 return (0, bytesWritten
.value
)
872 def PeekNamedPipe(hPipe
, size
):
875 bErr
= ctypes
.windll
.kernel32
.PeekNamedPipe(
876 hPipe
, None, size
, None, ctypes
.byref(bytesAvail
), None
879 raise ctypes
.WinError()
880 return ("", bytesAvail
.value
, None)
889 except AttributeError:
894 except AttributeError:
898 class Popen(subprocess
.Popen
):
def recv(self, maxsize=None):
    """Return what self._recv() yields for the 'stdout' stream.

    Args:
        maxsize: passed through to self._recv() unchanged.
    """
    stream_name = 'stdout'
    return self._recv(stream_name, maxsize)
def recv_err(self, maxsize=None):
    """Return what self._recv() yields for the 'stderr' stream.

    Args:
        maxsize: passed through to self._recv() unchanged.
    """
    stream_name = 'stderr'
    return self._recv(stream_name, maxsize)
def send_recv(self, input: str = '', maxsize=None):
    """Send input, then collect the stdout and stderr reads.

    The three calls happen in this order: self.send(input),
    self.recv(maxsize), self.recv_err(maxsize).

    Returns:
        A 3-tuple of those three results, in call order.
    """
    sent = self.send(input)
    from_stdout = self.recv(maxsize)
    from_stderr = self.recv_err(maxsize)
    return sent, from_stdout, from_stderr
908 def get_conn_maxsize(self
, which
, maxsize
):
913 return getattr(self
, which
), maxsize
def _close(self, which) -> None:
    """Close the attribute named by which, then reset it to None.

    Args:
        which: name of an attribute on self whose value has a close() method.
    """
    stream = getattr(self, which)
    stream.close()
    setattr(self, which, None)
919 if sys
.platform
== 'win32': # and subprocess.mswindows:
921 def send(self
, input):
922 input = to_bytes(input)
927 x
= msvcrt
.get_osfhandle(self
.stdin
.fileno())
928 (errCode
, written
) = WriteFile(x
, input)
930 return self
._close
('stdin')
931 except (subprocess
.pywintypes
.error
, Exception) as why
:
932 if why
.args
[0] in (109, errno
.ESHUTDOWN
):
933 return self
._close
('stdin')
938 def _recv(self
, which
, maxsize
):
939 conn
, maxsize
= self
.get_conn_maxsize(which
, maxsize
)
944 x
= msvcrt
.get_osfhandle(conn
.fileno())
945 (read
, nAvail
, nMessage
) = PeekNamedPipe(x
, 0)
949 (errCode
, read
) = ReadFile(x
, nAvail
, None)
951 return self
._close
(which
)
952 except (subprocess
.pywintypes
.error
, Exception) as why
:
953 if why
.args
[0] in (109, errno
.ESHUTDOWN
):
954 return self
._close
(which
)
957 # if self.universal_newlines:
958 # read = self._translate_newlines(read)
963 def send(self
, input):
967 if not select
.select([], [self
.stdin
], [], 0)[1]:
971 written
= os
.write(self
.stdin
.fileno(), bytearray(input, 'utf-8'))
972 except OSError as why
:
973 if why
.args
[0] == errno
.EPIPE
: # broken pipe
974 return self
._close
('stdin')
979 def _recv(self
, which
, maxsize
):
980 conn
, maxsize
= self
.get_conn_maxsize(which
, maxsize
)
985 flags
= fcntl
.fcntl(conn
, fcntl
.F_GETFL
)
990 fcntl
.fcntl(conn
, fcntl
.F_SETFL
, flags | os
.O_NONBLOCK
)
993 if not select
.select([conn
], [], [], 0)[0]:
996 r
= conn
.read(maxsize
)
998 return self
._close
(which
)
1000 # if self.universal_newlines:
1001 # r = self._translate_newlines(r)
1004 if not conn
.closed
and flags
is not None:
1005 fcntl
.fcntl(conn
, fcntl
.F_SETFL
, flags
)
1008 disconnect_message
= "Other end disconnected!"
1011 def recv_some(p
, t
: float = 0.1, e
: int = 1, tr
: int = 5, stderr
: int = 0):
1020 while time
.time() < x
or r
:
1024 raise Exception(disconnect_message
)
1030 time
.sleep(max((x
- time
.time()) / tr
, 0))
1034 def send_all(p
, data
):
1038 raise Exception(disconnect_message
)
1039 data
= memoryview(data
)[sent
:]
1046 def _clean() -> None:
1048 cleanlist
= [c
for c
in _Cleanup
if c
]
1051 for test
in cleanlist
:
1072 combine
: bool = False,
1073 universal_newlines
: bool |
None = True,
1074 timeout
: float |
None = None,
1076 self
.external
= os
.environ
.get('SCONS_EXTERNAL_TEST', 0)
1077 self
._cwd
= os
.getcwd()
1078 self
.description_set(description
)
1079 self
.program_set(program
)
1080 self
.interpreter_set(interpreter
)
1083 verbose
= max(0, int(os
.environ
.get('TESTCMD_VERBOSE', 0)))
1086 self
.verbose_set(verbose
)
1087 self
.combine
= combine
1088 self
.universal_newlines
= universal_newlines
1089 self
.process
: Popen |
None = None
1090 # Two layers of timeout: one at the test class instance level,
1091 # one set on an individual start() call (usually via a run() call)
1092 self
.timeout
= timeout
1093 self
.start_timeout
= None
1094 self
.set_match_function(match
, match_stdout
, match_stderr
)
1095 self
.set_diff_function(diff
, diff_stdout
, diff_stderr
)
1097 self
._preserve
: dict[str, str |
bool] = {
1102 preserve_value
= os
.environ
.get('PRESERVE', False)
1103 if preserve_value
not in [0, '0', 'False']:
1104 self
._preserve
['pass_test'] = os
.environ
['PRESERVE']
1105 self
._preserve
['fail_test'] = os
.environ
['PRESERVE']
1106 self
._preserve
['no_result'] = os
.environ
['PRESERVE']
1108 self
._preserve
['pass_test'] = os
.environ
.get('PRESERVE_PASS', False)
1109 self
._preserve
['fail_test'] = os
.environ
.get('PRESERVE_FAIL', False)
1110 self
._preserve
['no_result'] = os
.environ
.get('PRESERVE_NO_RESULT', False)
1113 self
.status
: int |
None = None
1114 self
.condition
= 'no_result'
1115 self
.workdir
: str |
None
1116 self
.workdir_set(workdir
)
1120 self
.fixture_dirs
= (os
.environ
['FIXTURE_DIRS']).split(os
.pathsep
)
1122 self
.fixture_dirs
= []
1124 def __del__(self
) -> None:
def __repr__(self) -> str:
    """Return id(self) as lowercase hexadecimal digits, without a '0x' prefix."""
    return format(id(self), 'x')
1133 def banner(self
, s
, width
=None) -> str:
1135 width
= self
.banner_width
1136 return f
"{s:{self.banner_char}<{width}}"
1138 escape
= staticmethod(escape
)
1140 def canonicalize(self
, path
):
1142 path
= os
.path
.join(*path
)
1143 if not os
.path
.isabs(path
):
1144 path
= os
.path
.join(self
.workdir
, path
)
def chmod(self, path, mode) -> None:
    """Change the permissions of a file or directory.

    Args:
        path: passed through self.canonicalize() before use.
        mode: permission bits handed to os.chmod().
    """
    os.chmod(self.canonicalize(path), mode)
1152 def cleanup(self
, condition
=None) -> None:
1153 """Removes any temporary working directories.
1155 Cleans the TestCmd instance. If the environment variable PRESERVE was
1156 set when the TestCmd environment was created, temporary working
1157 directories are not removed. If any of the environment variables
1158 PRESERVE_PASS, PRESERVE_FAIL, or PRESERVE_NO_RESULT were set
1159 when the TestCmd environment was created, then temporary working
1160 directories are not removed if the test passed, failed, or had
1161 no result, respectively. Temporary working directories are also
1162 preserved for conditions specified via the preserve method.
1164 Typically, this method is not called directly, but is used when
1165 the script exits to clean up temporary working directories as
1166 appropriate for the exit status.
1168 if not self
._dirlist
:
1172 if condition
is None:
1173 condition
= self
.condition
1174 if self
._preserve
[condition
]:
1175 for dir in self
._dirlist
:
1176 print(f
"Preserved directory {dir}")
1178 list = self
._dirlist
[:]
1181 self
.writable(dir, True)
1182 shutil
.rmtree(dir, ignore_errors
=True)
1186 if self
in _Cleanup
:
1187 _Cleanup
.remove(self
)
1189 def command_args(self
, program
=None, interpreter
=None, arguments
=None):
1190 if not self
.external
:
1192 if isinstance(program
, str) and not os
.path
.isabs(program
):
1193 program
= os
.path
.join(self
._cwd
, program
)
1195 program
= self
.program
1197 interpreter
= self
.interpreter
1200 program
= self
.program
1202 interpreter
= self
.interpreter
1203 if not isinstance(program
, (list, tuple)):
1207 if not isinstance(interpreter
, (list, tuple)):
1208 interpreter
= [interpreter
]
1209 cmd
= list(interpreter
) + cmd
1211 if isinstance(arguments
, dict):
1212 cmd
.extend([f
"{k}={v}" for k
, v
in arguments
.items()])
1214 if isinstance(arguments
, str):
1215 # Split into a list for passing to SCons. This *will*
1216 # break if the string has embedded spaces as part of a substring -
1217 # use a list to pass those to avoid the problem.
1218 arguments
= arguments
.split()
1219 cmd
.extend(arguments
)
def description_set(self, description) -> None:
    """Record the description of the functionality being tested.

    The value is stored unchanged on self.description.
    """
    self.description = description
def set_diff_function(self, diff=_Null, stdout=_Null, stderr=_Null) -> None:
    """Store the diff functions that were explicitly supplied.

    An argument left at the _Null sentinel leaves its attribute untouched,
    so None can be passed as a real value.

    Args:
        diff: stored as self._diff_function.
        stdout: stored as self._diff_stdout_function.
        stderr: stored as self._diff_stderr_function.
    """
    supplied = (
        ('_diff_function', diff),
        ('_diff_stdout_function', stdout),
        ('_diff_stderr_function', stderr),
    )
    for attribute, value in supplied:
        if value is not _Null:
            setattr(self, attribute, value)
1235 def diff(self
, a
, b
, name
=None, diff_function
=None, *args
, **kw
) -> None:
1236 if diff_function
is None:
1238 diff_function
= getattr(self
, self
._diff
_function
)
1240 diff_function
= self
._diff
_function
1241 if diff_function
is None:
1242 diff_function
= self
.simple_diff
1243 if name
is not None:
1244 print(self
.banner(name
))
1251 args
= (a
, b
) + args
1252 for line
in diff_function(*args
, **kw
):
1255 def diff_stderr(self
, a
, b
, *args
, **kw
):
1256 """Compare actual and expected file contents."""
1258 diff_stderr_function
= getattr(self
, self
._diff
_stderr
_function
)
1260 diff_stderr_function
= self
._diff
_stderr
_function
1261 return self
.diff(a
, b
, diff_function
=diff_stderr_function
, *args
, **kw
)
1263 def diff_stdout(self
, a
, b
, *args
, **kw
):
1264 """Compare actual and expected file contents."""
1266 diff_stdout_function
= getattr(self
, self
._diff
_stdout
_function
)
1268 diff_stdout_function
= self
._diff
_stdout
_function
1269 return self
.diff(a
, b
, diff_function
=diff_stdout_function
, *args
, **kw
)
1271 simple_diff
= staticmethod(simple_diff
)
1273 diff_re
= staticmethod(diff_re
)
1275 context_diff
= staticmethod(difflib
.context_diff
)
1277 unified_diff
= staticmethod(difflib
.unified_diff
)
1281 condition
: bool = True,
1282 function
: Callable |
None = None,
1286 """Cause the test to fail."""
1289 self
.condition
= 'fail_test'
1292 condition
=condition
,
1298 def interpreter_set(self
, interpreter
) -> None:
1299 """Set the program to be used to interpret the program
1300 under test as a script.
1302 self
.interpreter
= interpreter
def set_match_function(self, match=_Null, stdout=_Null, stderr=_Null) -> None:
    """Store the match functions that were explicitly supplied.

    An argument left at the _Null sentinel leaves its attribute untouched,
    so None can be passed as a real value.

    Args:
        match: stored as self._match_function.
        stdout: stored as self._match_stdout_function.
        stderr: stored as self._match_stderr_function.
    """
    supplied = (
        ('_match_function', match),
        ('_match_stdout_function', stdout),
        ('_match_stderr_function', stderr),
    )
    for attribute, value in supplied:
        if value is not _Null:
            setattr(self, attribute, value)
1313 def match(self
, lines
, matches
):
1314 """Compare actual and expected file contents."""
1316 match_function
= getattr(self
, self
._match
_function
)
1318 match_function
= self
._match
_function
1319 if match_function
is None:
1320 # Default is regular expression matches.
1321 match_function
= self
.match_re
1322 return match_function(lines
, matches
)
1324 def match_stderr(self
, lines
, matches
):
1325 """Compare actual and expected file contents."""
1327 match_stderr_function
= getattr(self
, self
._match
_stderr
_function
)
1329 match_stderr_function
= self
._match
_stderr
_function
1330 if match_stderr_function
is None:
1331 # Default is to use whatever match= is set to.
1332 match_stderr_function
= self
.match
1333 return match_stderr_function(lines
, matches
)
1335 def match_stdout(self
, lines
, matches
):
1336 """Compare actual and expected file contents."""
1338 match_stdout_function
= getattr(self
, self
._match
_stdout
_function
)
1340 match_stdout_function
= self
._match
_stdout
_function
1341 if match_stdout_function
is None:
1342 # Default is to use whatever match= is set to.
1343 match_stdout_function
= self
.match
1344 return match_stdout_function(lines
, matches
)
1346 match_exact
= staticmethod(match_exact
)
1348 match_caseinsensitive
= staticmethod(match_caseinsensitive
)
1350 match_re
= staticmethod(match_re
)
1352 match_re_dotall
= staticmethod(match_re_dotall
)
1354 def no_result(self
, condition
: bool = True, function
=None, skip
: int = 0) -> None:
1355 """Report that the test could not be run."""
1358 self
.condition
= 'no_result'
1359 no_result(self
=self
, condition
=condition
, function
=function
, skip
=skip
)
1361 def pass_test(self
, condition
: bool = True, function
=None) -> None:
1362 """Cause the test to pass."""
1365 self
.condition
= 'pass_test'
1366 pass_test(self
=self
, condition
=condition
, function
=function
)
def preserve(self, *conditions) -> None:
    """Preserves temporary working directories.

    Arrange for the temporary working directories for the
    specified TestCmd environment to be preserved for one or more
    conditions.  If no conditions are specified, arranges for
    the temporary working directories to be preserved for all
    conditions.

    Args:
        *conditions: condition names to mark in ``self._preserve``.
    """
    # An empty argument list means "every known outcome".
    selected = conditions or ('pass_test', 'fail_test', 'no_result')
    for name in selected:
        self._preserve[name] = True
def program_set(self, program) -> None:
    """Sets the executable program or script to be tested.

    Unless ``self.external`` is true, a non-empty relative ``program``
    is joined onto ``self._cwd``.  The result is stored in
    ``self.program``; empty values and absolute paths are stored
    unchanged.
    """
    anchor_it = not self.external and program and not os.path.isabs(program)
    if anchor_it:
        program = os.path.join(self._cwd, program)
    self.program = program
1389 def read(self
, file, mode
: str = 'rb', newline
=None):
1390 """Reads and returns the contents of the specified file name.
1392 The file name may be a list, in which case the elements are
1393 concatenated with the os.path.join() method. The file is
1394 assumed to be under the temporary working directory unless it
1395 is an absolute path name. The I/O mode for the file may
1396 be specified; it must begin with an 'r'. The default is
1399 file = self
.canonicalize(file)
1401 raise ValueError("mode must begin with 'r'")
1403 with
open(file, mode
, newline
=newline
) as f
:
1406 with
open(file, mode
) as f
:
def rmdir(self, dir) -> None:
    """Removes the specified dir name.

    The dir name may be a list, in which case the elements are
    concatenated with the os.path.join() method.  The dir is
    assumed to be under the temporary working directory unless it
    is an absolute path name.
    The dir must be empty.

    Args:
        dir: directory to remove; resolved with ``self.canonicalize()``.

    Raises:
        OSError: if ``os.rmdir()`` fails.
    """
    dir = self.canonicalize(dir)
    os.rmdir(dir)
1421 def parse_path(self
, path
, suppress_current
: bool = False):
1422 """Return a list with the single path components of path."""
1423 head
, tail
= os
.path
.split(path
)
1430 head
, tail
= os
.path
.split(head
)
1431 while head
and tail
:
1433 head
, tail
= os
.path
.split(head
)
1434 result
.append(head
or tail
)
1439 def dir_fixture(self
, srcdir
, dstdir
=None) -> None:
1440 """Copies the contents of the fixture directory to the test directory.
1442 If srcdir is an absolute path, it is tried directly, else
1443 the fixture_dirs are searched in order to find the named fixture
1444 directory. To tightly control the search order, the harness may
1445 be called with FIXTURE_DIRS set including the test source directory
1446 in the desired position, else it will be tried last.
1448 If dstdir not an absolute path, it is taken as a destination under
1449 the working dir (if omitted of the default None indicates '.',
1450 aka the test dir). dstdir is created automatically if needed.
1452 srcdir or dstdir may be a list, in which case the elements are first
1453 joined into a pathname.
1456 srcdir
= os
.path
.join(*srcdir
)
1458 if srcdir
and self
.fixture_dirs
and not os
.path
.isabs(srcdir
):
1459 for dir in self
.fixture_dirs
:
1460 spath
= os
.path
.join(dir, srcdir
)
1461 if os
.path
.isdir(spath
):
1466 if not dstdir
or dstdir
== '.':
1467 dstdir
= self
.workdir
1470 dstdir
= os
.path
.join(*dstdir
)
1471 if os
.path
.isabs(dstdir
):
1472 os
.makedirs(dstdir
, exist_ok
=True)
1474 dstlist
= self
.parse_path(dstdir
)
1475 if dstlist
and dstlist
[0] == ".":
1476 dstdir
= os
.path
.join(dstlist
[1:])
1479 for entry
in os
.listdir(spath
):
1480 epath
= os
.path
.join(spath
, entry
)
1481 dpath
= os
.path
.join(dstdir
, entry
)
1482 if os
.path
.isdir(epath
):
1483 # Copy the subfolder
1484 shutil
.copytree(epath
, dpath
)
1486 shutil
.copy(epath
, dpath
)
1488 def file_fixture(self
, srcfile
, dstfile
=None) -> None:
1489 """Copies a fixture file to the test directory, optionally renaming.
1491 If srcfile is an absolute path, it is tried directly, else
1492 the fixture_dirs are searched in order to find the named fixture
1493 file. To tightly control the search order, the harness may
1494 be called with FIXTURE_DIRS also including the test source directory
1495 in the desired place, it will otherwise be tried last.
1497 dstfile is the name to give the copied file; if the argument
1498 is omitted the basename of srcfile is used. If dstfile is not
1499 an absolute path name. Any directory components of dstfile are
1500 created automatically if needed.
1502 srcfile or dstfile may be a list, in which case the elements are first
1503 joined into a pathname.
1505 if is_List(srcfile
):
1506 srcfile
= os
.path
.join(*srcfile
)
1508 srcpath
, srctail
= os
.path
.split(srcfile
)
1510 if srcfile
and self
.fixture_dirs
and not os
.path
.isabs(srcfile
):
1511 for dir in self
.fixture_dirs
:
1512 spath
= os
.path
.join(dir, srcfile
)
1513 if os
.path
.isfile(spath
):
1520 dpath
= os
.path
.join(self
.workdir
, srctail
)
1524 dstdir
, dsttail
= os
.path
.split(dstfile
)
1526 # if dstfile has a dir part, and is not abspath, create
1527 if os
.path
.isabs(dstdir
):
1528 os
.makedirs(dstdir
, exist_ok
=True)
1531 dstlist
= self
.parse_path(dstdir
)
1532 if dstlist
and dstlist
[0] == ".":
1533 # strip leading ./ if present
1534 dstdir
= os
.path
.join(dstlist
[1:])
1536 dpath
= os
.path
.join(self
.workdir
, dstfile
)
1538 dpath
= os
.path
.join(self
.workdir
, dstfile
)
1540 shutil
.copy(spath
, dpath
)
1547 universal_newlines
=None,
1551 """Starts a program or script for the test environment.
1553 The specified program will have the original directory
1554 prepended unless it is enclosed in a [list].
1556 cmd
= self
.command_args(program
, interpreter
, arguments
)
1558 cmd_string
= ' '.join([self
.escape(c
) for c
in cmd
])
1559 sys
.stderr
.write(cmd_string
+ "\n")
1560 if universal_newlines
is None:
1561 universal_newlines
= self
.universal_newlines
1563 # On Windows, if we make stdin a pipe when we plan to send
1564 # no input, and the test program exits before
1565 # Popen calls msvcrt.open_osfhandle, that call will fail.
1566 # So don't use a pipe for stdin if we don't need one.
1567 stdin
= kw
.get('stdin', None)
1568 if stdin
is not None:
1571 combine
= kw
.get('combine', self
.combine
)
1573 stderr_value
= STDOUT
1578 self
.start_timeout
= timeout
1580 if sys
.platform
== 'win32':
# Set this, otherwise stdout/stderr pipes default to
1582 # windows default locale cp1252 which will throw exception
1583 # if using non-ascii characters.
1584 # For example test/Install/non-ascii-name.py
1585 os
.environ
['PYTHONIOENCODING'] = 'utf-8'
1587 # It seems that all pythons up to py3.6 still set text mode if you set encoding.
1588 # TODO: File enhancement request on python to propagate universal_newlines even
# if encoding is set.
1594 stderr
=stderr_value
,
1596 universal_newlines
=False,
1603 def fix_binary_stream(stream
):
1604 """Handle stream from popen when universal_newline is not enabled.
1606 This will read from the pipes in binary mode, will not decode the
1607 output, and will not convert line endings to \n. We do this because
1608 in py3 (3.5) with universal_newlines=True, it will choose the default
1609 system locale to decode the output, and this breaks unicode output.
1610 Specifically test/option--tree.py which outputs a unicode char.
1612 py 3.6 allows us to pass an encoding param to Popen thus not requiring
1613 the decode nor end of line handling, because we propagate
1614 universal_newlines as specified.
1616 TODO: Do we need to pass universal newlines into this function?
1620 # It seems that py3.6 still sets text mode if you set encoding.
1621 stream
= stream
.decode('utf-8', errors
='replace')
1622 return stream
.replace('\r\n', '\n')
1624 def finish(self
, popen
=None, **kw
) -> None:
1625 """Finishes and waits for the process.
1627 Process being run under control of the specified popen argument
1628 is waited for, recording the exit status, output and error output.
1631 popen
= self
.process
1632 if self
.start_timeout
:
1633 timeout
= self
.start_timeout
1634 # we're using a timeout from start, now reset it to default
1635 self
.start_timeout
= None
1637 timeout
= self
.timeout
1639 stdout
, stderr
= popen
.communicate(timeout
=timeout
)
1640 except subprocess
.TimeoutExpired
:
1642 stdout
, stderr
= popen
.communicate()
1644 # this is instead of using Popen as a context manager:
1646 popen
.stdout
.close()
1648 popen
.stderr
.close()
1655 stdout
= self
.fix_binary_stream(stdout
)
1656 stderr
= self
.fix_binary_stream(stderr
)
1658 self
.status
= popen
.returncode
1660 self
._stdout
.append(stdout
or '')
1661 self
._stderr
.append(stderr
or '')
1670 universal_newlines
=None,
1673 """Runs a test of the program or script for the test environment.
1675 Output and error output are saved for future retrieval via
1676 the stdout() and stderr() methods.
1678 The specified *program* will have the original directory
1679 prepended unless it is enclosed in a [list].
1681 If *arguments* is a dict then will create arguments with KEY+VALUE
1682 for each entry in the dict.
1686 program
= self
.program
1688 interpreter
= self
.interpreter
1690 if universal_newlines
is None:
1691 universal_newlines
= self
.universal_newlines
1694 oldcwd
= os
.getcwd()
1695 if not os
.path
.isabs(chdir
):
1696 chdir
= os
.path
.join(self
.workpath(chdir
))
1698 sys
.stderr
.write(f
"chdir({chdir})\n")
1701 timeout
= self
.timeout
1704 interpreter
=interpreter
,
1705 arguments
=arguments
,
1706 universal_newlines
=universal_newlines
,
1711 stdin
= ''.join(stdin
)
1714 stdin
= to_bytes(stdin
)
1716 # TODO(sgk): figure out how to re-use the logic in the .finish()
1717 # method above. Just calling it from here causes problems with
1718 # subclasses that redefine .finish(). We could abstract this
1719 # into Yet Another common method called both here and by .finish(),
1720 # but that seems ill-thought-out.
1722 stdout
, stderr
= p
.communicate(input=stdin
, timeout
=timeout
)
1723 except subprocess
.TimeoutExpired
:
1725 stdout
, stderr
= p
.communicate()
1727 # this is instead of using Popen as a context manager:
1738 self
.status
= p
.returncode
1741 stdout
= self
.fix_binary_stream(stdout
)
1742 stderr
= self
.fix_binary_stream(stderr
)
1744 self
._stdout
.append(stdout
or '')
1745 self
._stderr
.append(stderr
or '')
1749 if self
.verbose
>= 2:
1750 write
= sys
.stdout
.write
1751 write('============ STATUS: %d\n' % self
.status
)
1752 out
= self
.stdout() or ""
1753 if out
or self
.verbose
>= 3:
1754 write(f
'============ BEGIN STDOUT (len={len(out)}):\n')
1756 write('============ END STDOUT\n')
1757 err
= self
.stderr() or ""
1758 if err
or self
.verbose
>= 3:
1759 write(f
'============ BEGIN STDERR (len={len(err)})\n')
1761 write('============ END STDERR\n')
def sleep(self, seconds=default_sleep_seconds) -> None:
    """Sleeps at least the specified number of seconds.

    If no number is specified, sleeps at least the minimum number of
    seconds necessary to advance file time stamps on the current
    system.  Sleeping more seconds is all right.

    Args:
        seconds: how long to sleep; defaults to ``default_sleep_seconds``.
    """
    import time  # local import keeps this block self-contained

    time.sleep(seconds)
1772 def stderr(self
, run
=None) -> str |
None:
1773 """Returns the stored standard error output from a given run.
1776 run: run number to select. If run number is omitted,
1777 return the standard error of the most recent run.
1778 If negative, use as a relative offset, e.g. -2
1779 means the run two prior to the most recent.
1782 selected sterr string or None if there are no stored runs.
1785 run
= len(self
._stderr
)
1787 run
= len(self
._stderr
) + run
1790 return self
._stderr
[run
]
1794 def stdout(self
, run
=None) -> str |
None:
1795 """Returns the stored standard output from a given run.
1798 run: run number to select. If run number is omitted,
1799 return the standard output of the most recent run.
1800 If negative, use as a relative offset, e.g. -2
1801 means the run two prior to the most recent.
1804 selected stdout string or None if there are no stored runs.
1807 run
= len(self
._stdout
)
1809 run
= len(self
._stdout
) + run
1812 return self
._stdout
[run
]
1816 def subdir(self
, *subdirs
):
1817 """Creates new subdirectories under the temporary working directory.
1819 Creates a subdir for each argument. An argument may be a list,
1820 in which case the list elements are joined into a path.
1822 Returns the number of directories created, not including
1823 intermediate directories, for historical reasons. A directory
1824 which already existed is counted as "created".
1831 sub
= os
.path
.join(*sub
)
1832 new
= os
.path
.join(self
.workdir
, sub
)
1834 # okay to exist, we just do this for counting
1835 os
.makedirs(new
, exist_ok
=True)
1837 except OSError as e
:
1842 def symlink(self
, target
, link
) -> None:
1843 """Creates a symlink to the specified target.
1845 The link name may be a list, in which case the elements are
1846 concatenated with the os.path.join() method. The link is
1847 assumed to be under the temporary working directory unless it
1848 is an absolute path name. The target is *not* assumed to be
1849 under the temporary working directory.
1851 if sys
.platform
== 'win32':
1852 # Skip this on windows as we're not enabling it due to
1853 # it requiring user permissions which aren't always present
1854 # and we don't have a good way to detect those permissions yet.
1856 link
= self
.canonicalize(link
)
1858 os
.symlink(target
, link
)
1859 except AttributeError:
1860 pass # Windows has no symlink
1862 def tempdir(self
, path
=None):
1863 """Creates a temporary directory.
1865 A unique directory name is generated if no path name is specified.
1866 The directory is created, and will be removed when the TestCmd
1867 object is destroyed.
1871 # put tests in a subdir of the default, so antivirus
1872 # can be given that directory as an "ignore".
1873 testdir
= Path(tempfile
.gettempdir()) / "scons"
1874 testdir
.mkdir(exist_ok
=True)
1875 path
= tempfile
.mkdtemp(prefix
=testprefix
, dir=testdir
)
1877 path
= tempfile
.mkdtemp()
1881 # Symlinks in the path will report things
1882 # differently from os.getcwd(), so chdir there
1883 # and back to fetch the canonical path.
1891 # Uppercase the drive letter since the case of drive
1892 # letters is pretty much random on win32:
1893 drive
, rest
= os
.path
.splitdrive(path
)
1895 path
= drive
.upper() + rest
1898 self
._dirlist
.append(path
)
1901 if self
not in _Cleanup
:
1902 _Cleanup
.append(self
)
def touch(self, path, mtime=None) -> None:
    """Updates the modification time on the specified file or directory.

    The default is to update to the
    current time if no explicit modification time is specified.
    The access time read before the update is written back, so only
    the modification time changes.

    Args:
        path: file or directory to touch; resolved with
            ``self.canonicalize()``.
        mtime: modification time to set, or None for the current time.
    """
    import time  # local import keeps this block self-contained

    path = self.canonicalize(path)
    atime = os.path.getatime(path)
    if mtime is None:
        mtime = time.time()
    os.utime(path, (atime, mtime))
def unlink(self, file) -> None:
    """Unlinks the specified file name.

    The file name may be a list, in which case the elements are
    concatenated with the os.path.join() method.  The file is
    assumed to be under the temporary working directory unless it
    is an absolute path name.

    Args:
        file: file to remove; resolved with ``self.canonicalize()``.

    Raises:
        OSError: if ``os.unlink()`` fails.
    """
    file = self.canonicalize(file)
    os.unlink(file)
1929 def unlink_files(self
, dirpath
, files
):
1930 """Unlinks a list of file names from the specified directory.
1932 The directory path may be a list, in which case the elements are
1933 concatenated with the os.path.join() method.
1935 A file name may be a list, in which case the elements are
1936 concatenated with the os.path.join() method.
1938 The directory path and file name are concatenated with the
1939 os.path.join() method. The resulting file path is assumed to be
1940 under the temporary working directory unless it is an absolute path
1941 name. An attempt to unlink the resulting file is made only when the
1942 file exists otherwise the file path is ignored.
1944 if is_List(dirpath
):
1945 dirpath
= os
.path
.join(*dirpath
)
1948 file = os
.path
.join(*file)
1949 filepath
= os
.path
.join(dirpath
, file)
1950 filepath
= self
.canonicalize(filepath
)
1951 if os
.path
.exists(filepath
):
1952 self
.unlink(filepath
)
def verbose_set(self, verbose) -> None:
    """Store ``verbose`` as this object's verbosity level."""
    setattr(self, 'verbose', verbose)
1958 def where_is(self
, file, path
=None, pathext
=None):
1959 """Finds an executable file."""
1961 file = os
.path
.join(*file)
1962 if not os
.path
.isabs(file):
1963 file = where_is(file, path
, pathext
)
1966 def workdir_set(self
, path
) -> None:
1967 """Creates a temporary working directory with the specified path name.
1969 If the path is a null string (''), a unique directory name is created.
1971 if path
is not None:
1974 path
= self
.tempdir(path
)
def workpath(self, *args):
    """Return a path located under the current temporary working directory.

    The components in ``args`` are appended to ``self.workdir`` using
    os.path.join(); with no arguments, ``self.workdir`` comes back
    unchanged.
    """
    pieces = (self.workdir, *args)
    return os.path.join(*pieces)
1985 def readable(self
, top
, read
: bool = True) -> None:
1986 """Makes the specified directory tree readable or unreadable.
1988 Tree is made readable if `read` evaluates True (the default),
1989 else it is made not readable.
1991 This method has no effect on Windows systems, which use a
1992 completely different mechanism to control file readability.
1995 if sys
.platform
== 'win32':
2000 def do_chmod(fname
) -> None:
2006 os
.chmod(fname
, stat
.S_IMODE(st
.st_mode | stat
.S_IREAD
))
2009 def do_chmod(fname
) -> None:
2015 os
.chmod(fname
, stat
.S_IMODE(st
.st_mode
& ~stat
.S_IREAD
))
2017 if os
.path
.isfile(top
):
2018 # If it's a file, that's easy, just chmod it.
2021 # It's a directory and we're trying to turn on read
2022 # permission, so it's also pretty easy, just chmod the
2023 # directory and then chmod every entry on our walk down the
2026 for dirpath
, dirnames
, filenames
in os
.walk(top
):
2027 for name
in dirnames
+ filenames
:
2028 do_chmod(os
.path
.join(dirpath
, name
))
2030 # It's a directory and we're trying to turn off read
2031 # permission, which means we have to chmod the directories
2032 # in the tree bottom-up, lest disabling read permission from
2033 # the top down get in the way of being able to get at lower
2034 # parts of the tree.
2035 for dirpath
, dirnames
, filenames
in os
.walk(top
, topdown
=False):
2036 for name
in dirnames
+ filenames
:
2037 do_chmod(os
.path
.join(dirpath
, name
))
2040 def writable(self
, top
, write
: bool = True) -> None:
2041 """Make the specified directory tree writable or unwritable.
2043 Tree is made writable if `write` evaluates True (the default),
2044 else it is made not writable.
2046 Note on Windows the only thing we can do is and/remove the
2047 "readable" setting without resorting to PyWin32 - and that,
2048 only as Administrator, so this is kind of pointless there.
2051 if sys
.platform
== 'win32':
2054 def do_chmod(fname
) -> None:
2056 os
.chmod(fname
, stat
.S_IWRITE
)
2061 def do_chmod(fname
) -> None:
2063 os
.chmod(fname
, stat
.S_IREAD
)
2070 def do_chmod(fname
) -> None:
2076 os
.chmod(fname
, stat
.S_IMODE(st
.st_mode | stat
.S_IWRITE
))
2079 def do_chmod(fname
) -> None:
2085 os
.chmod(fname
, stat
.S_IMODE(st
.st_mode
& ~stat
.S_IWRITE
))
2087 if os
.path
.isfile(top
):
2091 for dirpath
, dirnames
, filenames
in os
.walk(top
, topdown
=False):
2092 for name
in dirnames
+ filenames
:
2093 do_chmod(os
.path
.join(dirpath
, name
))
2095 def executable(self
, top
, execute
: bool = True) -> None:
2096 """Make the specified directory tree executable or not executable.
2098 Tree is made executable if `execute` evaluates True (the default),
2099 else it is made not executable.
2101 This method has no effect on Windows systems, which use a
2102 completely different mechanism to control file executability.
2105 if sys
.platform
== 'win32':
2110 def do_chmod(fname
) -> None:
2116 os
.chmod(fname
, stat
.S_IMODE(st
.st_mode | stat
.S_IEXEC
))
2119 def do_chmod(fname
) -> None:
2125 os
.chmod(fname
, stat
.S_IMODE(st
.st_mode
& ~stat
.S_IEXEC
))
2127 if os
.path
.isfile(top
):
2128 # If it's a file, that's easy, just chmod it.
2131 # It's a directory and we're trying to turn on execute
2132 # permission, so it's also pretty easy, just chmod the
2133 # directory and then chmod every entry on our walk down the
2136 for dirpath
, dirnames
, filenames
in os
.walk(top
):
2137 for name
in dirnames
+ filenames
:
2138 do_chmod(os
.path
.join(dirpath
, name
))
2140 # It's a directory and we're trying to turn off execute
2141 # permission, which means we have to chmod the directories
2142 # in the tree bottom-up, lest disabling execute permission from
2143 # the top down get in the way of being able to get at lower
2144 # parts of the tree.
2145 for dirpath
, dirnames
, filenames
in os
.walk(top
, topdown
=False):
2146 for name
in dirnames
+ filenames
:
2147 do_chmod(os
.path
.join(dirpath
, name
))
2150 def write(self
, file, content
, mode
: str = 'wb'):
2151 """Writes data to file.
2153 The file is created under the temporary working directory.
2154 Any subdirectories in the path must already exist. The
2155 write is converted to the required type rather than failing
2156 if there is a str/bytes mistmatch.
2158 :param file: name of file to write to. If a list, treated
2159 as components of a path and concatenated into a path.
2160 :type file: str or list(str)
2161 :param content: data to write.
2162 :type content: str or bytes
2163 :param mode: file mode, default is binary.
2166 file = self
.canonicalize(file)
2168 raise ValueError("mode must begin with 'w'")
2169 with
open(file, mode
) as f
:
2172 except TypeError as e
:
2173 f
.write(bytes(content
, 'utf-8'))
2178 # indent-tabs-mode:nil
2180 # vim: set expandtab tabstop=4 shiftwidth=4: