1 # -*- coding: utf-8; -*-
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # This is free software, and you are welcome to redistribute it under
7 # certain conditions; see the end of this file for copyright
8 # information, grant of license, and disclaimer of warranty.
10 """ Helper functionality for Dput test suite. """
12 from __future__
import (absolute_import
, unicode_literals
)
16 if sys
.version_info
>= (3, 3):
18 import collections
.abc
as collections_abc
20 from io
import StringIO
as StringIO
22 import unittest
.mock
as mock
23 elif sys
.version_info
>= (3, 0):
24 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
25 elif sys
.version_info
>= (2, 7):
26 # Python 2 standard library.
27 import __builtin__
as builtins
28 import collections
as collections_abc
29 import ConfigParser
as configparser
30 from StringIO
import StringIO
as BaseStringIO
31 # Third-party backport of Python 3 unittest improvements.
32 import unittest2
as unittest
33 # Third-party mock library.
36 raise RuntimeError("Python earlier than 2.7 is not supported.")
63 # Alias for Python 3 types.
68 def make_unique_slug(testcase
):
69 """ Make a unique slug for the test case. """
70 text
= base64
.b64encode(
71 testcase
.getUniqueString().encode('utf-8')
80 # We don't yet have the StringIO we want. Create it.
82 class StringIO(BaseStringIO
, object):
83 """ StringIO with a context manager. """
88 def __exit__(self
, *args
):
102 def patch_stdout(testcase
):
103 """ Patch `sys.stdout` for the specified test case. """
104 patcher
= mock
.patch
.object(
105 sys
, "stdout", wraps
=StringIO())
107 testcase
.addCleanup(patcher
.stop
)
110 def patch_stderr(testcase
):
111 """ Patch `sys.stderr` for the specified test case. """
112 patcher
= mock
.patch
.object(
113 sys
, "stderr", wraps
=StringIO())
115 testcase
.addCleanup(patcher
.stop
)
118 def patch_signal_signal(testcase
):
119 """ Patch `signal.signal` for the specified test case. """
120 func_patcher
= mock
.patch
.object(signal
, "signal", autospec
=True)
122 testcase
.addCleanup(func_patcher
.stop
)
class FakeSystemExit(Exception):
    """ Stand-in exception used in place of `SystemExit` during tests.

        Derives from `Exception` (not from `SystemExit`), so test code
        can catch it like any ordinary exception.
        """
# Exit status values returned by the fake process-running hooks.
EXIT_STATUS_SUCCESS = 0
EXIT_STATUS_FAILURE = 1
# 127 is the status a POSIX shell reports when a command is not found.
EXIT_STATUS_COMMAND_NOT_FOUND = 127
134 def patch_sys_exit(testcase
):
135 """ Patch `sys.exit` for the specified test case. """
136 func_patcher
= mock
.patch
.object(
137 sys
, "exit", autospec
=True,
138 side_effect
=FakeSystemExit())
140 testcase
.addCleanup(func_patcher
.stop
)
143 def patch_sys_argv(testcase
):
144 """ Patch the `sys.argv` sequence for the test case. """
145 if not hasattr(testcase
, 'progname'):
146 testcase
.progname
= make_unique_slug(testcase
)
147 if not hasattr(testcase
, 'sys_argv'):
148 testcase
.sys_argv
= [testcase
.progname
]
149 patcher
= mock
.patch
.object(
151 new
=list(testcase
.sys_argv
))
153 testcase
.addCleanup(patcher
.stop
)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that are disruptive to the test runner.

        :param testcase: The `TestCase` instance for binding to the patches.
        :return: ``None``.

        Applies, in this order, the patches for `sys.stdout`,
        `sys.stderr`, `sys.exit` and `sys.argv`.
        """
    patch_functions = (
            patch_stdout,
            patch_stderr,
            patch_sys_exit,
            patch_sys_argv,
            )
    for apply_patch in patch_functions:
        apply_patch(testcase)
164 def patch_time_time(testcase
, values
=None):
165 """ Patch the `time.time` function for the specified test case.
167 :param testcase: The `TestCase` instance for binding to the patch.
168 :param values: An iterable to provide return values.
173 values
= itertools
.count()
175 def generator_fake_time():
179 func_patcher
= mock
.patch
.object(time
, "time", autospec
=True)
181 testcase
.addCleanup(func_patcher
.stop
)
183 time
.time
.side_effect
= generator_fake_time()
186 def patch_os_environ(testcase
):
187 """ Patch the `os.environ` mapping. """
188 if not hasattr(testcase
, 'os_environ'):
189 testcase
.os_environ
= {}
190 patcher
= mock
.patch
.object(os
, "environ", new
=testcase
.os_environ
)
192 testcase
.addCleanup(patcher
.stop
)
195 def patch_os_getpid(testcase
):
196 """ Patch `os.getpid` for the specified test case. """
197 func_patcher
= mock
.patch
.object(os
, "getpid", autospec
=True)
199 testcase
.addCleanup(func_patcher
.stop
)
202 def patch_os_getuid(testcase
):
203 """ Patch the `os.getuid` function. """
204 if not hasattr(testcase
, 'os_getuid_return_value'):
205 testcase
.os_getuid_return_value
= testcase
.getUniqueInteger()
206 func_patcher
= mock
.patch
.object(
207 os
, "getuid", autospec
=True,
208 return_value
=testcase
.os_getuid_return_value
)
210 testcase
.addCleanup(func_patcher
.stop
)
213 PasswdEntry
= collections
.namedtuple(
215 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
218 def patch_pwd_getpwuid(testcase
):
219 """ Patch the `pwd.getpwuid` function. """
220 if not hasattr(testcase
, 'pwd_getpwuid_return_value'):
221 testcase
.pwd_getpwuid_return_value
= PasswdEntry(
222 pw_name
=make_unique_slug(testcase
),
223 pw_passwd
=make_unique_slug(testcase
),
224 pw_uid
=testcase
.getUniqueInteger(),
225 pw_gid
=testcase
.getUniqueInteger(),
226 pw_gecos
=testcase
.getUniqueString(),
227 pw_dir
=tempfile
.mktemp(),
228 pw_shell
=tempfile
.mktemp())
229 if not isinstance(testcase
.pwd_getpwuid_return_value
, pwd
.struct_passwd
):
230 pwent
= pwd
.struct_passwd(testcase
.pwd_getpwuid_return_value
)
232 pwent
= testcase
.pwd_getpwuid_return_value
233 func_patcher
= mock
.patch
.object(
234 pwd
, "getpwuid", autospec
=True,
237 testcase
.addCleanup(func_patcher
.stop
)
240 def patch_os_path_exists(testcase
):
241 """ Patch `os.path.exists` behaviour for this test case.
243 When the patched function is called, the registry of
244 `FileDouble` instances for this test case will be used to get
245 the instance for the path specified.
248 orig_os_path_exists
= os
.path
.exists
250 def fake_os_path_exists(path
):
251 registry
= FileDouble
.get_registry_for_testcase(testcase
)
253 file_double
= registry
[path
]
254 result
= file_double
.os_path_exists_scenario
.call_hook()
256 result
= orig_os_path_exists(path
)
259 func_patcher
= mock
.patch
.object(
260 os
.path
, "exists", autospec
=True,
261 side_effect
=fake_os_path_exists
)
263 testcase
.addCleanup(func_patcher
.stop
)
266 def patch_os_access(testcase
):
267 """ Patch `os.access` behaviour for this test case.
269 When the patched function is called, the registry of
270 `FileDouble` instances for this test case will be used to get
271 the instance for the path specified.
274 orig_os_access
= os
.access
276 def fake_os_access(path
, mode
):
277 registry
= FileDouble
.get_registry_for_testcase(testcase
)
279 file_double
= registry
[path
]
280 result
= file_double
.os_access_scenario
.call_hook(mode
)
282 result
= orig_os_access(path
, mode
)
285 func_patcher
= mock
.patch
.object(
286 os
, "access", autospec
=True,
287 side_effect
=fake_os_access
)
289 testcase
.addCleanup(func_patcher
.stop
)
292 StatResult
= collections
.namedtuple(
295 'st_ino', 'st_dev', 'st_nlink',
298 'st_atime', 'st_mtime', 'st_ctime',
302 def patch_os_stat(testcase
):
303 """ Patch `os.stat` behaviour for this test case.
305 When the patched function is called, the registry of
306 `FileDouble` instances for this test case will be used to get
307 the instance for the path specified.
310 orig_os_stat
= os
.stat
312 def fake_os_stat(path
):
313 registry
= FileDouble
.get_registry_for_testcase(testcase
)
315 file_double
= registry
[path
]
316 result
= file_double
.os_stat_scenario
.call_hook()
318 result
= orig_os_stat(path
)
321 func_patcher
= mock
.patch
.object(
322 os
, "stat", autospec
=True,
323 side_effect
=fake_os_stat
)
325 testcase
.addCleanup(func_patcher
.stop
)
328 def patch_os_lstat(testcase
):
329 """ Patch `os.lstat` behaviour for this test case.
331 When the patched function is called, the registry of
332 `FileDouble` instances for this test case will be used to get
333 the instance for the path specified.
336 orig_os_lstat
= os
.lstat
338 def fake_os_lstat(path
):
339 registry
= FileDouble
.get_registry_for_testcase(testcase
)
341 file_double
= registry
[path
]
342 result
= file_double
.os_lstat_scenario
.call_hook()
344 result
= orig_os_lstat(path
)
347 func_patcher
= mock
.patch
.object(
348 os
, "lstat", autospec
=True,
349 side_effect
=fake_os_lstat
)
351 testcase
.addCleanup(func_patcher
.stop
)
354 def patch_os_unlink(testcase
):
355 """ Patch `os.unlink` behaviour for this test case.
357 When the patched function is called, the registry of
358 `FileDouble` instances for this test case will be used to get
359 the instance for the path specified.
362 orig_os_unlink
= os
.unlink
364 def fake_os_unlink(path
):
365 registry
= FileDouble
.get_registry_for_testcase(testcase
)
367 file_double
= registry
[path
]
368 result
= file_double
.os_unlink_scenario
.call_hook()
370 result
= orig_os_unlink(path
)
373 func_patcher
= mock
.patch
.object(
374 os
, "unlink", autospec
=True,
375 side_effect
=fake_os_unlink
)
377 testcase
.addCleanup(func_patcher
.stop
)
380 def patch_os_rmdir(testcase
):
381 """ Patch `os.rmdir` behaviour for this test case.
383 When the patched function is called, the registry of
384 `FileDouble` instances for this test case will be used to get
385 the instance for the path specified.
388 orig_os_rmdir
= os
.rmdir
390 def fake_os_rmdir(path
):
391 registry
= FileDouble
.get_registry_for_testcase(testcase
)
393 file_double
= registry
[path
]
394 result
= file_double
.os_rmdir_scenario
.call_hook()
396 result
= orig_os_rmdir(path
)
399 func_patcher
= mock
.patch
.object(
400 os
, "rmdir", autospec
=True,
401 side_effect
=fake_os_rmdir
)
403 testcase
.addCleanup(func_patcher
.stop
)
406 def patch_shutil_rmtree(testcase
):
407 """ Patch `shutil.rmtree` behaviour for this test case.
409 When the patched function is called, the registry of
410 `FileDouble` instances for this test case will be used to get
411 the instance for the path specified.
414 orig_shutil_rmtree
= os
.rmdir
416 def fake_shutil_rmtree(path
, ignore_errors
=False, onerror
=None):
417 registry
= FileDouble
.get_registry_for_testcase(testcase
)
419 file_double
= registry
[path
]
420 result
= file_double
.shutil_rmtree_scenario
.call_hook()
422 result
= orig_shutil_rmtree(path
)
425 func_patcher
= mock
.patch
.object(
426 shutil
, "rmtree", autospec
=True,
427 side_effect
=fake_shutil_rmtree
)
429 testcase
.addCleanup(func_patcher
.stop
)
432 def patch_tempfile_mkdtemp(testcase
):
433 """ Patch the `tempfile.mkdtemp` function for this test case. """
434 if not hasattr(testcase
, 'tempfile_mkdtemp_file_double'):
435 testcase
.tempfile_mkdtemp_file_double
= FileDouble(tempfile
.mktemp())
437 double
= testcase
.tempfile_mkdtemp_file_double
438 double
.set_os_unlink_scenario('okay')
439 double
.set_os_rmdir_scenario('okay')
440 double
.register_for_testcase(testcase
)
442 func_patcher
= mock
.patch
.object(tempfile
, "mkdtemp", autospec
=True)
444 testcase
.addCleanup(func_patcher
.stop
)
446 tempfile
.mkdtemp
.return_value
= testcase
.tempfile_mkdtemp_file_double
.path
454 # Python 2 uses IOError.
def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
    """ Normalise exception constructor arguments for `IOError`.

        :param init_args: Positional arguments given to the constructor.
        :param init_kwargs: Keyword arguments given to the constructor.
            This mapping is not modified.
        :param errno_value: The `errno` value to use when the caller
            does not specify one.
        :return: Tuple (`args`, `kwargs`). The `args` value is the
            tuple (`errno`, `strerror`, `filename`); the `kwargs` value
            is a new `dict` of the keyword arguments that remain after
            ``errno``, ``strerror`` and ``filename`` are removed.

        Positional values are interpreted as follows: with three or
        more arguments, the first is `errno` and the third is
        `filename`; with two or more, the second is `strerror`. A
        keyword argument overrides the corresponding positional value.
        """
    # Work on a copy: the caller's mapping must not be mutated.
    result_kwargs = dict(init_kwargs)

    result_errno = errno_value
    result_strerror = os.strerror(errno_value)
    result_filename = None

    if len(init_args) >= 3:
        result_errno = init_args[0]
        result_filename = init_args[2]
    if len(init_args) >= 2:
        result_strerror = init_args[1]

    # Keyword values take precedence over positional ones.
    result_errno = result_kwargs.pop('errno', result_errno)
    result_filename = result_kwargs.pop('filename', result_filename)
    result_strerror = result_kwargs.pop('strerror', result_strerror)

    result_args = (result_errno, result_strerror, result_filename)
    return (result_args, result_kwargs)
class FileNotFoundError(IOError):
    """ An `IOError` whose `errno` defaults to `errno.ENOENT`. """

    def __init__(self, *args, **kwargs):
        """ Initialise with arguments normalised for `IOError`.

            The arguments are passed through `_ensure_ioerror_args`
            with `errno.ENOENT` as the default `errno` value.
            """
        normalised = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.ENOENT)
        (init_args, init_kwargs) = normalised
        super(FileNotFoundError, self).__init__(*init_args, **init_kwargs)
class FileExistsError(IOError):
    """ An `IOError` whose `errno` defaults to `errno.EEXIST`. """

    def __init__(self, *args, **kwargs):
        """ Initialise with arguments normalised for `IOError`.

            The arguments are passed through `_ensure_ioerror_args`
            with `errno.EEXIST` as the default `errno` value.
            """
        normalised = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.EEXIST)
        (init_args, init_kwargs) = normalised
        super(FileExistsError, self).__init__(*init_args, **init_kwargs)
class PermissionError(IOError):
    """ An `IOError` whose `errno` defaults to `errno.EPERM`. """

    def __init__(self, *args, **kwargs):
        """ Initialise with arguments normalised for `IOError`.

            The arguments are passed through `_ensure_ioerror_args`
            with `errno.EPERM` as the default `errno` value.
            """
        normalised = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.EPERM)
        (init_args, init_kwargs) = normalised
        super(PermissionError, self).__init__(*init_args, **init_kwargs)
496 def make_fake_file_scenarios(path
=None):
497 """ Make a collection of scenarios for testing with fake files.
499 :path: The filesystem path of the fake file. If not specified,
500 a valid random path will be generated.
501 :return: A collection of scenarios for tests involving input files.
503 The collection is a mapping from scenario name to a dictionary of
509 file_path
= tempfile
.mktemp()
513 fake_file_empty
= StringIO()
514 fake_file_minimal
= StringIO("Lorem ipsum.")
515 fake_file_large
= StringIO("\n".join(
517 for __
in range(1000)))
519 default_scenario_params
= {
520 'open_scenario_name': 'okay',
521 'file_double_params': dict(
522 path
=file_path
, fake_file
=fake_file_minimal
),
528 'open_scenario_name': 'nonexist',
531 'open_scenario_name': 'exist_error',
533 'error-read-denied': {
534 'open_scenario_name': 'read_denied',
537 'file_double_params': dict(
538 path
=file_path
, fake_file
=fake_file_empty
),
541 'file_double_params': dict(
542 path
=file_path
, fake_file
=fake_file_empty
),
545 'file_double_params': dict(
546 path
=file_path
, fake_file
=fake_file_minimal
),
549 'file_double_params': dict(
550 path
=file_path
, fake_file
=fake_file_large
),
554 for (name
, scenario
) in scenarios
.items():
555 params
= default_scenario_params
.copy()
556 params
.update(scenario
)
557 scenario
.update(params
)
558 scenario
['file_double'] = FileDouble(**scenario
['file_double_params'])
559 scenario
['file_double'].set_open_scenario(params
['open_scenario_name'])
560 scenario
['fake_file_scenario_name'] = name
565 def get_file_doubles_from_fake_file_scenarios(scenarios
):
566 """ Get the `FileDouble` instances from fake file scenarios.
568 :param scenarios: Collection of fake file scenarios.
569 :return: Collection of `FileDouble` instances.
573 scenario
['file_double']
574 for scenario
in scenarios
575 if scenario
['file_double'] is not None)
580 def setup_file_double_behaviour(testcase
, doubles
=None):
581 """ Set up file double instances and behaviour.
583 :param testcase: The `TestCase` instance to modify.
584 :param doubles: Collection of `FileDouble` instances.
587 If `doubles` is ``None``, a default collection will be made
588 from the result of `make_fake_file_scenarios` result.
592 scenarios
= make_fake_file_scenarios()
593 doubles
= get_file_doubles_from_fake_file_scenarios(
596 for file_double
in doubles
:
597 file_double
.register_for_testcase(testcase
)
599 orig_open
= builtins
.open
601 def fake_open(path
, mode
='rt', buffering
=-1):
602 registry
= FileDouble
.get_registry_for_testcase(testcase
)
604 file_double
= registry
[path
]
605 result
= file_double
.builtins_open_scenario
.call_hook(
608 result
= orig_open(path
, mode
, buffering
)
611 mock_open
= mock
.mock_open()
612 mock_open
.side_effect
= fake_open
614 func_patcher
= mock
.patch
.object(
615 builtins
, "open", new
=mock_open
)
617 testcase
.addCleanup(func_patcher
.stop
)
620 def setup_fake_file_fixtures(testcase
):
621 """ Set up fixtures for fake file doubles.
623 :param testcase: The `TestCase` instance to modify.
627 scenarios
= make_fake_file_scenarios()
628 testcase
.fake_file_scenarios
= scenarios
630 file_doubles
= get_file_doubles_from_fake_file_scenarios(
632 setup_file_double_behaviour(testcase
, file_doubles
)
def set_fake_file_scenario(testcase, name):
    """ Select the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify. Its
            `fake_file_scenarios` mapping is consulted.
        :param name: The key of the scenario in that mapping.
        :return: ``None``.

        Sets `testcase.fake_file_scenario` and `testcase.file_double`,
        then registers the file double for the test case.
        """
    chosen_scenario = testcase.fake_file_scenarios[name]
    testcase.fake_file_scenario = chosen_scenario

    chosen_double = chosen_scenario['file_double']
    testcase.file_double = chosen_double
    chosen_double.register_for_testcase(testcase)
643 class TestDoubleFunctionScenario
:
644 """ Scenario for fake behaviour of a specific function. """
646 def __init__(self
, scenario_name
, double
):
647 self
.scenario_name
= scenario_name
650 self
.call_hook
= getattr(
651 self
, "_hook_{name}".format(name
=self
.scenario_name
))
655 "<{class_name} instance: {id}"
657 " call_hook name: {hook_name!r}"
658 " double: {double!r}"
660 class_name
=self
.__class
__.__name
__, id=id(self
),
661 name
=self
.scenario_name
, double
=self
.double
,
662 hook_name
=self
.call_hook
.__name
__)
665 def __eq__(self
, other
):
667 if not self
.scenario_name
== other
.scenario_name
:
669 if not self
.double
== other
.double
:
671 if not self
.call_hook
.__name
__ == other
.call_hook
.__name
__:
675 def __ne__(self
, other
):
676 result
= not self
.__eq
__(other
)
680 class os_path_exists_scenario(TestDoubleFunctionScenario
):
681 """ Scenario for `os.path.exists` behaviour. """
683 def _hook_exist(self
):
686 def _hook_not_exist(self
):
690 class os_access_scenario(TestDoubleFunctionScenario
):
691 """ Scenario for `os.access` behaviour. """
693 def _hook_okay(self
, mode
):
696 def _hook_not_exist(self
, mode
):
699 def _hook_read_only(self
, mode
):
700 if mode
& (os
.W_OK | os
.X_OK
):
706 def _hook_denied(self
, mode
):
707 if mode
& (os
.R_OK | os
.W_OK | os
.X_OK
):
714 class os_stat_scenario(TestDoubleFunctionScenario
):
715 """ Scenario for `os.stat` behaviour. """
717 def _hook_okay(self
):
718 return self
.double
.stat_result
720 def _hook_notfound_error(self
):
721 raise FileNotFoundError(
723 "No such file or directory: {path!r}".format(
724 path
=self
.double
.path
))
726 def _hook_denied_error(self
):
727 raise PermissionError(
class os_lstat_scenario(os_stat_scenario):
    """ Scenario for `os.lstat` behaviour.

        Defines nothing of its own; every hook is inherited from
        `os_stat_scenario`.
        """
736 class os_unlink_scenario(TestDoubleFunctionScenario
):
737 """ Scenario for `os.unlink` behaviour. """
739 def _hook_okay(self
):
742 def _hook_nonexist(self
):
743 error
= FileNotFoundError(
745 "No such file or directory: {path!r}".format(
746 path
=self
.double
.path
))
749 def _hook_denied(self
):
750 error
= PermissionError(
756 class os_rmdir_scenario(TestDoubleFunctionScenario
):
757 """ Scenario for `os.rmdir` behaviour. """
759 def _hook_okay(self
):
762 def _hook_nonexist(self
):
763 error
= FileNotFoundError(
765 "No such file or directory: {path!r}".format(
766 path
=self
.double
.path
))
769 def _hook_denied(self
):
770 error
= PermissionError(
776 class shutil_rmtree_scenario(TestDoubleFunctionScenario
):
777 """ Scenario for `shutil.rmtree` behaviour. """
779 def _hook_okay(self
):
782 def _hook_nonexist(self
):
783 error
= FileNotFoundError(
785 "No such file or directory: {path!r}".format(
786 path
=self
.double
.path
))
789 def _hook_denied(self
):
790 error
= PermissionError(
796 class builtins_open_scenario(TestDoubleFunctionScenario
):
797 """ Scenario for `builtins.open` behaviour. """
799 def _hook_okay(self
, mode
, buffering
):
800 result
= self
.double
.fake_file
803 def _hook_nonexist(self
, mode
, buffering
):
804 if mode
.startswith('r'):
805 error
= FileNotFoundError(
807 "No such file or directory: {path!r}".format(
808 path
=self
.double
.path
))
810 result
= self
.double
.fake_file
813 def _hook_exist_error(self
, mode
, buffering
):
814 if mode
.startswith('w') or mode
.startswith('a'):
815 error
= FileExistsError(
817 "File already exists: {path!r}".format(
818 path
=self
.double
.path
))
820 result
= self
.double
.fake_file
823 def _hook_read_denied(self
, mode
, buffering
):
824 if mode
.startswith('r'):
825 error
= PermissionError(
827 "Read denied on {path!r}".format(
828 path
=self
.double
.path
))
830 result
= self
.double
.fake_file
833 def _hook_write_denied(self
, mode
, buffering
):
834 if mode
.startswith('w') or mode
.startswith('a'):
835 error
= PermissionError(
837 "Write denied on {path!r}".format(
838 path
=self
.double
.path
))
840 result
= self
.double
.fake_file
844 class TestDoubleWithRegistry
:
845 """ Abstract base class for a test double with a test case registry. """
847 registry_class
= NotImplemented
848 registries
= NotImplemented
850 function_scenario_params_by_class
= NotImplemented
852 def __new__(cls
, *args
, **kwargs
):
853 superclass
= super(TestDoubleWithRegistry
, cls
)
854 if superclass
.__new
__ is object.__new
__:
855 # The ‘object’ implementation complains about extra arguments.
856 instance
= superclass
.__new
__(cls
)
858 instance
= superclass
.__new
__(cls
, *args
, **kwargs
)
859 instance
.make_set_scenario_methods()
863 def __init__(self
, *args
, **kwargs
):
864 super(TestDoubleWithRegistry
, self
).__init
__(*args
, **kwargs
)
865 self
._set
_method
_per
_scenario
()
867 def _make_set_scenario_method(self
, scenario_class
, params
):
868 def method(self
, name
):
869 scenario
= scenario_class(name
, double
=self
)
870 setattr(self
, scenario_class
.__name
__, scenario
)
872 """ Set the scenario for `{name}` behaviour. """
873 ).format(name
=scenario_class
.__name
__)
874 method
.__name
__ = str(params
['set_scenario_method_name'])
def make_set_scenario_methods(self):
    """ Make `set_<scenario_class_name>` methods on this class.

        For each entry in `function_scenario_params_by_class`, a
        method is built by `_make_set_scenario_method`, attached to
        this instance's class under the method's own `__name__`, and
        stored in the entry's parameters under the key
        ``set_scenario_method``.
        """
    owner_class = self.__class__
    params_by_class = self.function_scenario_params_by_class
    for (scenario_class, scenario_params) in params_by_class.items():
        setter = self._make_set_scenario_method(
                scenario_class, scenario_params)
        setattr(owner_class, setter.__name__, setter)
        scenario_params['set_scenario_method'] = setter
886 def _set_method_per_scenario(self
):
887 """ Set the method to be called for each scenario. """
888 for function_scenario_params
in (
889 self
.function_scenario_params_by_class
.values()):
890 function_scenario_params
['set_scenario_method'](
891 self
, function_scenario_params
['default_scenario_name'])
894 def get_registry_for_testcase(cls
, testcase
):
895 """ Get the FileDouble registry for the specified test case. """
896 # Key in a dict must be hashable.
897 key
= (testcase
.__class
__, id(testcase
))
898 registry
= cls
.registries
.setdefault(key
, cls
.registry_class())
def get_registry_key(self):
    """ Get the registry key for this double.

        This base implementation always fails.

        :raises NotImplementedError: Always.
        """
    raise NotImplementedError
905 def register_for_testcase(self
, testcase
):
906 """ Add this instance to registry for the specified testcase. """
907 registry
= self
.get_registry_for_testcase(testcase
)
908 key
= self
.get_registry_key()
910 unregister_func
= functools
.partial(
911 self
.unregister_for_testcase
, testcase
)
912 testcase
.addCleanup(unregister_func
)
914 def unregister_for_testcase(self
, testcase
):
915 """ Remove this instance from registry for the specified testcase. """
916 registry
= self
.get_registry_for_testcase(testcase
)
917 key
= self
.get_registry_key()
922 def copy_fake_file(fake_file
):
923 """ Make a copy of the StringIO instance. """
924 fake_file_type
= StringIO
926 if fake_file
is not None:
927 fake_file_type
= type(fake_file
)
928 content
= fake_file
.getvalue()
929 assert issubclass(fake_file_type
, object)
930 result
= fake_file_type(content
)
931 if hasattr(fake_file
, 'encoding'):
932 if not hasattr(result
, 'encoding'):
933 result
.encoding
= fake_file
.encoding
937 class FileDouble(TestDoubleWithRegistry
):
938 """ A testing double for a file. """
940 registry_class
= dict
943 function_scenario_params_by_class
= {
944 os_path_exists_scenario
: {
945 'default_scenario_name': 'not_exist',
946 'set_scenario_method_name': 'set_os_path_exists_scenario',
948 os_access_scenario
: {
949 'default_scenario_name': 'okay',
950 'set_scenario_method_name': 'set_os_access_scenario',
953 'default_scenario_name': 'okay',
954 'set_scenario_method_name': 'set_os_stat_scenario',
957 'default_scenario_name': 'okay',
958 'set_scenario_method_name': 'set_os_lstat_scenario',
960 builtins_open_scenario
: {
961 'default_scenario_name': 'okay',
962 'set_scenario_method_name': 'set_open_scenario',
964 os_unlink_scenario
: {
965 'default_scenario_name': 'okay',
966 'set_scenario_method_name': 'set_os_unlink_scenario',
969 'default_scenario_name': 'okay',
970 'set_scenario_method_name': 'set_os_rmdir_scenario',
972 shutil_rmtree_scenario
: {
973 'default_scenario_name': 'okay',
974 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
978 def __init__(self
, path
=None, fake_file
=None, *args
, **kwargs
):
980 self
.fake_file
= copy_fake_file(fake_file
)
981 self
.fake_file
.name
= path
983 self
._set
_stat
_result
()
985 super(FileDouble
, self
).__init
__(*args
, **kwargs
)
987 def _set_stat_result(self
):
988 """ Set the `os.stat` result for this file. """
989 size
= len(self
.fake_file
.getvalue())
990 self
.stat_result
= StatResult(
992 st_ino
=None, st_dev
=None, st_nlink
=None,
995 st_atime
=None, st_mtime
=None, st_ctime
=None,
999 text
= "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
1000 path
=self
.path
, fake_file
=self
.fake_file
)
1003 def get_registry_key(self
):
1004 """ Get the registry key for this double. """
1009 class os_popen_scenario(TestDoubleFunctionScenario
):
1010 """ Scenario for `os.popen` behaviour. """
1012 stream_name_by_mode
= {
1017 def _hook_success(self
, argv
, mode
, buffering
):
1018 stream_name
= self
.stream_name_by_mode
[mode
]
1019 stream_double
= getattr(
1020 self
.double
, stream_name
+ '_double')
1021 result
= stream_double
.fake_file
1024 def _hook_failure(self
, argv
, mode
, buffering
):
1028 def _hook_not_found(self
, argv
, mode
, buffering
):
1033 class os_waitpid_scenario(TestDoubleFunctionScenario
):
1034 """ Scenario for `os.waitpid` behaviour. """
1036 def _hook_success(self
, pid
, options
):
1037 result
= (pid
, EXIT_STATUS_SUCCESS
)
1040 def _hook_failure(self
, pid
, options
):
1041 result
= (pid
, EXIT_STATUS_FAILURE
)
1044 def _hook_not_found(self
, pid
, options
):
1045 error
= OSError(errno
.ECHILD
)
1049 class os_system_scenario(TestDoubleFunctionScenario
):
1050 """ Scenario for `os.system` behaviour. """
1052 def _hook_success(self
, command
):
1053 result
= EXIT_STATUS_SUCCESS
1056 def _hook_failure(self
, command
):
1057 result
= EXIT_STATUS_FAILURE
1060 def _hook_not_found(self
, command
):
1061 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1065 class os_spawnv_scenario(TestDoubleFunctionScenario
):
1066 """ Scenario for `os.spawnv` behaviour. """
1068 def _hook_success(self
, mode
, file, args
):
1069 result
= EXIT_STATUS_SUCCESS
1072 def _hook_failure(self
, mode
, file, args
):
1073 result
= EXIT_STATUS_FAILURE
1076 def _hook_not_found(self
, mode
, file, args
):
1077 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1086 """ A testing double for `subprocess.Popen`. """
1088 def __init__(self
, args
, *posargs
, **kwargs
):
1093 self
.returncode
= None
1095 if kwargs
.get('shell', False):
1096 self
.argv
= shlex
.split(args
)
1098 # The parameter is already a sequence of command-line arguments.
1101 def set_streams(self
, subprocess_double
, popen_kwargs
):
1102 """ Set the streams on the `PopenDouble`.
1104 :param subprocess_double: The `SubprocessDouble` from
1105 which to get existing stream doubles.
1106 :param popen_kwargs: The keyword arguments to the
1107 `subprocess.Popen` call.
1111 for stream_name
in (
1112 name
for name
in ['stdin', 'stdout', 'stderr']
1113 if name
in popen_kwargs
):
1114 stream_spec
= popen_kwargs
[stream_name
]
1115 if stream_spec
is subprocess
.PIPE
:
1116 stream_double
= getattr(
1118 "{name}_double".format(name
=stream_name
))
1119 stream_file
= stream_double
.fake_file
1120 elif stream_spec
is subprocess
.STDOUT
:
1121 stream_file
= subprocess_double
.stdout_double
.fake_file
1123 stream_file
= stream_spec
1124 setattr(self
, stream_name
, stream_file
)
1127 """ Wait for subprocess to terminate. """
1128 return self
.returncode
1131 class subprocess_popen_scenario(TestDoubleFunctionScenario
):
1132 """ Scenario for `subprocess.Popen` behaviour. """
1134 def _hook_success(self
, testcase
, args
, *posargs
, **kwargs
):
1135 double
= self
.double
.popen_double
1136 double
.set_streams(self
.double
, kwargs
)
1140 def patch_subprocess_popen(testcase
):
1141 """ Patch `subprocess.Popen` constructor for this test case.
1143 :param testcase: The `TestCase` instance to modify.
1146 When the patched function is called, the registry of
1147 `SubprocessDouble` instances for this test case will be used
1148 to get the instance for the program path specified.
1151 orig_subprocess_popen
= subprocess
.Popen
1153 def fake_subprocess_popen(args
, *posargs
, **kwargs
):
1154 if kwargs
.get('shell', False):
1155 argv
= shlex
.split(args
)
1158 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1159 if argv
in registry
:
1160 subprocess_double
= registry
[argv
]
1161 result
= subprocess_double
.subprocess_popen_scenario
.call_hook(
1162 testcase
, args
, *posargs
, **kwargs
)
1164 result
= orig_subprocess_popen(args
, *posargs
, **kwargs
)
1167 func_patcher
= mock
.patch
.object(
1168 subprocess
, "Popen", autospec
=True,
1169 side_effect
=fake_subprocess_popen
)
1170 func_patcher
.start()
1171 testcase
.addCleanup(func_patcher
.stop
)
1174 class subprocess_call_scenario(TestDoubleFunctionScenario
):
1175 """ Scenario for `subprocess.call` behaviour. """
1177 def _hook_success(self
, command
):
1178 result
= EXIT_STATUS_SUCCESS
1181 def _hook_failure(self
, command
):
1182 result
= EXIT_STATUS_FAILURE
1185 def _hook_not_found(self
, command
):
1186 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1190 def patch_subprocess_call(testcase
):
1191 """ Patch `subprocess.call` function for this test case.
1193 :param testcase: The `TestCase` instance to modify.
1196 When the patched function is called, the registry of
1197 `SubprocessDouble` instances for this test case will be used
1198 to get the instance for the program path specified.
1201 orig_subprocess_call
= subprocess
.call
1203 def fake_subprocess_call(command
, *posargs
, **kwargs
):
1204 if kwargs
.get('shell', False):
1205 command_argv
= shlex
.split(command
)
1207 command_argv
= command
1208 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1209 if command_argv
in registry
:
1210 subprocess_double
= registry
[command_argv
]
1211 result
= subprocess_double
.subprocess_call_scenario
.call_hook(
1214 result
= orig_subprocess_call(command
, *posargs
, **kwargs
)
1217 func_patcher
= mock
.patch
.object(
1218 subprocess
, "call", autospec
=True,
1219 side_effect
=fake_subprocess_call
)
1220 func_patcher
.start()
1221 testcase
.addCleanup(func_patcher
.stop
)
1224 class subprocess_check_call_scenario(TestDoubleFunctionScenario
):
1225 """ Scenario for `subprocess.check_call` behaviour. """
1227 def _hook_success(self
, command
):
1230 def _hook_failure(self
, command
):
1231 result
= EXIT_STATUS_FAILURE
1232 error
= subprocess
.CalledProcessError(result
, command
)
1235 def _hook_not_found(self
, command
):
1236 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1237 error
= subprocess
.CalledProcessError(result
, command
)
def patch_subprocess_check_call(testcase):
    """ Patch `subprocess.check_call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        While the patch is active, a call whose command matches a
        `SubprocessDouble` registered for `testcase` is answered by
        that double's `subprocess_check_call_scenario`. Every other
        call goes to the real `subprocess.check_call`. The patch is
        removed when the test case cleans up.

        """
    # Capture the real function before it is replaced by the mock.
    real_check_call = subprocess.check_call

    def fake_subprocess_check_call(command, *posargs, **kwargs):
        # With ``shell=True`` the command is one string; split it into
        # words so it can be compared with the registered argv keys.
        if kwargs.get('shell', False):
            argv = shlex.split(command)
        else:
            argv = command
        doubles_by_argv = SubprocessDouble.get_registry_for_testcase(testcase)
        if argv not in doubles_by_argv:
            return real_check_call(command, *posargs, **kwargs)
        scenario = doubles_by_argv[argv].subprocess_check_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
            subprocess, "check_call", autospec=True,
            side_effect=fake_subprocess_check_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class SubprocessDoubleRegistry(collections_abc.MutableMapping):
    """ Registry of `SubprocessDouble` instances by `argv`.

        Keys are sequences of command-line words. Lookup, membership
        and deletion do not need an identical key: the requested `argv`
        is compared word by word with each registered key, and the
        first registered key that matches (in the iteration order of
        the underlying mapping) is used. Within a registered key:

        * `ARG_ANY` matches any one word at its position.
        * `ARG_MORE` matches all the remaining words, including none.

        A registered key that runs out of words before `argv` does is
        treated as a match.

        """

    def __init__(self, *args, **kwargs):
        """ Initialise the registry, optionally from existing items.

            :param args: If given, the first positional argument is a
                mapping, or an iterable of ``(key, value)`` pairs, used
                to populate the registry.
            :param kwargs: Accepted but not used.

            """
        items = []
        if args:
            if isinstance(args[0], collections_abc.Mapping):
                items = args[0].items()
            elif isinstance(args[0], collections_abc.Iterable):
                items = args[0]
        self._mapping = dict(items)

    def __repr__(self):
        text = "<{class_name} object: {mapping}>".format(
                class_name=self.__class__.__name__, mapping=self._mapping)
        return text

    @staticmethod
    def _candidate_matches(candidate, argv):
        """ Return ``True`` if registered key `candidate` matches `argv`. """
        if candidate == argv:
            # An exact match.
            return True
        for (word_index, candidate_word) in enumerate(candidate):
            if candidate_word is ARG_MORE:
                # Candidate matches any remaining words. We have a match.
                return True
            if word_index >= len(argv):
                # Candidate is too long for the specified argv. This
                # must be tested before `argv` is indexed, and before
                # `ARG_ANY` is allowed to match a word that is absent.
                return False
            if candidate_word is ARG_ANY:
                # Candidate matches any word at this position.
                continue
            if candidate_word == argv[word_index]:
                # Candidate matches the word at this position.
                continue
            # This candidate does not match.
            return False
        # Reached the end of the candidate without a mismatch.
        return True

    def _match_argv(self, argv):
        """ Match the specified `argv` with our registered keys.

            :param argv: The sequence of command-line words to look up.
            :return: The first registered key that matches `argv`, or
                ``None`` if `argv` is not a sequence or nothing matches.

            """
        if not isinstance(argv, collections_abc.Sequence):
            return None
        for candidate in self._mapping:
            if self._candidate_matches(candidate, argv):
                return candidate
        return None

    def __getitem__(self, key):
        match = self._match_argv(key)
        if match is None:
            raise KeyError(key)
        return self._mapping[match]

    def __setitem__(self, key, value):
        # Drop whichever registered key currently matches `key`, so the
        # new entry takes its place instead of being shadowed by it.
        if key in self:
            del self[key]
        self._mapping[key] = value

    def __delitem__(self, key):
        # Deleting a key that matches nothing is silently ignored.
        match = self._match_argv(key)
        if match is not None:
            del self._mapping[match]

    def __iter__(self):
        return iter(self._mapping)

    def __len__(self):
        return len(self._mapping)
1359 class SubprocessDouble(TestDoubleWithRegistry
):
1360 """ A testing double for a subprocess. """
1362 registry_class
= SubprocessDoubleRegistry
1365 double_by_pid
= weakref
.WeakValueDictionary()
1367 function_scenario_params_by_class
= {
1368 subprocess_popen_scenario
: {
1369 'default_scenario_name': 'success',
1370 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1372 subprocess_call_scenario
: {
1373 'default_scenario_name': 'success',
1374 'set_scenario_method_name': 'set_subprocess_call_scenario',
1376 subprocess_check_call_scenario
: {
1377 'default_scenario_name': 'success',
1378 'set_scenario_method_name':
1379 'set_subprocess_check_call_scenario',
1381 os_popen_scenario
: {
1382 'default_scenario_name': 'success',
1383 'set_scenario_method_name': 'set_os_popen_scenario',
1385 os_waitpid_scenario
: {
1386 'default_scenario_name': 'success',
1387 'set_scenario_method_name': 'set_os_waitpid_scenario',
1389 os_system_scenario
: {
1390 'default_scenario_name': 'success',
1391 'set_scenario_method_name': 'set_os_system_scenario',
1393 os_spawnv_scenario
: {
1394 'default_scenario_name': 'success',
1395 'set_scenario_method_name': 'set_os_spawnv_scenario',
1399 def __init__(self
, path
=None, argv
=None, *args
, **kwargs
):
1401 path
= tempfile
.mktemp()
1405 command_name
= os
.path
.basename(path
)
1406 argv
= [command_name
]
1409 self
.pid
= self
._make
_pid
()
1410 self
._register
_by
_pid
()
1412 self
.set_popen_double()
1414 stream_class
= SubprocessDouble
.stream_class
1415 for stream_name
in ['stdin', 'stdout', 'stderr']:
1416 fake_file
= stream_class()
1417 file_double
= FileDouble(fake_file
=fake_file
)
1418 stream_double_name
= '{name}_double'.format(name
=stream_name
)
1419 setattr(self
, stream_double_name
, file_double
)
1421 super(SubprocessDouble
, self
).__init
__(*args
, **kwargs
)
1423 def set_popen_double(self
):
1424 """ Set the `PopenDouble` for this instance. """
1425 double
= PopenDouble(self
.argv
)
1426 double
.pid
= self
.pid
1428 self
.popen_double
= double
1432 "<SubprocessDouble instance: {id}"
1435 " stdin_double: {stdin_double!r}"
1436 " stdout_double: {stdout_double!r}"
1437 " stderr_double: {stderr_double!r}"
1440 path
=self
.path
, argv
=self
.argv
,
1441 stdin_double
=self
.stdin_double
,
1442 stdout_double
=self
.stdout_double
,
1443 stderr_double
=self
.stderr_double
)
1448 """ Make a unique PID for a subprocess. """
1449 for pid
in itertools
.count(1):
1452 def _register_by_pid(self
):
1453 """ Register this subprocess by its PID. """
1454 self
.__class
__.double_by_pid
[self
.pid
] = self
1456 def get_registry_key(self
):
1457 """ Get the registry key for this double. """
1458 result
= tuple(self
.argv
)
1461 stream_class
= io
.BytesIO
1462 stream_encoding
= "utf-8"
1464 def set_stdin_content(self
, text
, bytes_encoding
=stream_encoding
):
1465 """ Set the content of the `stdin` stream for this double. """
1466 content
= text
.encode(bytes_encoding
)
1467 fake_file
= self
.stream_class(content
)
1468 self
.stdin_double
.fake_file
= fake_file
1470 def set_stdout_content(self
, text
, bytes_encoding
=stream_encoding
):
1471 """ Set the content of the `stdout` stream for this double. """
1472 content
= text
.encode(bytes_encoding
)
1473 fake_file
= self
.stream_class(content
)
1474 self
.stdout_double
.fake_file
= fake_file
1476 def set_stderr_content(self
, text
, bytes_encoding
=stream_encoding
):
1477 """ Set the content of the `stderr` stream for this double. """
1478 content
= text
.encode(bytes_encoding
)
1479 fake_file
= self
.stream_class(content
)
1480 self
.stderr_double
.fake_file
= fake_file
1483 def make_fake_subprocess_scenarios(path
=None):
1484 """ Make a collection of scenarios for testing with fake files.
1486 :path: The filesystem path of the fake program. If not specified,
1487 a valid random path will be generated.
1488 :return: A collection of scenarios for tests involving subprocesses.
1490 The collection is a mapping from scenario name to a dictionary of
1491 scenario attributes.
1495 file_path
= tempfile
.mktemp()
1499 default_scenario_params
= {
1500 'return_value': EXIT_STATUS_SUCCESS
,
1501 'program_path': file_path
,
1502 'argv_after_command_name': [],
1508 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND
,
1512 for (name
, scenario
) in scenarios
.items():
1513 params
= default_scenario_params
.copy()
1514 params
.update(scenario
)
1515 scenario
.update(params
)
1516 program_path
= params
['program_path']
1517 program_name
= os
.path
.basename(params
['program_path'])
1518 argv
= [program_name
]
1519 argv
.extend(params
['argv_after_command_name'])
1520 subprocess_double_params
= dict(
1524 subprocess_double
= SubprocessDouble(**subprocess_double_params
)
1525 scenario
['subprocess_double'] = subprocess_double
1526 scenario
['fake_file_scenario_name'] = name
1531 def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios
):
1532 """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
1534 :param scenarios: Collection of fake subprocess scenarios.
1535 :return: Collection of `SubprocessDouble` instances.
1539 scenario
['subprocess_double']
1540 for scenario
in scenarios
1541 if scenario
['subprocess_double'] is not None)
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Set up subprocess double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the return value of `make_fake_subprocess_scenarios`.
        Each double is then registered for `testcase`.

        """
    if doubles is None:
        default_scenarios = make_fake_subprocess_scenarios()
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
                default_scenarios.values())

    for subprocess_double in doubles:
        subprocess_double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Set up fixtures for fake subprocess doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios made by `make_fake_subprocess_scenarios` are
        stored on `testcase` as `fake_subprocess_scenarios`, and the
        doubles they contain are set up for `testcase`.

        """
    fake_scenarios = make_fake_subprocess_scenarios()
    testcase.fake_subprocess_scenarios = fake_scenarios

    scenario_doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
            fake_scenarios.values())
    setup_subprocess_double_behaviour(testcase, scenario_doubles)
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        While the patch is active, a call whose command matches a
        `SubprocessDouble` registered for `testcase` is answered by
        that double's `os_popen_scenario`. Every other call goes to
        the real `os.popen`. The patch is removed when the test case
        cleans up.

        """
    # Capture the real function before it is replaced by the mock.
    real_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        doubles_by_argv = SubprocessDouble.get_registry_for_testcase(testcase)
        # A command given as one string is split into words so it can
        # be compared with the registered argv keys.
        argv = shlex.split(cmd) if isinstance(cmd, basestring) else cmd
        if argv not in doubles_by_argv:
            return real_popen(cmd, mode, buffering)
        scenario = doubles_by_argv[argv].os_popen_scenario
        return scenario.call_hook(argv, mode, buffering)

    patcher = mock.patch.object(
            os, "popen", autospec=True,
            side_effect=fake_os_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
1615 def patch_os_waitpid(testcase
):
1616 """ Patch `os.waitpid` behaviour for this test case.
1618 :param testcase: The `TestCase` instance to modify.
1621 When the patched function is called, the registry of
1622 `SubprocessDouble` instances for this test case will be used
1623 to get the instance for the program path specified.
1626 orig_os_waitpid
= os
.waitpid
1628 def fake_os_waitpid(pid
, options
):
1629 registry
= SubprocessDouble
.double_by_pid
1631 subprocess_double
= registry
[pid
]
1632 result
= subprocess_double
.os_waitpid_scenario
.call_hook(
1635 result
= orig_os_waitpid(pid
, options
)
1638 func_patcher
= mock
.patch
.object(
1639 os
, "waitpid", autospec
=True,
1640 side_effect
=fake_os_waitpid
)
1641 func_patcher
.start()
1642 testcase
.addCleanup(func_patcher
.stop
)
1645 def patch_os_system(testcase
):
1646 """ Patch `os.system` behaviour for this test case.
1648 :param testcase: The `TestCase` instance to modify.
1651 When the patched function is called, the registry of
1652 `SubprocessDouble` instances for this test case will be used
1653 to get the instance for the program path specified.
1656 orig_os_system
= os
.system
1658 def fake_os_system(command
):
1659 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1660 command_argv
= shlex
.split(command
)
1661 if command_argv
in registry
:
1662 subprocess_double
= registry
[command_argv
]
1663 result
= subprocess_double
.os_system_scenario
.call_hook(
1666 result
= orig_os_system(command
)
1669 func_patcher
= mock
.patch
.object(
1670 os
, "system", autospec
=True,
1671 side_effect
=fake_os_system
)
1672 func_patcher
.start()
1673 testcase
.addCleanup(func_patcher
.stop
)
1676 def patch_os_spawnv(testcase
):
1677 """ Patch `os.spawnv` behaviour for this test case.
1679 :param testcase: The `TestCase` instance to modify.
1682 When the patched function is called, the registry of
1683 `SubprocessDouble` instances for this test case will be used
1684 to get the instance for the program path specified.
1687 orig_os_spawnv
= os
.spawnv
1689 def fake_os_spawnv(mode
, file, args
):
1690 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1691 registry_key
= tuple(args
)
1692 if registry_key
in registry
:
1693 subprocess_double
= registry
[registry_key
]
1694 result
= subprocess_double
.os_spawnv_scenario
.call_hook(
1697 result
= orig_os_spawnv(mode
, file, args
)
1700 func_patcher
= mock
.patch
.object(
1701 os
, "spawnv", autospec
=True,
1702 side_effect
=fake_os_spawnv
)
1703 func_patcher
.start()
1704 testcase
.addCleanup(func_patcher
.stop
)
1707 # Copyright © 2015–2016 Ben Finney <bignose@debian.org>
1709 # This is free software: you may copy, modify, and/or distribute this work
1710 # under the terms of the GNU General Public License as published by the
1711 # Free Software Foundation; version 3 of that license or any later version.
1712 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
1719 # vim: fileencoding=utf-8 filetype=python :