1 # -*- coding: utf-8; -*-
4 # Part of Gajja, a Python test double library.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Gajja: Fake objects for real tests
15 The `gajja` library provides a system of Python test double classes
16 for specific system objects::
22 The Korean word 가짜 (*gajja*; IPA ˈkaːt͡ɕ̤a) means “fake thing”.
26 from __future__
import (absolute_import
, unicode_literals
)
30 if sys
.version_info
>= (3, 3):
33 import unittest
.mock
as mock
34 from io
import StringIO
as StringIO
36 import collections
.abc
as collections_abc
37 elif sys
.version_info
>= (3, 0):
38 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
39 elif sys
.version_info
>= (2, 7):
40 # Python 2 standard library.
41 import __builtin__
as builtins
42 # Third-party backport of Python 3 unittest improvements.
43 import unittest2
as unittest
44 # Third-party mock library.
46 # Python 2 standard library.
47 from StringIO
import StringIO
as BaseStringIO
48 import ConfigParser
as configparser
49 import collections
as collections_abc
51 raise RuntimeError("Python earlier than 2.7 is not supported.")
70 __package__
= str("gajja")
71 __import__(__package__
)
80 # Alias for Python 3 types.
85 def make_unique_slug(testcase
):
86 """ Make a unique slug for the test case. """
87 text
= base64
.b64encode(
88 testcase
.getUniqueString().encode('utf-8')
97 # We don't yet have the StringIO we want. Create it.
99 class StringIO(BaseStringIO
, object):
100 """ StringIO with a context manager. """
105 def __exit__(self
, *args
):
def patch_stdout(testcase):
    """ Patch `sys.stdout` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        While the patch is active, `sys.stdout` is a mock object that
        wraps a new `StringIO` instance. The patch is removed by a
        cleanup function registered on `testcase`.

        """
    patcher = mock.patch.object(
            sys, "stdout", wraps=StringIO())
    # A patcher does nothing until it is started; start it here, as the
    # `patch_subprocess_*` functions in this module do.
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_stderr(testcase):
    """ Patch `sys.stderr` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        While the patch is active, `sys.stderr` is a mock object that
        wraps a new `StringIO` instance. The patch is removed by a
        cleanup function registered on `testcase`.

        """
    patcher = mock.patch.object(
            sys, "stderr", wraps=StringIO())
    # A patcher does nothing until it is started; start it here, as the
    # `patch_subprocess_*` functions in this module do.
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_signal_signal(testcase):
    """ Patch `signal.signal` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        The replacement is an auto-specced mock of `signal.signal`. The
        patch is removed by a cleanup function registered on `testcase`.

        """
    func_patcher = mock.patch.object(signal, "signal", autospec=True)
    # Without this the real `signal.signal` would stay in place.
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
class FakeSystemExit(Exception):
    """ Test double standing in for the `SystemExit` exception.

        This derives from `Exception`, not from `SystemExit`.

        """
146 EXIT_STATUS_SUCCESS
= 0
147 EXIT_STATUS_FAILURE
= 1
148 EXIT_STATUS_COMMAND_NOT_FOUND
= 127
def patch_sys_exit(testcase):
    """ Patch `sys.exit` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        While the patch is active, calling `sys.exit` raises a
        `FakeSystemExit` instance. The patch is removed by a cleanup
        function registered on `testcase`.

        """
    func_patcher = mock.patch.object(
            sys, "exit", autospec=True,
            side_effect=FakeSystemExit())
    # Without this the real `sys.exit` would stay in place.
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_sys_argv(testcase):
    """ Patch the `sys.argv` sequence for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        If `testcase` has no `progname` attribute, one is made with
        `make_unique_slug`. If it has no `sys_argv` attribute, that is
        set to a list containing only `testcase.progname`.

        `sys.argv` is replaced with a new list copied from
        `testcase.sys_argv`, so changes to `sys.argv` do not alter the
        test case's own list. The patch is removed by a cleanup function
        registered on `testcase`.

        """
    if not hasattr(testcase, 'progname'):
        testcase.progname = make_unique_slug(testcase)
    if not hasattr(testcase, 'sys_argv'):
        testcase.sys_argv = [testcase.progname]
    # `patch.object` needs the target object and attribute name.
    patcher = mock.patch.object(
            sys, "argv",
            new=list(testcase.sys_argv))
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that are disruptive to the test runner.

        :param testcase: The `TestCase` instance for binding the patches.
        :return: ``None``.

        Applies, in order: `patch_stdout`, `patch_stderr`,
        `patch_sys_exit`, `patch_sys_argv`.

        """
    patch_functions = (
            patch_stdout,
            patch_stderr,
            patch_sys_exit,
            patch_sys_argv,
            )
    for patch_function in patch_functions:
        patch_function(testcase)
def patch_time_time(testcase, values=None):
    """ Patch the `time.time` function for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :param values: An iterable to provide return values. If ``None``,
            `itertools.count()` is used.
        :return: ``None``.

        Each call to the patched `time.time` returns the next item from
        `values`. The patch is removed by a cleanup function registered
        on `testcase`.

        """
    if values is None:
        values = itertools.count()

    def generator_fake_time():
        # A plain `for` loop ends the generator cleanly when `values`
        # is exhausted.
        for value in values:
            yield value

    func_patcher = mock.patch.object(time, "time", autospec=True)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)

    # Set on the now-patched `time.time`; an iterator `side_effect`
    # supplies one return value per call.
    time.time.side_effect = generator_fake_time()
def patch_os_environ(testcase):
    """ Patch the `os.environ` mapping.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        If `testcase` has no `os_environ` attribute, it is set to an
        empty `dict`. While the patch is active, `os.environ` is that
        very object (not a copy). The patch is removed by a cleanup
        function registered on `testcase`.

        """
    if not hasattr(testcase, 'os_environ'):
        testcase.os_environ = {}
    patcher = mock.patch.object(os, "environ", new=testcase.os_environ)
    # Without this the real environment mapping would stay in place.
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_getpid(testcase):
    """ Patch `os.getpid` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        The replacement is an auto-specced mock of `os.getpid`. The
        patch is removed by a cleanup function registered on `testcase`.

        """
    func_patcher = mock.patch.object(os, "getpid", autospec=True)
    # Without this the real `os.getpid` would stay in place.
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_getuid(testcase):
    """ Patch the `os.getuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        If `testcase` has no `os_getuid_return_value` attribute, it is
        set from `testcase.getUniqueInteger()`. While the patch is
        active, `os.getuid` returns that value. The patch is removed by
        a cleanup function registered on `testcase`.

        """
    if not hasattr(testcase, 'os_getuid_return_value'):
        testcase.os_getuid_return_value = testcase.getUniqueInteger()
    func_patcher = mock.patch.object(
            os, "getuid", autospec=True,
            return_value=testcase.os_getuid_return_value)
    # Without this the real `os.getuid` would stay in place.
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
230 PasswdEntry
= collections
.namedtuple(
232 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
def patch_pwd_getpwuid(testcase):
    """ Patch the `pwd.getpwuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        If `testcase` has no `pwd_getpwuid_return_value` attribute, it
        is set to a `PasswdEntry` filled with unique values from the
        test case and paths from `tempfile.mktemp` (used only to make
        names; no files are created here).

        While the patch is active, `pwd.getpwuid` returns a
        `pwd.struct_passwd` built from that attribute, or the attribute
        itself if it already is one. The patch is removed by a cleanup
        function registered on `testcase`.

        """
    if not hasattr(testcase, 'pwd_getpwuid_return_value'):
        testcase.pwd_getpwuid_return_value = PasswdEntry(
                pw_name=make_unique_slug(testcase),
                pw_passwd=make_unique_slug(testcase),
                pw_uid=testcase.getUniqueInteger(),
                pw_gid=testcase.getUniqueInteger(),
                pw_gecos=testcase.getUniqueString(),
                pw_dir=tempfile.mktemp(),
                pw_shell=tempfile.mktemp())
    if not isinstance(testcase.pwd_getpwuid_return_value, pwd.struct_passwd):
        pwent = pwd.struct_passwd(testcase.pwd_getpwuid_return_value)
    else:
        pwent = testcase.pwd_getpwuid_return_value
    func_patcher = mock.patch.object(
            pwd, "getpwuid", autospec=True,
            return_value=pwent)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_path_exists(testcase):
    """ Patch `os.path.exists` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. If the path is not in the
        registry, the call falls through to the real `os.path.exists`.

        """
    orig_os_path_exists = os.path.exists

    def fake_os_path_exists(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.os_path_exists_scenario.call_hook()
        else:
            result = orig_os_path_exists(path)
        return result

    func_patcher = mock.patch.object(
            os.path, "exists", autospec=True,
            side_effect=fake_os_path_exists)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_access(testcase):
    """ Patch `os.access` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. If the path is not in the
        registry, the call falls through to the real `os.access`.

        """
    orig_os_access = os.access

    def fake_os_access(path, mode):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.os_access_scenario.call_hook(mode)
        else:
            result = orig_os_access(path, mode)
        return result

    func_patcher = mock.patch.object(
            os, "access", autospec=True,
            side_effect=fake_os_access)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
309 StatResult
= collections
.namedtuple(
312 'st_ino', 'st_dev', 'st_nlink',
315 'st_atime', 'st_mtime', 'st_ctime',
def patch_os_stat(testcase):
    """ Patch `os.stat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. If the path is not in the
        registry, the call falls through to the real `os.stat`.

        """
    orig_os_stat = os.stat

    def fake_os_stat(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.os_stat_scenario.call_hook()
        else:
            result = orig_os_stat(path)
        return result

    func_patcher = mock.patch.object(
            os, "stat", autospec=True,
            side_effect=fake_os_stat)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_lstat(testcase):
    """ Patch `os.lstat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. If the path is not in the
        registry, the call falls through to the real `os.lstat`.

        """
    orig_os_lstat = os.lstat

    def fake_os_lstat(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.os_lstat_scenario.call_hook()
        else:
            result = orig_os_lstat(path)
        return result

    func_patcher = mock.patch.object(
            os, "lstat", autospec=True,
            side_effect=fake_os_lstat)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_unlink(testcase):
    """ Patch `os.unlink` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. If the path is not in the
        registry, the call falls through to the real `os.unlink`.

        """
    orig_os_unlink = os.unlink

    def fake_os_unlink(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.os_unlink_scenario.call_hook()
        else:
            result = orig_os_unlink(path)
        return result

    func_patcher = mock.patch.object(
            os, "unlink", autospec=True,
            side_effect=fake_os_unlink)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_rmdir(testcase):
    """ Patch `os.rmdir` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. If the path is not in the
        registry, the call falls through to the real `os.rmdir`.

        """
    orig_os_rmdir = os.rmdir

    def fake_os_rmdir(path):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.os_rmdir_scenario.call_hook()
        else:
            result = orig_os_rmdir(path)
        return result

    func_patcher = mock.patch.object(
            os, "rmdir", autospec=True,
            side_effect=fake_os_rmdir)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_shutil_rmtree(testcase):
    """ Patch `shutil.rmtree` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        When the patched function is called, the registry of
        `FileDouble` instances for this test case will be used to get
        the instance for the path specified. If the path is not in the
        registry, the call falls through to the real `shutil.rmtree`
        with the same arguments the caller gave.

        """
    # Keep the real `shutil.rmtree` for the fallback; `os.rmdir` cannot
    # remove a directory that has contents.
    orig_shutil_rmtree = shutil.rmtree

    def fake_shutil_rmtree(path, *args, **kwargs):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.shutil_rmtree_scenario.call_hook()
        else:
            # Forward `ignore_errors`, `onerror` and any other arguments
            # unchanged so the fallback behaves as the caller asked.
            result = orig_shutil_rmtree(path, *args, **kwargs)
        return result

    func_patcher = mock.patch.object(
            shutil, "rmtree", autospec=True,
            side_effect=fake_shutil_rmtree)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_tempfile_mkdtemp(testcase):
    """ Patch the `tempfile.mkdtemp` function for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: ``None``.

        If `testcase` has no `tempfile_mkdtemp_file_double` attribute,
        it is set to a new `FileDouble` whose path comes from
        `tempfile.mktemp` (used only to make a name).

        The double's `os.unlink` and `os.rmdir` scenarios are set to
        'okay' and the double is registered for `testcase`. While the
        patch is active, `tempfile.mkdtemp` returns the double's path.
        The patch is removed by a cleanup function registered on
        `testcase`.

        """
    if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
        testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())

    double = testcase.tempfile_mkdtemp_file_double
    double.set_os_unlink_scenario('okay')
    double.set_os_rmdir_scenario('okay')
    double.register_for_testcase(testcase)

    func_patcher = mock.patch.object(tempfile, "mkdtemp", autospec=True)
    # Start first: `return_value` below must be set on the mock, not on
    # the real function.
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)

    tempfile.mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path
471 # Python 2 uses IOError.
def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
    """ Normalise constructor arguments for an `IOError` subclass.

        :param init_args: Positional arguments given to the constructor.
        :param init_kwargs: Keyword arguments given to the constructor.
            This mapping is not modified.
        :param errno_value: Default `errno` value for the exception.
        :return: Tuple (`args`, `kwargs`), where `args` is the tuple
            (`errno`, `strerror`, `filename`) and `kwargs` is a new
            `dict` of the remaining keyword arguments.

        Defaults are `errno_value`, `os.strerror(errno_value)`, and
        ``None``. With three or more positional arguments, the first
        and third give `errno` and `filename`; with two or more, the
        second gives `strerror`. Keyword arguments 'errno', 'filename',
        'strerror' take precedence and are removed from the result.

        """
    # Work on a copy so the caller's mapping is left intact.
    result_kwargs = dict(init_kwargs)
    result_errno = errno_value
    result_strerror = os.strerror(errno_value)
    result_filename = None
    if len(init_args) >= 3:
        result_errno = init_args[0]
        result_filename = init_args[2]
    if len(init_args) >= 2:
        result_strerror = init_args[1]
    # Keyword values override positional ones.
    result_errno = result_kwargs.pop('errno', result_errno)
    result_filename = result_kwargs.pop('filename', result_filename)
    result_strerror = result_kwargs.pop('strerror', result_strerror)
    result_args = (result_errno, result_strerror, result_filename)
    return (result_args, result_kwargs)
class FileNotFoundError(IOError):
    """ `IOError` subclass whose `errno` defaults to `errno.ENOENT`. """

    def __init__(self, *args, **kwargs):
        # Normalise to (errno, strerror, filename) before delegating.
        ensured_args, ensured_kwargs = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.ENOENT)
        super(FileNotFoundError, self).__init__(
                *ensured_args, **ensured_kwargs)
class FileExistsError(IOError):
    """ `IOError` subclass whose `errno` defaults to `errno.EEXIST`. """

    def __init__(self, *args, **kwargs):
        # Normalise to (errno, strerror, filename) before delegating.
        ensured_args, ensured_kwargs = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.EEXIST)
        super(FileExistsError, self).__init__(
                *ensured_args, **ensured_kwargs)
class PermissionError(IOError):
    """ `IOError` subclass whose `errno` defaults to `errno.EPERM`. """

    def __init__(self, *args, **kwargs):
        # Normalise to (errno, strerror, filename) before delegating.
        ensured_args, ensured_kwargs = _ensure_ioerror_args(
                args, kwargs, errno_value=errno.EPERM)
        super(PermissionError, self).__init__(
                *ensured_args, **ensured_kwargs)
513 def make_fake_file_scenarios(path
=None):
514 """ Make a collection of scenarios for testing with fake files.
516 :path: The filesystem path of the fake file. If not specified,
517 a valid random path will be generated.
518 :return: A collection of scenarios for tests involving input files.
520 The collection is a mapping from scenario name to a dictionary of
526 file_path
= tempfile
.mktemp()
530 fake_file_empty
= StringIO()
531 fake_file_minimal
= StringIO("Lorem ipsum.")
532 fake_file_large
= StringIO("\n".join(
534 for __
in range(1000)))
536 default_scenario_params
= {
537 'open_scenario_name': 'okay',
538 'file_double_params': dict(
539 path
=file_path
, fake_file
=fake_file_minimal
),
545 'open_scenario_name': 'nonexist',
548 'open_scenario_name': 'exist_error',
550 'error-read-denied': {
551 'open_scenario_name': 'read_denied',
554 'file_double_params': dict(
555 path
=file_path
, fake_file
=fake_file_empty
),
558 'file_double_params': dict(
559 path
=file_path
, fake_file
=fake_file_empty
),
562 'file_double_params': dict(
563 path
=file_path
, fake_file
=fake_file_minimal
),
566 'file_double_params': dict(
567 path
=file_path
, fake_file
=fake_file_large
),
571 for (name
, scenario
) in scenarios
.items():
572 params
= default_scenario_params
.copy()
573 params
.update(scenario
)
574 scenario
.update(params
)
575 scenario
['file_double'] = FileDouble(**scenario
['file_double_params'])
576 scenario
['file_double'].set_open_scenario(params
['open_scenario_name'])
577 scenario
['fake_file_scenario_name'] = name
582 def get_file_doubles_from_fake_file_scenarios(scenarios
):
583 """ Get the `FileDouble` instances from fake file scenarios.
585 :param scenarios: Collection of fake file scenarios.
586 :return: Collection of `FileDouble` instances.
590 scenario
['file_double']
591 for scenario
in scenarios
592 if scenario
['file_double'] is not None)
597 def setup_file_double_behaviour(testcase
, doubles
=None):
598 """ Set up file double instances and behaviour.
600 :param testcase: The `TestCase` instance to modify.
601 :param doubles: Collection of `FileDouble` instances.
604 If `doubles` is ``None``, a default collection will be made
605 from the result of `make_fake_file_scenarios` result.
609 scenarios
= make_fake_file_scenarios()
610 doubles
= get_file_doubles_from_fake_file_scenarios(
613 for file_double
in doubles
:
614 file_double
.register_for_testcase(testcase
)
616 orig_open
= builtins
.open
618 def fake_open(path
, mode
='rt', buffering
=-1):
619 registry
= FileDouble
.get_registry_for_testcase(testcase
)
621 file_double
= registry
[path
]
622 result
= file_double
.builtins_open_scenario
.call_hook(
625 result
= orig_open(path
, mode
, buffering
)
628 mock_open
= mock
.mock_open()
629 mock_open
.side_effect
= fake_open
631 func_patcher
= mock
.patch
.object(
632 builtins
, "open", new
=mock_open
)
634 testcase
.addCleanup(func_patcher
.stop
)
637 def setup_fake_file_fixtures(testcase
):
638 """ Set up fixtures for fake file doubles.
640 :param testcase: The `TestCase` instance to modify.
644 scenarios
= make_fake_file_scenarios()
645 testcase
.fake_file_scenarios
= scenarios
647 file_doubles
= get_file_doubles_from_fake_file_scenarios(
649 setup_file_double_behaviour(testcase
, file_doubles
)
def set_fake_file_scenario(testcase, name):
    """ Set the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify. Its
            `fake_file_scenarios` mapping is looked up by `name`.
        :param name: Key of the scenario in `fake_file_scenarios`.
        :return: ``None``.

        Sets `testcase.fake_file_scenario` to the scenario and
        `testcase.file_double` to its 'file_double' item, then registers
        that double for `testcase`.

        """
    selected = testcase.fake_file_scenarios[name]
    testcase.fake_file_scenario = selected
    testcase.file_double = selected['file_double']
    testcase.file_double.register_for_testcase(testcase)
660 class TestDoubleFunctionScenario
:
661 """ Scenario for fake behaviour of a specific function. """
663 def __init__(self
, scenario_name
, double
):
664 self
.scenario_name
= scenario_name
667 self
.call_hook
= getattr(
668 self
, "_hook_{name}".format(name
=self
.scenario_name
))
672 "<{class_name} instance: {id}"
674 " call_hook name: {hook_name!r}"
675 " double: {double!r}"
677 class_name
=self
.__class
__.__name
__, id=id(self
),
678 name
=self
.scenario_name
, double
=self
.double
,
679 hook_name
=self
.call_hook
.__name
__)
682 def __eq__(self
, other
):
684 if not self
.scenario_name
== other
.scenario_name
:
686 if not self
.double
== other
.double
:
688 if not self
.call_hook
.__name
__ == other
.call_hook
.__name
__:
692 def __ne__(self
, other
):
693 result
= not self
.__eq
__(other
)
697 class os_path_exists_scenario(TestDoubleFunctionScenario
):
698 """ Scenario for `os.path.exists` behaviour. """
700 def _hook_exist(self
):
703 def _hook_not_exist(self
):
707 class os_access_scenario(TestDoubleFunctionScenario
):
708 """ Scenario for `os.access` behaviour. """
710 def _hook_okay(self
, mode
):
713 def _hook_not_exist(self
, mode
):
716 def _hook_read_only(self
, mode
):
717 if mode
& (os
.W_OK | os
.X_OK
):
723 def _hook_denied(self
, mode
):
724 if mode
& (os
.R_OK | os
.W_OK | os
.X_OK
):
731 class os_stat_scenario(TestDoubleFunctionScenario
):
732 """ Scenario for `os.stat` behaviour. """
734 def _hook_okay(self
):
735 return self
.double
.stat_result
737 def _hook_notfound_error(self
):
738 raise FileNotFoundError(
740 "No such file or directory: {path!r}".format(
741 path
=self
.double
.path
))
743 def _hook_denied_error(self
):
744 raise PermissionError(
749 class os_lstat_scenario(os_stat_scenario
):
750 """ Scenario for `os.lstat` behaviour. """
753 class os_unlink_scenario(TestDoubleFunctionScenario
):
754 """ Scenario for `os.unlink` behaviour. """
756 def _hook_okay(self
):
759 def _hook_nonexist(self
):
760 error
= FileNotFoundError(
762 "No such file or directory: {path!r}".format(
763 path
=self
.double
.path
))
766 def _hook_denied(self
):
767 error
= PermissionError(
773 class os_rmdir_scenario(TestDoubleFunctionScenario
):
774 """ Scenario for `os.rmdir` behaviour. """
776 def _hook_okay(self
):
779 def _hook_nonexist(self
):
780 error
= FileNotFoundError(
782 "No such file or directory: {path!r}".format(
783 path
=self
.double
.path
))
786 def _hook_denied(self
):
787 error
= PermissionError(
793 class shutil_rmtree_scenario(TestDoubleFunctionScenario
):
794 """ Scenario for `shutil.rmtree` behaviour. """
796 def _hook_okay(self
):
799 def _hook_nonexist(self
):
800 error
= FileNotFoundError(
802 "No such file or directory: {path!r}".format(
803 path
=self
.double
.path
))
806 def _hook_denied(self
):
807 error
= PermissionError(
813 class builtins_open_scenario(TestDoubleFunctionScenario
):
814 """ Scenario for `builtins.open` behaviour. """
816 def _hook_okay(self
, mode
, buffering
):
817 result
= self
.double
.fake_file
820 def _hook_nonexist(self
, mode
, buffering
):
821 if mode
.startswith('r'):
822 error
= FileNotFoundError(
824 "No such file or directory: {path!r}".format(
825 path
=self
.double
.path
))
827 result
= self
.double
.fake_file
830 def _hook_exist_error(self
, mode
, buffering
):
831 if mode
.startswith('w') or mode
.startswith('a'):
832 error
= FileExistsError(
834 "File already exists: {path!r}".format(
835 path
=self
.double
.path
))
837 result
= self
.double
.fake_file
840 def _hook_read_denied(self
, mode
, buffering
):
841 if mode
.startswith('r'):
842 error
= PermissionError(
844 "Read denied on {path!r}".format(
845 path
=self
.double
.path
))
847 result
= self
.double
.fake_file
850 def _hook_write_denied(self
, mode
, buffering
):
851 if mode
.startswith('w') or mode
.startswith('a'):
852 error
= PermissionError(
854 "Write denied on {path!r}".format(
855 path
=self
.double
.path
))
857 result
= self
.double
.fake_file
class TestDoubleWithRegistry:
    """ Abstract base class for a test double with a test case registry.

        Subclasses are expected to override:

        * `registry_class`: Callable that makes a new, empty registry.
        * `registries`: Mapping that holds one registry per test case.
        * `function_scenario_params_by_class`: Mapping from scenario
          class to a `dict` with the items 'default_scenario_name' and
          'set_scenario_method_name'.
        * `get_registry_key`: Method returning this double's key.

        """

    registry_class = NotImplemented
    registries = NotImplemented

    function_scenario_params_by_class = NotImplemented

    def __new__(cls, *args, **kwargs):
        """ Make the instance and the class's `set_…` scenario methods. """
        superclass = super(TestDoubleWithRegistry, cls)
        if superclass.__new__ is object.__new__:
            # The ‘object’ implementation complains about extra arguments.
            instance = superclass.__new__(cls)
        else:
            instance = superclass.__new__(cls, *args, **kwargs)
        instance.make_set_scenario_methods()

        # `__new__` must return the instance, or construction yields None.
        return instance

    def __init__(self, *args, **kwargs):
        """ Initialise, then apply each scenario's default. """
        super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
        self._set_method_per_scenario()

    def _make_set_scenario_method(self, scenario_class, params):
        """ Make the method that sets a `scenario_class` scenario.

            :param scenario_class: The scenario class to instantiate.
            :param params: Mapping with 'set_scenario_method_name'.
            :return: Function taking (`self`, `name`). It stores
                ``scenario_class(name, double=self)`` on the instance,
                under the attribute named for the scenario class.

            """
        def method(self, name):
            scenario = scenario_class(name, double=self)
            setattr(self, scenario_class.__name__, scenario)
        method.__doc__ = (
                """ Set the scenario for `{name}` behaviour. """
                ).format(name=scenario_class.__name__)
        method.__name__ = str(params['set_scenario_method_name'])
        return method

    def make_set_scenario_methods(self):
        """ Make `set_<scenario_class_name>` methods on this class. """
        for (function_scenario_class, function_scenario_params) in (
                self.function_scenario_params_by_class.items()):
            method = self._make_set_scenario_method(
                    function_scenario_class, function_scenario_params)
            setattr(self.__class__, method.__name__, method)
            # Remembered so `_set_method_per_scenario` can call it.
            function_scenario_params['set_scenario_method'] = method

    def _set_method_per_scenario(self):
        """ Set the method to be called for each scenario. """
        for function_scenario_params in (
                self.function_scenario_params_by_class.values()):
            function_scenario_params['set_scenario_method'](
                    self, function_scenario_params['default_scenario_name'])

    @classmethod
    def get_registry_for_testcase(cls, testcase):
        """ Get the FileDouble registry for the specified test case.

            :param testcase: The `TestCase` instance.
            :return: The registry for `testcase`; made with
                `registry_class` on first request.

            """
        # Key in a dict must be hashable.
        key = (testcase.__class__, id(testcase))
        if key not in cls.registries:
            cls.registries[key] = cls.registry_class()
        registry = cls.registries[key]
        return registry

    def get_registry_key(self):
        """ Get the registry key for this double. """
        raise NotImplementedError

    def register_for_testcase(self, testcase):
        """ Add this instance to registry for the specified testcase.

            Also registers a cleanup function on `testcase` that
            removes the instance from the registry again.

            """
        registry = self.get_registry_for_testcase(testcase)
        key = self.get_registry_key()
        registry[key] = self
        unregister_func = functools.partial(
                self.unregister_for_testcase, testcase)
        testcase.addCleanup(unregister_func)

    def unregister_for_testcase(self, testcase):
        """ Remove this instance from registry for the specified testcase.

            Does nothing if the key is not in the registry.

            """
        registry = self.get_registry_for_testcase(testcase)
        key = self.get_registry_key()
        if key in registry:
            del registry[key]
def copy_fake_file(fake_file):
    """ Make a copy of the StringIO instance.

        :param fake_file: The file-like object to copy, or ``None``.
            It must provide `getvalue`.
        :return: A new object of the same type as `fake_file`, made
            from its whole content. If `fake_file` is ``None``, an
            empty `StringIO`.

        If `fake_file` has an `encoding` attribute and the copy does
        not, the attribute is copied across.

        """
    fake_file_type = StringIO
    # Default content, so a ``None`` input yields an empty file.
    content = ""
    if fake_file is not None:
        fake_file_type = type(fake_file)
        content = fake_file.getvalue()
    assert issubclass(fake_file_type, object)
    result = fake_file_type(content)
    if hasattr(fake_file, 'encoding'):
        if not hasattr(result, 'encoding'):
            result.encoding = fake_file.encoding
    return result
954 class FileDouble(TestDoubleWithRegistry
):
955 """ A testing double for a file. """
957 registry_class
= dict
960 function_scenario_params_by_class
= {
961 os_path_exists_scenario
: {
962 'default_scenario_name': 'not_exist',
963 'set_scenario_method_name': 'set_os_path_exists_scenario',
965 os_access_scenario
: {
966 'default_scenario_name': 'okay',
967 'set_scenario_method_name': 'set_os_access_scenario',
970 'default_scenario_name': 'okay',
971 'set_scenario_method_name': 'set_os_stat_scenario',
974 'default_scenario_name': 'okay',
975 'set_scenario_method_name': 'set_os_lstat_scenario',
977 builtins_open_scenario
: {
978 'default_scenario_name': 'okay',
979 'set_scenario_method_name': 'set_open_scenario',
981 os_unlink_scenario
: {
982 'default_scenario_name': 'okay',
983 'set_scenario_method_name': 'set_os_unlink_scenario',
986 'default_scenario_name': 'okay',
987 'set_scenario_method_name': 'set_os_rmdir_scenario',
989 shutil_rmtree_scenario
: {
990 'default_scenario_name': 'okay',
991 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
995 def __init__(self
, path
=None, fake_file
=None, *args
, **kwargs
):
997 self
.fake_file
= copy_fake_file(fake_file
)
998 self
.fake_file
.name
= path
1000 self
._set
_stat
_result
()
1002 super(FileDouble
, self
).__init
__(*args
, **kwargs
)
1004 def _set_stat_result(self
):
1005 """ Set the `os.stat` result for this file. """
1006 size
= len(self
.fake_file
.getvalue())
1007 self
.stat_result
= StatResult(
1009 st_ino
=None, st_dev
=None, st_nlink
=None,
1012 st_atime
=None, st_mtime
=None, st_ctime
=None,
1016 text
= "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
1017 path
=self
.path
, fake_file
=self
.fake_file
)
1020 def get_registry_key(self
):
1021 """ Get the registry key for this double. """
1026 class os_popen_scenario(TestDoubleFunctionScenario
):
1027 """ Scenario for `os.popen` behaviour. """
1029 stream_name_by_mode
= {
1034 def _hook_success(self
, argv
, mode
, buffering
):
1035 stream_name
= self
.stream_name_by_mode
[mode
]
1036 stream_double
= getattr(
1037 self
.double
, stream_name
+ '_double')
1038 result
= stream_double
.fake_file
1041 def _hook_failure(self
, argv
, mode
, buffering
):
1045 def _hook_not_found(self
, argv
, mode
, buffering
):
class os_waitpid_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.waitpid` behaviour.

        Each hook takes the `pid` and `options` arguments given to
        `os.waitpid`.

        """

    def _hook_success(self, pid, options):
        """ Return (`pid`, `EXIT_STATUS_SUCCESS`). """
        result = (pid, EXIT_STATUS_SUCCESS)
        return result

    def _hook_failure(self, pid, options):
        """ Return (`pid`, `EXIT_STATUS_FAILURE`). """
        result = (pid, EXIT_STATUS_FAILURE)
        return result

    def _hook_not_found(self, pid, options):
        """ Raise `OSError` with `errno` set to `errno.ECHILD`. """
        # `OSError` sets its `errno` attribute only when given at least
        # (errno, strerror); a lone argument is kept just as `args[0]`.
        error = OSError(errno.ECHILD, os.strerror(errno.ECHILD))
        raise error
class os_system_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.system` behaviour.

        Each hook takes the `command` argument given to `os.system`
        and returns one of the `EXIT_STATUS_…` constants.

        """

    def _hook_success(self, command):
        """ Return `EXIT_STATUS_SUCCESS`. """
        result = EXIT_STATUS_SUCCESS
        return result

    def _hook_failure(self, command):
        """ Return `EXIT_STATUS_FAILURE`. """
        result = EXIT_STATUS_FAILURE
        return result

    def _hook_not_found(self, command):
        """ Return `EXIT_STATUS_COMMAND_NOT_FOUND`. """
        result = EXIT_STATUS_COMMAND_NOT_FOUND
        return result
class os_spawnv_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.spawnv` behaviour.

        Each hook takes the `mode`, `file`, `args` arguments given to
        `os.spawnv` and returns one of the `EXIT_STATUS_…` constants.

        """

    def _hook_success(self, mode, file, args):
        """ Return `EXIT_STATUS_SUCCESS`. """
        result = EXIT_STATUS_SUCCESS
        return result

    def _hook_failure(self, mode, file, args):
        """ Return `EXIT_STATUS_FAILURE`. """
        result = EXIT_STATUS_FAILURE
        return result

    def _hook_not_found(self, mode, file, args):
        """ Return `EXIT_STATUS_COMMAND_NOT_FOUND`. """
        result = EXIT_STATUS_COMMAND_NOT_FOUND
        return result
1103 """ A testing double for `subprocess.Popen`. """
1105 def __init__(self
, args
, *posargs
, **kwargs
):
1110 self
.returncode
= None
1112 if kwargs
.get('shell', False):
1113 self
.argv
= shlex
.split(args
)
1115 # The parameter is already a sequence of command-line arguments.
1118 def set_streams(self
, subprocess_double
, popen_kwargs
):
1119 """ Set the streams on the `PopenDouble`.
1121 :param subprocess_double: The `SubprocessDouble` from
1122 which to get existing stream doubles.
1123 :param popen_kwargs: The keyword arguments to the
1124 `subprocess.Popen` call.
1128 for stream_name
in (
1129 name
for name
in ['stdin', 'stdout', 'stderr']
1130 if name
in popen_kwargs
):
1131 stream_spec
= popen_kwargs
[stream_name
]
1132 if stream_spec
is subprocess
.PIPE
:
1133 stream_double
= getattr(
1135 "{name}_double".format(name
=stream_name
))
1136 stream_file
= stream_double
.fake_file
1137 elif stream_spec
is subprocess
.STDOUT
:
1138 stream_file
= subprocess_double
.stdout_double
.fake_file
1140 stream_file
= stream_spec
1141 setattr(self
, stream_name
, stream_file
)
1144 """ Wait for subprocess to terminate. """
1145 return self
.returncode
1148 class subprocess_popen_scenario(TestDoubleFunctionScenario
):
1149 """ Scenario for `subprocess.Popen` behaviour. """
1151 def _hook_success(self
, testcase
, args
, *posargs
, **kwargs
):
1152 double
= self
.double
.popen_double
1153 double
.set_streams(self
.double
, kwargs
)
def patch_subprocess_popen(testcase):
    """ Patch `subprocess.Popen` constructor for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified. If there
        is no match in the registry, the call falls through to the
        real `subprocess.Popen`.

        """
    orig_subprocess_popen = subprocess.Popen

    def fake_subprocess_popen(args, *posargs, **kwargs):
        if kwargs.get('shell', False):
            # A shell command is one string; split it into words.
            argv = shlex.split(args)
        else:
            argv = args
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if argv in registry:
            subprocess_double = registry[argv]
            result = subprocess_double.subprocess_popen_scenario.call_hook(
                    testcase, args, *posargs, **kwargs)
        else:
            result = orig_subprocess_popen(args, *posargs, **kwargs)
        return result

    func_patcher = mock.patch.object(
            subprocess, "Popen", autospec=True,
            side_effect=fake_subprocess_popen)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
class subprocess_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.call` behaviour.

        Each hook takes the `command` argument given to
        `subprocess.call` and returns one of the `EXIT_STATUS_…`
        constants.

        """

    def _hook_success(self, command):
        """ Return `EXIT_STATUS_SUCCESS`. """
        result = EXIT_STATUS_SUCCESS
        return result

    def _hook_failure(self, command):
        """ Return `EXIT_STATUS_FAILURE`. """
        result = EXIT_STATUS_FAILURE
        return result

    def _hook_not_found(self, command):
        """ Return `EXIT_STATUS_COMMAND_NOT_FOUND`. """
        result = EXIT_STATUS_COMMAND_NOT_FOUND
        return result
def patch_subprocess_call(testcase):
    """ Patch `subprocess.call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified. If there
        is no match in the registry, the call falls through to the
        real `subprocess.call`.

        """
    orig_subprocess_call = subprocess.call

    def fake_subprocess_call(command, *posargs, **kwargs):
        if kwargs.get('shell', False):
            # A shell command is one string; split it into words.
            command_argv = shlex.split(command)
        else:
            command_argv = command
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if command_argv in registry:
            subprocess_double = registry[command_argv]
            result = subprocess_double.subprocess_call_scenario.call_hook(
                    command)
        else:
            result = orig_subprocess_call(command, *posargs, **kwargs)
        return result

    func_patcher = mock.patch.object(
            subprocess, "call", autospec=True,
            side_effect=fake_subprocess_call)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
1241 class subprocess_check_call_scenario(TestDoubleFunctionScenario
):
1242 """ Scenario for `subprocess.check_call` behaviour. """
1244 def _hook_success(self
, command
):
1247 def _hook_failure(self
, command
):
1248 result
= EXIT_STATUS_FAILURE
1249 error
= subprocess
.CalledProcessError(result
, command
)
1252 def _hook_not_found(self
, command
):
1253 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1254 error
= subprocess
.CalledProcessError(result
, command
)
def patch_subprocess_check_call(testcase):
    """ Patch `subprocess.check_call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: ``None``.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance for the program path specified. If there
        is no match in the registry, the call falls through to the
        real `subprocess.check_call`.

        """
    orig_subprocess_check_call = subprocess.check_call

    def fake_subprocess_check_call(command, *posargs, **kwargs):
        if kwargs.get('shell', False):
            # A shell command is one string; split it into words.
            command_argv = shlex.split(command)
        else:
            command_argv = command
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if command_argv in registry:
            subprocess_double = registry[command_argv]
            scenario = subprocess_double.subprocess_check_call_scenario
            result = scenario.call_hook(command)
        else:
            result = orig_subprocess_check_call(command, *posargs, **kwargs)
        return result

    func_patcher = mock.patch.object(
            subprocess, "check_call", autospec=True,
            side_effect=fake_subprocess_check_call)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
class SubprocessDoubleRegistry(collections_abc.MutableMapping):
    """ Registry of `SubprocessDouble` instances by `argv`.

        Keys are sequences of command-line words. Lookup does not
        require an identical key: a registered key (a “candidate”)
        matches the requested `argv` when every word of the candidate
        matches the word at the same position in `argv`.

        * `ARG_ANY` in a candidate matches any single word.
        * `ARG_MORE` in a candidate matches all remaining words
          (including none).
        * A candidate that is exhausted without a mismatch matches,
          so a candidate that is a prefix of `argv` matches.

        """

    def __init__(self, *args, **kwargs):
        """ Initialise the registry.

            :param args: Optionally, one positional argument: a
                mapping, or an iterable of (key, value) pairs, giving
                the initial content.
            :param kwargs: Accepted but not used.

            """
        items = []
        if args:
            if isinstance(args[0], collections_abc.Mapping):
                items = args[0].items()
            elif isinstance(args[0], collections_abc.Iterable):
                items = args[0]
        self._mapping = dict(items)

    def __repr__(self):
        text = "<{class_name} object: {mapping}>".format(
                class_name=self.__class__.__name__, mapping=self._mapping)
        return text

    @staticmethod
    def _candidate_matches(candidate, argv):
        """ Determine whether registered key `candidate` matches `argv`.

            :param candidate: A key registered in this registry.
            :param argv: The sequence of words to match.
            :return: ``True`` if `candidate` matches, else ``False``.

            """
        if candidate == argv:
            return True
        for (word_index, candidate_word) in enumerate(candidate):
            if candidate_word is ARG_MORE:
                # Candidate matches any remaining words. We have a match.
                return True
            if word_index >= len(argv):
                # Candidate is too long for the specified argv. Test
                # this before indexing `argv`, so a longer candidate
                # is a mismatch instead of an `IndexError`.
                return False
            if candidate_word is ARG_ANY:
                # Candidate matches any word at this position.
                continue
            if candidate_word != argv[word_index]:
                # This candidate does not match.
                return False
        # Reached the end of the candidate without a mismatch.
        return True

    def _match_argv(self, argv):
        """ Match the specified `argv` with our registered keys.

            :param argv: The sequence of words to look up.
            :return: The first registered key that matches `argv`, or
                ``None`` if `argv` is not a sequence or no key matches.

            """
        if not isinstance(argv, collections_abc.Sequence):
            return None
        for candidate in self._mapping:
            if self._candidate_matches(candidate, argv):
                return candidate
        return None

    def __getitem__(self, key):
        match = self._match_argv(key)
        if match is None:
            raise KeyError(key)
        result = self._mapping[match]
        return result

    def __setitem__(self, key, value):
        # Remove whichever registered key already matches, so the new
        # entry replaces it instead of being shadowed by it.
        if key in self:
            del self[key]
        self._mapping[key] = value

    def __delitem__(self, key):
        # Deleting a key that matches nothing is deliberately a no-op.
        match = self._match_argv(key)
        if match is not None:
            del self._mapping[match]

    def __iter__(self):
        return self._mapping.__iter__()

    def __len__(self):
        return self._mapping.__len__()
class SubprocessDouble(TestDoubleWithRegistry):
    """ A testing double for a subprocess.

        Each instance has a filesystem `path`, an `argv` list, a
        `pid`, a `PopenDouble` (`popen_double`), and one `FileDouble`
        per standard stream (`stdin_double`, `stdout_double`,
        `stderr_double`).

        """

    registry_class = SubprocessDoubleRegistry
    # NOTE(review): restored on the assumption that each
    # `TestDoubleWithRegistry` subclass keeps its own `registries`
    # mapping; confirm against the base class.
    registries = {}

    # Doubles indexed by PID. Values are held weakly, so an entry
    # disappears when its double is no longer referenced elsewhere.
    double_by_pid = weakref.WeakValueDictionary()

    function_scenario_params_by_class = {
            subprocess_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_popen_scenario',
                },
            subprocess_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_subprocess_call_scenario',
                },
            subprocess_check_call_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name':
                    'set_subprocess_check_call_scenario',
                },
            os_popen_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_popen_scenario',
                },
            os_waitpid_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_waitpid_scenario',
                },
            os_system_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_system_scenario',
                },
            os_spawnv_scenario: {
                'default_scenario_name': 'success',
                'set_scenario_method_name': 'set_os_spawnv_scenario',
                },
            }

    # Source of PIDs, shared by all instances so each PID is distinct.
    _pid_counter = itertools.count(1)

    def __init__(self, path=None, argv=None, *args, **kwargs):
        """ Initialise the subprocess double.

            :param path: Filesystem path of the program. If ``None``,
                a temporary file name is generated (no file is
                created).
            :param argv: Command-line words of the process. If
                ``None``, a list containing only the base name of
                `path` is used.
            :param args: Passed on to the superclass initialiser.
            :param kwargs: Passed on to the superclass initialiser.

            """
        if path is None:
            path = tempfile.mktemp()
        self.path = path

        if argv is None:
            command_name = os.path.basename(path)
            argv = [command_name]
        self.argv = argv

        self.pid = self._make_pid()
        self._register_by_pid()

        self.set_popen_double()

        stream_class = SubprocessDouble.stream_class
        for stream_name in ['stdin', 'stdout', 'stderr']:
            fake_file = stream_class()
            file_double = FileDouble(fake_file=fake_file)
            stream_double_name = '{name}_double'.format(name=stream_name)
            setattr(self, stream_double_name, file_double)

        super(SubprocessDouble, self).__init__(*args, **kwargs)

    def set_popen_double(self):
        """ Set the `PopenDouble` for this instance. """
        double = PopenDouble(self.argv)
        double.pid = self.pid

        self.popen_double = double

    def __repr__(self):
        # NOTE(review): the ``path`` and ``argv`` fragments of this
        # format string were not visible in the captured source;
        # confirm their exact text.
        text = (
                "<SubprocessDouble instance: {id}"
                " path: {path!r},"
                " argv: {argv!r}"
                " stdin_double: {stdin_double!r}"
                " stdout_double: {stdout_double!r}"
                " stderr_double: {stderr_double!r}"
                ">").format(
                    id=id(self),
                    path=self.path, argv=self.argv,
                    stdin_double=self.stdin_double,
                    stdout_double=self.stdout_double,
                    stderr_double=self.stderr_double)
        return text

    @classmethod
    def _make_pid(cls):
        """ Make a unique PID for a subprocess.

            :return: An integer not returned by any earlier call.

            """
        # Draw from one shared counter. A loop over
        # `itertools.count(1)` written inline would either restart
        # from 1 on every call or make this method a generator.
        return next(cls._pid_counter)

    def _register_by_pid(self):
        """ Register this subprocess by its PID. """
        self.__class__.double_by_pid[self.pid] = self

    def get_registry_key(self):
        """ Get the registry key for this double. """
        result = tuple(self.argv)
        return result

    stream_class = io.BytesIO
    stream_encoding = "utf-8"

    def _set_stream_content(self, stream_name, text, bytes_encoding):
        """ Replace the fake file of the named stream double.

            :param stream_name: One of 'stdin', 'stdout', 'stderr'.
            :param text: The text content for the stream.
            :param bytes_encoding: Encoding used to convert `text` to
                bytes.

            """
        content = text.encode(bytes_encoding)
        fake_file = self.stream_class(content)
        stream_double_name = '{name}_double'.format(name=stream_name)
        getattr(self, stream_double_name).fake_file = fake_file

    def set_stdin_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdin` stream for this double. """
        self._set_stream_content('stdin', text, bytes_encoding)

    def set_stdout_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdout` stream for this double. """
        self._set_stream_content('stdout', text, bytes_encoding)

    def set_stderr_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stderr` stream for this double. """
        self._set_stream_content('stderr', text, bytes_encoding)
def make_fake_subprocess_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake files.

        :path: The filesystem path of the fake program. If not specified,
            a valid random path will be generated.
        :return: A collection of scenarios for tests involving subprocesses.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes. Each scenario's attributes are the default
        parameters overridden by that scenario's own, plus:

        * 'subprocess_double': a new `SubprocessDouble` for the scenario.
        * 'fake_file_scenario_name': the scenario name.

        """
    if path is None:
        # Only a path name is needed; no file is created.
        file_path = tempfile.mktemp()
    else:
        file_path = path

    default_scenario_params = {
            'return_value': EXIT_STATUS_SUCCESS,
            'program_path': file_path,
            'argv_after_command_name': [],
            }

    # NOTE(review): the scenario names were not visible in the captured
    # source; confirm them against the test suites that use them.
    scenarios = {
            'default': {},
            'not-found': {
                'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
                },
            }

    for (name, scenario) in scenarios.items():
        # Fill in the defaults for every parameter the scenario omits.
        params = default_scenario_params.copy()
        params.update(scenario)
        scenario.update(params)
        program_path = params['program_path']
        program_name = os.path.basename(params['program_path'])
        argv = [program_name]
        argv.extend(params['argv_after_command_name'])
        subprocess_double_params = dict(
                path=program_path,
                argv=argv,
                )
        subprocess_double = SubprocessDouble(**subprocess_double_params)
        scenario['subprocess_double'] = subprocess_double
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
    """ Get the `SubprocessDouble` instances from fake subprocess scenarios.

        :param scenarios: Collection of fake subprocess scenarios:
            either an iterable of scenario dictionaries, or a mapping
            from scenario name to scenario dictionary.
        :return: Collection of `SubprocessDouble` instances.

        Scenarios whose 'subprocess_double' value is ``None`` are
        omitted from the result.

        """
    if isinstance(scenarios, collections_abc.Mapping):
        # Iterating a mapping yields its keys (the scenario names);
        # the scenario dictionaries are its values.
        scenarios = scenarios.values()

    doubles = set(
            scenario['subprocess_double']
            for scenario in scenarios
            if scenario['subprocess_double'] is not None)

    return doubles
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Set up subprocess double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the return value of `make_fake_subprocess_scenarios`.

        Each double is registered for `testcase`.

        """
    if doubles is None:
        scenarios = make_fake_subprocess_scenarios()
        # Pass the scenario dictionaries, not the mapping keyed by name.
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
                scenarios.values())

    for double in doubles:
        double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Set up fixtures for fake subprocess doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The scenarios from `make_fake_subprocess_scenarios` are stored
        as `testcase.fake_subprocess_scenarios`, and their doubles are
        set up with `setup_subprocess_double_behaviour`.

        """
    scenarios = make_fake_subprocess_scenarios()
    testcase.fake_subprocess_scenarios = scenarios

    # Pass the scenario dictionaries, not the mapping keyed by name.
    doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
            scenarios.values())
    setup_subprocess_double_behaviour(testcase, doubles)
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance matching the command's argument list. If
        no double matches, the call is passed through to the original
        `os.popen`.

        The patch is stopped by a cleanup registered on `testcase`.

        """
    orig_os_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if isinstance(cmd, basestring):
            # Split the command line into words so it can be compared
            # with the registered `argv` keys.
            command_argv = shlex.split(cmd)
        else:
            command_argv = cmd
        if command_argv in registry:
            subprocess_double = registry[command_argv]
            result = subprocess_double.os_popen_scenario.call_hook(
                    command_argv, mode, buffering)
        else:
            result = orig_os_popen(cmd, mode, buffering)
        return result

    func_patcher = mock.patch.object(
            os, "popen", autospec=True,
            side_effect=fake_os_popen)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_waitpid(testcase):
    """ Patch `os.waitpid` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the PID is looked up in
        `SubprocessDouble.double_by_pid`. If a double is registered
        for that PID, its `os_waitpid_scenario` handles the call;
        otherwise the call is passed through to the original
        `os.waitpid`.

        The patch is stopped by a cleanup registered on `testcase`.

        """
    orig_os_waitpid = os.waitpid

    def fake_os_waitpid(pid, options):
        # Doubles are found by PID, not via the per-testcase registry.
        registry = SubprocessDouble.double_by_pid
        if pid in registry:
            subprocess_double = registry[pid]
            result = subprocess_double.os_waitpid_scenario.call_hook(
                    pid, options)
        else:
            result = orig_os_waitpid(pid, options)
        return result

    func_patcher = mock.patch.object(
            os, "waitpid", autospec=True,
            side_effect=fake_os_waitpid)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_system(testcase):
    """ Patch `os.system` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance matching the command's argument list. If
        no double matches, the call is passed through to the original
        `os.system`.

        The patch is stopped by a cleanup registered on `testcase`.

        """
    orig_os_system = os.system

    def fake_os_system(command):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # Split the command line into words so it can be compared with
        # the registered `argv` keys.
        command_argv = shlex.split(command)
        if command_argv in registry:
            subprocess_double = registry[command_argv]
            result = subprocess_double.os_system_scenario.call_hook(
                    command)
        else:
            result = orig_os_system(command)
        return result

    func_patcher = mock.patch.object(
            os, "system", autospec=True,
            side_effect=fake_os_system)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_os_spawnv(testcase):
    """ Patch `os.spawnv` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        When the patched function is called, the registry of
        `SubprocessDouble` instances for this test case will be used
        to get the instance matching the `args` sequence. If no
        double matches, the call is passed through to the original
        `os.spawnv`.

        The patch is stopped by a cleanup registered on `testcase`.

        """
    orig_os_spawnv = os.spawnv

    def fake_os_spawnv(mode, file, args):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # The lookup key is built from `args` only; `file` is not
        # part of it.
        registry_key = tuple(args)
        if registry_key in registry:
            subprocess_double = registry[registry_key]
            result = subprocess_double.os_spawnv_scenario.call_hook(
                    mode, file, args)
        else:
            result = orig_os_spawnv(mode, file, args)
        return result

    func_patcher = mock.patch.object(
            os, "spawnv", autospec=True,
            side_effect=fake_os_spawnv)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
1728 # vim: fileencoding=utf-8 filetype=python :