Correct markup for a normal bullet list.
[python-gajja.git] / gajja / __init__.py
bloba3d0cbb4a11e574aa6a7731d9f00b89ba9eddf8a
1 # -*- coding: utf-8; -*-
3 # gajja/__init__.py
4 # Part of Gajja, a Python test double library.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Gajja: Fake objects for real tests
15 The `gajja` library provides a system of Python test double classes
16 for specific system objects::
18 * Filesystem entries
20 * Subprocesses
22 The Korean word 가짜 (*gajja*; IPA ˈkaːt͡ɕ̤a) means “fake thing”.
24 """
26 from __future__ import (absolute_import, unicode_literals)
28 import sys
30 if sys.version_info >= (3, 3):
31 import builtins
32 import unittest
33 import unittest.mock as mock
34 from io import StringIO as StringIO
35 import configparser
36 import collections.abc as collections_abc
37 elif sys.version_info >= (3, 0):
38 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
39 elif sys.version_info >= (2, 7):
40 # Python 2 standard library.
41 import __builtin__ as builtins
42 # Third-party backport of Python 3 unittest improvements.
43 import unittest2 as unittest
44 # Third-party mock library.
45 import mock
46 # Python 2 standard library.
47 from StringIO import StringIO as BaseStringIO
48 import ConfigParser as configparser
49 import collections as collections_abc
50 else:
51 raise RuntimeError("Python earlier than 2.7 is not supported.")
53 import os
54 import os.path
55 import io
56 import shutil
57 import tempfile
58 import pwd
59 import errno
60 import time
61 import signal
62 import subprocess
63 import functools
64 import itertools
65 import base64
66 import collections
67 import weakref
68 import shlex
70 __package__ = str("gajja")
71 __import__(__package__)
73 __metaclass__ = type
75 try:
76 # Python 2 types.
77 basestring
78 unicode
79 except NameError:
80 # Alias for Python 3 types.
81 basestring = str
82 unicode = str
85 def make_unique_slug(testcase):
86 """ Make a unique slug for the test case. """
87 text = base64.b64encode(
88 testcase.getUniqueString().encode('utf-8')
89 ).decode('utf-8')
90 result = text[-30:]
91 return result
94 try:
95 StringIO
96 except NameError:
97 # We don't yet have the StringIO we want. Create it.
99 class StringIO(BaseStringIO, object):
100 """ StringIO with a context manager. """
102 def __enter__(self):
103 return self
105 def __exit__(self, *args):
106 self.close()
107 return False
109 def readable(self):
110 return True
112 def writable(self):
113 return True
115 def seekable(self):
116 return True
119 def patch_stdout(testcase):
120 """ Patch `sys.stdout` for the specified test case. """
121 patcher = mock.patch.object(
122 sys, "stdout", wraps=StringIO())
123 patcher.start()
124 testcase.addCleanup(patcher.stop)
127 def patch_stderr(testcase):
128 """ Patch `sys.stderr` for the specified test case. """
129 patcher = mock.patch.object(
130 sys, "stderr", wraps=StringIO())
131 patcher.start()
132 testcase.addCleanup(patcher.stop)
135 def patch_signal_signal(testcase):
136 """ Patch `signal.signal` for the specified test case. """
137 func_patcher = mock.patch.object(signal, "signal", autospec=True)
138 func_patcher.start()
139 testcase.addCleanup(func_patcher.stop)
142 class FakeSystemExit(Exception):
143 """ Fake double for `SystemExit` exception. """
146 EXIT_STATUS_SUCCESS = 0
147 EXIT_STATUS_FAILURE = 1
148 EXIT_STATUS_COMMAND_NOT_FOUND = 127
151 def patch_sys_exit(testcase):
152 """ Patch `sys.exit` for the specified test case. """
153 func_patcher = mock.patch.object(
154 sys, "exit", autospec=True,
155 side_effect=FakeSystemExit())
156 func_patcher.start()
157 testcase.addCleanup(func_patcher.stop)
160 def patch_sys_argv(testcase):
161 """ Patch the `sys.argv` sequence for the test case. """
162 if not hasattr(testcase, 'progname'):
163 testcase.progname = make_unique_slug(testcase)
164 if not hasattr(testcase, 'sys_argv'):
165 testcase.sys_argv = [testcase.progname]
166 patcher = mock.patch.object(
167 sys, "argv",
168 new=list(testcase.sys_argv))
169 patcher.start()
170 testcase.addCleanup(patcher.stop)
173 def patch_system_interfaces(testcase):
174 """ Patch system interfaces that are disruptive to the test runner. """
175 patch_stdout(testcase)
176 patch_stderr(testcase)
177 patch_sys_exit(testcase)
178 patch_sys_argv(testcase)
181 def patch_time_time(testcase, values=None):
182 """ Patch the `time.time` function for the specified test case.
184 :param testcase: The `TestCase` instance for binding to the patch.
185 :param values: An iterable to provide return values.
186 :return: None.
189 if values is None:
190 values = itertools.count()
192 def generator_fake_time():
193 while True:
194 yield next(values)
196 func_patcher = mock.patch.object(time, "time", autospec=True)
197 func_patcher.start()
198 testcase.addCleanup(func_patcher.stop)
200 time.time.side_effect = generator_fake_time()
203 def patch_os_environ(testcase):
204 """ Patch the `os.environ` mapping. """
205 if not hasattr(testcase, 'os_environ'):
206 testcase.os_environ = {}
207 patcher = mock.patch.object(os, "environ", new=testcase.os_environ)
208 patcher.start()
209 testcase.addCleanup(patcher.stop)
212 def patch_os_getpid(testcase):
213 """ Patch `os.getpid` for the specified test case. """
214 func_patcher = mock.patch.object(os, "getpid", autospec=True)
215 func_patcher.start()
216 testcase.addCleanup(func_patcher.stop)
219 def patch_os_getuid(testcase):
220 """ Patch the `os.getuid` function. """
221 if not hasattr(testcase, 'os_getuid_return_value'):
222 testcase.os_getuid_return_value = testcase.getUniqueInteger()
223 func_patcher = mock.patch.object(
224 os, "getuid", autospec=True,
225 return_value=testcase.os_getuid_return_value)
226 func_patcher.start()
227 testcase.addCleanup(func_patcher.stop)
230 PasswdEntry = collections.namedtuple(
231 "PasswdEntry",
232 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
235 def patch_pwd_getpwuid(testcase):
236 """ Patch the `pwd.getpwuid` function. """
237 if not hasattr(testcase, 'pwd_getpwuid_return_value'):
238 testcase.pwd_getpwuid_return_value = PasswdEntry(
239 pw_name=make_unique_slug(testcase),
240 pw_passwd=make_unique_slug(testcase),
241 pw_uid=testcase.getUniqueInteger(),
242 pw_gid=testcase.getUniqueInteger(),
243 pw_gecos=testcase.getUniqueString(),
244 pw_dir=tempfile.mktemp(),
245 pw_shell=tempfile.mktemp())
246 if not isinstance(testcase.pwd_getpwuid_return_value, pwd.struct_passwd):
247 pwent = pwd.struct_passwd(testcase.pwd_getpwuid_return_value)
248 else:
249 pwent = testcase.pwd_getpwuid_return_value
250 func_patcher = mock.patch.object(
251 pwd, "getpwuid", autospec=True,
252 return_value=pwent)
253 func_patcher.start()
254 testcase.addCleanup(func_patcher.stop)
257 def patch_os_path_exists(testcase):
258 """ Patch `os.path.exists` behaviour for this test case.
260 When the patched function is called, the registry of
261 `FileDouble` instances for this test case will be used to get
262 the instance for the path specified.
265 orig_os_path_exists = os.path.exists
267 def fake_os_path_exists(path):
268 registry = FileDouble.get_registry_for_testcase(testcase)
269 if path in registry:
270 file_double = registry[path]
271 result = file_double.os_path_exists_scenario.call_hook()
272 else:
273 result = orig_os_path_exists(path)
274 return result
276 func_patcher = mock.patch.object(
277 os.path, "exists", autospec=True,
278 side_effect=fake_os_path_exists)
279 func_patcher.start()
280 testcase.addCleanup(func_patcher.stop)
283 def patch_os_access(testcase):
284 """ Patch `os.access` behaviour for this test case.
286 When the patched function is called, the registry of
287 `FileDouble` instances for this test case will be used to get
288 the instance for the path specified.
291 orig_os_access = os.access
293 def fake_os_access(path, mode):
294 registry = FileDouble.get_registry_for_testcase(testcase)
295 if path in registry:
296 file_double = registry[path]
297 result = file_double.os_access_scenario.call_hook(mode)
298 else:
299 result = orig_os_access(path, mode)
300 return result
302 func_patcher = mock.patch.object(
303 os, "access", autospec=True,
304 side_effect=fake_os_access)
305 func_patcher.start()
306 testcase.addCleanup(func_patcher.stop)
309 StatResult = collections.namedtuple(
310 'StatResult', [
311 'st_mode',
312 'st_ino', 'st_dev', 'st_nlink',
313 'st_uid', 'st_gid',
314 'st_size',
315 'st_atime', 'st_mtime', 'st_ctime',
319 def patch_os_stat(testcase):
320 """ Patch `os.stat` behaviour for this test case.
322 When the patched function is called, the registry of
323 `FileDouble` instances for this test case will be used to get
324 the instance for the path specified.
327 orig_os_stat = os.stat
329 def fake_os_stat(path):
330 registry = FileDouble.get_registry_for_testcase(testcase)
331 if path in registry:
332 file_double = registry[path]
333 result = file_double.os_stat_scenario.call_hook()
334 else:
335 result = orig_os_stat(path)
336 return result
338 func_patcher = mock.patch.object(
339 os, "stat", autospec=True,
340 side_effect=fake_os_stat)
341 func_patcher.start()
342 testcase.addCleanup(func_patcher.stop)
345 def patch_os_lstat(testcase):
346 """ Patch `os.lstat` behaviour for this test case.
348 When the patched function is called, the registry of
349 `FileDouble` instances for this test case will be used to get
350 the instance for the path specified.
353 orig_os_lstat = os.lstat
355 def fake_os_lstat(path):
356 registry = FileDouble.get_registry_for_testcase(testcase)
357 if path in registry:
358 file_double = registry[path]
359 result = file_double.os_lstat_scenario.call_hook()
360 else:
361 result = orig_os_lstat(path)
362 return result
364 func_patcher = mock.patch.object(
365 os, "lstat", autospec=True,
366 side_effect=fake_os_lstat)
367 func_patcher.start()
368 testcase.addCleanup(func_patcher.stop)
371 def patch_os_unlink(testcase):
372 """ Patch `os.unlink` behaviour for this test case.
374 When the patched function is called, the registry of
375 `FileDouble` instances for this test case will be used to get
376 the instance for the path specified.
379 orig_os_unlink = os.unlink
381 def fake_os_unlink(path):
382 registry = FileDouble.get_registry_for_testcase(testcase)
383 if path in registry:
384 file_double = registry[path]
385 result = file_double.os_unlink_scenario.call_hook()
386 else:
387 result = orig_os_unlink(path)
388 return result
390 func_patcher = mock.patch.object(
391 os, "unlink", autospec=True,
392 side_effect=fake_os_unlink)
393 func_patcher.start()
394 testcase.addCleanup(func_patcher.stop)
397 def patch_os_rmdir(testcase):
398 """ Patch `os.rmdir` behaviour for this test case.
400 When the patched function is called, the registry of
401 `FileDouble` instances for this test case will be used to get
402 the instance for the path specified.
405 orig_os_rmdir = os.rmdir
407 def fake_os_rmdir(path):
408 registry = FileDouble.get_registry_for_testcase(testcase)
409 if path in registry:
410 file_double = registry[path]
411 result = file_double.os_rmdir_scenario.call_hook()
412 else:
413 result = orig_os_rmdir(path)
414 return result
416 func_patcher = mock.patch.object(
417 os, "rmdir", autospec=True,
418 side_effect=fake_os_rmdir)
419 func_patcher.start()
420 testcase.addCleanup(func_patcher.stop)
423 def patch_shutil_rmtree(testcase):
424 """ Patch `shutil.rmtree` behaviour for this test case.
426 When the patched function is called, the registry of
427 `FileDouble` instances for this test case will be used to get
428 the instance for the path specified.
431 orig_shutil_rmtree = os.rmdir
433 def fake_shutil_rmtree(path, ignore_errors=False, onerror=None):
434 registry = FileDouble.get_registry_for_testcase(testcase)
435 if path in registry:
436 file_double = registry[path]
437 result = file_double.shutil_rmtree_scenario.call_hook()
438 else:
439 result = orig_shutil_rmtree(path)
440 return result
442 func_patcher = mock.patch.object(
443 shutil, "rmtree", autospec=True,
444 side_effect=fake_shutil_rmtree)
445 func_patcher.start()
446 testcase.addCleanup(func_patcher.stop)
449 def patch_tempfile_mkdtemp(testcase):
450 """ Patch the `tempfile.mkdtemp` function for this test case. """
451 if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
452 testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())
454 double = testcase.tempfile_mkdtemp_file_double
455 double.set_os_unlink_scenario('okay')
456 double.set_os_rmdir_scenario('okay')
457 double.register_for_testcase(testcase)
459 func_patcher = mock.patch.object(tempfile, "mkdtemp", autospec=True)
460 func_patcher.start()
461 testcase.addCleanup(func_patcher.stop)
463 tempfile.mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path
466 try:
467 FileNotFoundError
468 FileExistsError
469 PermissionError
470 except NameError:
471 # Python 2 uses IOError.
472 def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
473 result_kwargs = init_kwargs
474 result_errno = errno_value
475 result_strerror = os.strerror(errno_value)
476 result_filename = None
477 if len(init_args) >= 3:
478 result_errno = init_args[0]
479 result_filename = init_args[2]
480 if 'errno' in init_kwargs:
481 result_errno = init_kwargs['errno']
482 del result_kwargs['errno']
483 if 'filename' in init_kwargs:
484 result_filename = init_kwargs['filename']
485 del result_kwargs['filename']
486 if len(init_args) >= 2:
487 result_strerror = init_args[1]
488 if 'strerror' in init_kwargs:
489 result_strerror = init_kwargs['strerror']
490 del result_kwargs['strerror']
491 result_args = (result_errno, result_strerror, result_filename)
492 return (result_args, result_kwargs)
494 class FileNotFoundError(IOError):
495 def __init__(self, *args, **kwargs):
496 (args, kwargs) = _ensure_ioerror_args(
497 args, kwargs, errno_value=errno.ENOENT)
498 super(FileNotFoundError, self).__init__(*args, **kwargs)
500 class FileExistsError(IOError):
501 def __init__(self, *args, **kwargs):
502 (args, kwargs) = _ensure_ioerror_args(
503 args, kwargs, errno_value=errno.EEXIST)
504 super(FileExistsError, self).__init__(*args, **kwargs)
506 class PermissionError(IOError):
507 def __init__(self, *args, **kwargs):
508 (args, kwargs) = _ensure_ioerror_args(
509 args, kwargs, errno_value=errno.EPERM)
510 super(PermissionError, self).__init__(*args, **kwargs)
513 def make_fake_file_scenarios(path=None):
514 """ Make a collection of scenarios for testing with fake files.
516 :path: The filesystem path of the fake file. If not specified,
517 a valid random path will be generated.
518 :return: A collection of scenarios for tests involving input files.
520 The collection is a mapping from scenario name to a dictionary of
521 scenario attributes.
525 if path is None:
526 file_path = tempfile.mktemp()
527 else:
528 file_path = path
530 fake_file_empty = StringIO()
531 fake_file_minimal = StringIO("Lorem ipsum.")
532 fake_file_large = StringIO("\n".join(
533 "ABCDEFGH" * 100
534 for __ in range(1000)))
536 default_scenario_params = {
537 'open_scenario_name': 'okay',
538 'file_double_params': dict(
539 path=file_path, fake_file=fake_file_minimal),
542 scenarios = {
543 'default': {},
544 'error-not-exist': {
545 'open_scenario_name': 'nonexist',
547 'error-exist': {
548 'open_scenario_name': 'exist_error',
550 'error-read-denied': {
551 'open_scenario_name': 'read_denied',
553 'not-found': {
554 'file_double_params': dict(
555 path=file_path, fake_file=fake_file_empty),
557 'exist-empty': {
558 'file_double_params': dict(
559 path=file_path, fake_file=fake_file_empty),
561 'exist-minimal': {
562 'file_double_params': dict(
563 path=file_path, fake_file=fake_file_minimal),
565 'exist-large': {
566 'file_double_params': dict(
567 path=file_path, fake_file=fake_file_large),
571 for (name, scenario) in scenarios.items():
572 params = default_scenario_params.copy()
573 params.update(scenario)
574 scenario.update(params)
575 scenario['file_double'] = FileDouble(**scenario['file_double_params'])
576 scenario['file_double'].set_open_scenario(params['open_scenario_name'])
577 scenario['fake_file_scenario_name'] = name
579 return scenarios
582 def get_file_doubles_from_fake_file_scenarios(scenarios):
583 """ Get the `FileDouble` instances from fake file scenarios.
585 :param scenarios: Collection of fake file scenarios.
586 :return: Collection of `FileDouble` instances.
589 doubles = set(
590 scenario['file_double']
591 for scenario in scenarios
592 if scenario['file_double'] is not None)
594 return doubles
597 def setup_file_double_behaviour(testcase, doubles=None):
598 """ Set up file double instances and behaviour.
600 :param testcase: The `TestCase` instance to modify.
601 :param doubles: Collection of `FileDouble` instances.
602 :return: None.
604 If `doubles` is ``None``, a default collection will be made
605 from the result of `make_fake_file_scenarios` result.
608 if doubles is None:
609 scenarios = make_fake_file_scenarios()
610 doubles = get_file_doubles_from_fake_file_scenarios(
611 scenarios.values())
613 for file_double in doubles:
614 file_double.register_for_testcase(testcase)
616 orig_open = builtins.open
618 def fake_open(path, mode='rt', buffering=-1):
619 registry = FileDouble.get_registry_for_testcase(testcase)
620 if path in registry:
621 file_double = registry[path]
622 result = file_double.builtins_open_scenario.call_hook(
623 mode, buffering)
624 else:
625 result = orig_open(path, mode, buffering)
626 return result
628 mock_open = mock.mock_open()
629 mock_open.side_effect = fake_open
631 func_patcher = mock.patch.object(
632 builtins, "open", new=mock_open)
633 func_patcher.start()
634 testcase.addCleanup(func_patcher.stop)
637 def setup_fake_file_fixtures(testcase):
638 """ Set up fixtures for fake file doubles.
640 :param testcase: The `TestCase` instance to modify.
641 :return: None.
644 scenarios = make_fake_file_scenarios()
645 testcase.fake_file_scenarios = scenarios
647 file_doubles = get_file_doubles_from_fake_file_scenarios(
648 scenarios.values())
649 setup_file_double_behaviour(testcase, file_doubles)
652 def set_fake_file_scenario(testcase, name):
653 """ Set the named fake file scenario for the test case. """
654 scenario = testcase.fake_file_scenarios[name]
655 testcase.fake_file_scenario = scenario
656 testcase.file_double = scenario['file_double']
657 testcase.file_double.register_for_testcase(testcase)
660 class TestDoubleFunctionScenario:
661 """ Scenario for fake behaviour of a specific function. """
663 def __init__(self, scenario_name, double):
664 self.scenario_name = scenario_name
665 self.double = double
667 self.call_hook = getattr(
668 self, "_hook_{name}".format(name=self.scenario_name))
670 def __repr__(self):
671 text = (
672 "<{class_name} instance: {id}"
673 " name: {name!r},"
674 " call_hook name: {hook_name!r}"
675 " double: {double!r}"
676 ">").format(
677 class_name=self.__class__.__name__, id=id(self),
678 name=self.scenario_name, double=self.double,
679 hook_name=self.call_hook.__name__)
680 return text
682 def __eq__(self, other):
683 result = True
684 if not self.scenario_name == other.scenario_name:
685 result = False
686 if not self.double == other.double:
687 result = False
688 if not self.call_hook.__name__ == other.call_hook.__name__:
689 result = False
690 return result
692 def __ne__(self, other):
693 result = not self.__eq__(other)
694 return result
697 class os_path_exists_scenario(TestDoubleFunctionScenario):
698 """ Scenario for `os.path.exists` behaviour. """
700 def _hook_exist(self):
701 return True
703 def _hook_not_exist(self):
704 return False
707 class os_access_scenario(TestDoubleFunctionScenario):
708 """ Scenario for `os.access` behaviour. """
710 def _hook_okay(self, mode):
711 return True
713 def _hook_not_exist(self, mode):
714 return False
716 def _hook_read_only(self, mode):
717 if mode & (os.W_OK | os.X_OK):
718 result = False
719 else:
720 result = True
721 return result
723 def _hook_denied(self, mode):
724 if mode & (os.R_OK | os.W_OK | os.X_OK):
725 result = False
726 else:
727 result = True
728 return result
731 class os_stat_scenario(TestDoubleFunctionScenario):
732 """ Scenario for `os.stat` behaviour. """
734 def _hook_okay(self):
735 return self.double.stat_result
737 def _hook_notfound_error(self):
738 raise FileNotFoundError(
739 self.double.path,
740 "No such file or directory: {path!r}".format(
741 path=self.double.path))
743 def _hook_denied_error(self):
744 raise PermissionError(
745 self.double.path,
746 "Permission denied")
749 class os_lstat_scenario(os_stat_scenario):
750 """ Scenario for `os.lstat` behaviour. """
753 class os_unlink_scenario(TestDoubleFunctionScenario):
754 """ Scenario for `os.unlink` behaviour. """
756 def _hook_okay(self):
757 return None
759 def _hook_nonexist(self):
760 error = FileNotFoundError(
761 self.double.path,
762 "No such file or directory: {path!r}".format(
763 path=self.double.path))
764 raise error
766 def _hook_denied(self):
767 error = PermissionError(
768 self.double.path,
769 "Permission denied")
770 raise error
773 class os_rmdir_scenario(TestDoubleFunctionScenario):
774 """ Scenario for `os.rmdir` behaviour. """
776 def _hook_okay(self):
777 return None
779 def _hook_nonexist(self):
780 error = FileNotFoundError(
781 self.double.path,
782 "No such file or directory: {path!r}".format(
783 path=self.double.path))
784 raise error
786 def _hook_denied(self):
787 error = PermissionError(
788 self.double.path,
789 "Permission denied")
790 raise error
793 class shutil_rmtree_scenario(TestDoubleFunctionScenario):
794 """ Scenario for `shutil.rmtree` behaviour. """
796 def _hook_okay(self):
797 return None
799 def _hook_nonexist(self):
800 error = FileNotFoundError(
801 self.double.path,
802 "No such file or directory: {path!r}".format(
803 path=self.double.path))
804 raise error
806 def _hook_denied(self):
807 error = PermissionError(
808 self.double.path,
809 "Permission denied")
810 raise error
813 class builtins_open_scenario(TestDoubleFunctionScenario):
814 """ Scenario for `builtins.open` behaviour. """
816 def _hook_okay(self, mode, buffering):
817 result = self.double.fake_file
818 return result
820 def _hook_nonexist(self, mode, buffering):
821 if mode.startswith('r'):
822 error = FileNotFoundError(
823 self.double.path,
824 "No such file or directory: {path!r}".format(
825 path=self.double.path))
826 raise error
827 result = self.double.fake_file
828 return result
830 def _hook_exist_error(self, mode, buffering):
831 if mode.startswith('w') or mode.startswith('a'):
832 error = FileExistsError(
833 self.double.path,
834 "File already exists: {path!r}".format(
835 path=self.double.path))
836 raise error
837 result = self.double.fake_file
838 return result
840 def _hook_read_denied(self, mode, buffering):
841 if mode.startswith('r'):
842 error = PermissionError(
843 self.double.path,
844 "Read denied on {path!r}".format(
845 path=self.double.path))
846 raise error
847 result = self.double.fake_file
848 return result
850 def _hook_write_denied(self, mode, buffering):
851 if mode.startswith('w') or mode.startswith('a'):
852 error = PermissionError(
853 self.double.path,
854 "Write denied on {path!r}".format(
855 path=self.double.path))
856 raise error
857 result = self.double.fake_file
858 return result
861 class TestDoubleWithRegistry:
862 """ Abstract base class for a test double with a test case registry. """
864 registry_class = NotImplemented
865 registries = NotImplemented
867 function_scenario_params_by_class = NotImplemented
869 def __new__(cls, *args, **kwargs):
870 superclass = super(TestDoubleWithRegistry, cls)
871 if superclass.__new__ is object.__new__:
872 # The ‘object’ implementation complains about extra arguments.
873 instance = superclass.__new__(cls)
874 else:
875 instance = superclass.__new__(cls, *args, **kwargs)
876 instance.make_set_scenario_methods()
878 return instance
880 def __init__(self, *args, **kwargs):
881 super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
882 self._set_method_per_scenario()
884 def _make_set_scenario_method(self, scenario_class, params):
885 def method(self, name):
886 scenario = scenario_class(name, double=self)
887 setattr(self, scenario_class.__name__, scenario)
888 method.__doc__ = (
889 """ Set the scenario for `{name}` behaviour. """
890 ).format(name=scenario_class.__name__)
891 method.__name__ = str(params['set_scenario_method_name'])
892 return method
894 def make_set_scenario_methods(self):
895 """ Make `set_<scenario_class_name>` methods on this class. """
896 for (function_scenario_class, function_scenario_params) in (
897 self.function_scenario_params_by_class.items()):
898 method = self._make_set_scenario_method(
899 function_scenario_class, function_scenario_params)
900 setattr(self.__class__, method.__name__, method)
901 function_scenario_params['set_scenario_method'] = method
903 def _set_method_per_scenario(self):
904 """ Set the method to be called for each scenario. """
905 for function_scenario_params in (
906 self.function_scenario_params_by_class.values()):
907 function_scenario_params['set_scenario_method'](
908 self, function_scenario_params['default_scenario_name'])
910 @classmethod
911 def get_registry_for_testcase(cls, testcase):
912 """ Get the FileDouble registry for the specified test case. """
913 # Key in a dict must be hashable.
914 key = (testcase.__class__, id(testcase))
915 registry = cls.registries.setdefault(key, cls.registry_class())
916 return registry
918 def get_registry_key(self):
919 """ Get the registry key for this double. """
920 raise NotImplementedError
922 def register_for_testcase(self, testcase):
923 """ Add this instance to registry for the specified testcase. """
924 registry = self.get_registry_for_testcase(testcase)
925 key = self.get_registry_key()
926 registry[key] = self
927 unregister_func = functools.partial(
928 self.unregister_for_testcase, testcase)
929 testcase.addCleanup(unregister_func)
931 def unregister_for_testcase(self, testcase):
932 """ Remove this instance from registry for the specified testcase. """
933 registry = self.get_registry_for_testcase(testcase)
934 key = self.get_registry_key()
935 if key in registry:
936 registry.pop(key)
939 def copy_fake_file(fake_file):
940 """ Make a copy of the StringIO instance. """
941 fake_file_type = StringIO
942 content = ""
943 if fake_file is not None:
944 fake_file_type = type(fake_file)
945 content = fake_file.getvalue()
946 assert issubclass(fake_file_type, object)
947 result = fake_file_type(content)
948 if hasattr(fake_file, 'encoding'):
949 if not hasattr(result, 'encoding'):
950 result.encoding = fake_file.encoding
951 return result
954 class FileDouble(TestDoubleWithRegistry):
955 """ A testing double for a file. """
957 registry_class = dict
958 registries = {}
960 function_scenario_params_by_class = {
961 os_path_exists_scenario: {
962 'default_scenario_name': 'not_exist',
963 'set_scenario_method_name': 'set_os_path_exists_scenario',
965 os_access_scenario: {
966 'default_scenario_name': 'okay',
967 'set_scenario_method_name': 'set_os_access_scenario',
969 os_stat_scenario: {
970 'default_scenario_name': 'okay',
971 'set_scenario_method_name': 'set_os_stat_scenario',
973 os_lstat_scenario: {
974 'default_scenario_name': 'okay',
975 'set_scenario_method_name': 'set_os_lstat_scenario',
977 builtins_open_scenario: {
978 'default_scenario_name': 'okay',
979 'set_scenario_method_name': 'set_open_scenario',
981 os_unlink_scenario: {
982 'default_scenario_name': 'okay',
983 'set_scenario_method_name': 'set_os_unlink_scenario',
985 os_rmdir_scenario: {
986 'default_scenario_name': 'okay',
987 'set_scenario_method_name': 'set_os_rmdir_scenario',
989 shutil_rmtree_scenario: {
990 'default_scenario_name': 'okay',
991 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
995 def __init__(self, path=None, fake_file=None, *args, **kwargs):
996 self.path = path
997 self.fake_file = copy_fake_file(fake_file)
998 self.fake_file.name = path
1000 self._set_stat_result()
1002 super(FileDouble, self).__init__(*args, **kwargs)
1004 def _set_stat_result(self):
1005 """ Set the `os.stat` result for this file. """
1006 size = len(self.fake_file.getvalue())
1007 self.stat_result = StatResult(
1008 st_mode=0,
1009 st_ino=None, st_dev=None, st_nlink=None,
1010 st_uid=0, st_gid=0,
1011 st_size=size,
1012 st_atime=None, st_mtime=None, st_ctime=None,
1015 def __repr__(self):
1016 text = "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
1017 path=self.path, fake_file=self.fake_file)
1018 return text
1020 def get_registry_key(self):
1021 """ Get the registry key for this double. """
1022 result = self.path
1023 return result
1026 class os_popen_scenario(TestDoubleFunctionScenario):
1027 """ Scenario for `os.popen` behaviour. """
1029 stream_name_by_mode = {
1030 'w': 'stdin',
1031 'r': 'stdout',
1034 def _hook_success(self, argv, mode, buffering):
1035 stream_name = self.stream_name_by_mode[mode]
1036 stream_double = getattr(
1037 self.double, stream_name + '_double')
1038 result = stream_double.fake_file
1039 return result
1041 def _hook_failure(self, argv, mode, buffering):
1042 result = StringIO()
1043 return result
1045 def _hook_not_found(self, argv, mode, buffering):
1046 result = StringIO()
1047 return result
1050 class os_waitpid_scenario(TestDoubleFunctionScenario):
1051 """ Scenario for `os.waitpid` behaviour. """
1053 def _hook_success(self, pid, options):
1054 result = (pid, EXIT_STATUS_SUCCESS)
1055 return result
1057 def _hook_failure(self, pid, options):
1058 result = (pid, EXIT_STATUS_FAILURE)
1059 return result
1061 def _hook_not_found(self, pid, options):
1062 error = OSError(errno.ECHILD)
1063 raise error
1066 class os_system_scenario(TestDoubleFunctionScenario):
1067 """ Scenario for `os.system` behaviour. """
1069 def _hook_success(self, command):
1070 result = EXIT_STATUS_SUCCESS
1071 return result
1073 def _hook_failure(self, command):
1074 result = EXIT_STATUS_FAILURE
1075 return result
1077 def _hook_not_found(self, command):
1078 result = EXIT_STATUS_COMMAND_NOT_FOUND
1079 return result
1082 class os_spawnv_scenario(TestDoubleFunctionScenario):
1083 """ Scenario for `os.spawnv` behaviour. """
1085 def _hook_success(self, mode, file, args):
1086 result = EXIT_STATUS_SUCCESS
1087 return result
1089 def _hook_failure(self, mode, file, args):
1090 result = EXIT_STATUS_FAILURE
1091 return result
1093 def _hook_not_found(self, mode, file, args):
1094 result = EXIT_STATUS_COMMAND_NOT_FOUND
1095 return result
1098 ARG_ANY = object()
1099 ARG_MORE = object()
1102 class PopenDouble:
1103 """ A testing double for `subprocess.Popen`. """
1105 def __init__(self, args, *posargs, **kwargs):
1106 self.stdin = None
1107 self.stdout = None
1108 self.stderr = None
1109 self.pid = None
1110 self.returncode = None
1112 if kwargs.get('shell', False):
1113 self.argv = shlex.split(args)
1114 else:
1115 # The paramter is already a sequence of command-line arguments.
1116 self.argv = args
1118 def set_streams(self, subprocess_double, popen_kwargs):
1119 """ Set the streams on the `PopenDouble`.
1121 :param subprocess_double: The `SubprocessDouble` from
1122 which to get existing stream doubles.
1123 :param popen_kwargs: The keyword arguments to the
1124 `subprocess.Popen` call.
1125 :return: ``None``.
1128 for stream_name in (
1129 name for name in ['stdin', 'stdout', 'stderr']
1130 if name in popen_kwargs):
1131 stream_spec = popen_kwargs[stream_name]
1132 if stream_spec is subprocess.PIPE:
1133 stream_double = getattr(
1134 subprocess_double,
1135 "{name}_double".format(name=stream_name))
1136 stream_file = stream_double.fake_file
1137 elif stream_spec is subprocess.STDOUT:
1138 stream_file = subprocess_double.stdout_double.fake_file
1139 else:
1140 stream_file = stream_spec
1141 setattr(self, stream_name, stream_file)
1143 def wait(self):
1144 """ Wait for subprocess to terminate. """
1145 return self.returncode
1148 class subprocess_popen_scenario(TestDoubleFunctionScenario):
1149 """ Scenario for `subprocess.Popen` behaviour. """
1151 def _hook_success(self, testcase, args, *posargs, **kwargs):
1152 double = self.double.popen_double
1153 double.set_streams(self.double, kwargs)
1154 return double
1157 def patch_subprocess_popen(testcase):
1158 """ Patch `subprocess.Popen` constructor for this test case.
1160 :param testcase: The `TestCase` instance to modify.
1161 :return: None.
1163 When the patched function is called, the registry of
1164 `SubprocessDouble` instances for this test case will be used
1165 to get the instance for the program path specified.
1168 orig_subprocess_popen = subprocess.Popen
1170 def fake_subprocess_popen(args, *posargs, **kwargs):
1171 if kwargs.get('shell', False):
1172 argv = shlex.split(args)
1173 else:
1174 argv = args
1175 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1176 if argv in registry:
1177 subprocess_double = registry[argv]
1178 result = subprocess_double.subprocess_popen_scenario.call_hook(
1179 testcase, args, *posargs, **kwargs)
1180 else:
1181 result = orig_subprocess_popen(args, *posargs, **kwargs)
1182 return result
1184 func_patcher = mock.patch.object(
1185 subprocess, "Popen", autospec=True,
1186 side_effect=fake_subprocess_popen)
1187 func_patcher.start()
1188 testcase.addCleanup(func_patcher.stop)
1191 class subprocess_call_scenario(TestDoubleFunctionScenario):
1192 """ Scenario for `subprocess.call` behaviour. """
1194 def _hook_success(self, command):
1195 result = EXIT_STATUS_SUCCESS
1196 return result
1198 def _hook_failure(self, command):
1199 result = EXIT_STATUS_FAILURE
1200 return result
1202 def _hook_not_found(self, command):
1203 result = EXIT_STATUS_COMMAND_NOT_FOUND
1204 return result
1207 def patch_subprocess_call(testcase):
1208 """ Patch `subprocess.call` function for this test case.
1210 :param testcase: The `TestCase` instance to modify.
1211 :return: None.
1213 When the patched function is called, the registry of
1214 `SubprocessDouble` instances for this test case will be used
1215 to get the instance for the program path specified.
1218 orig_subprocess_call = subprocess.call
1220 def fake_subprocess_call(command, *posargs, **kwargs):
1221 if kwargs.get('shell', False):
1222 command_argv = shlex.split(command)
1223 else:
1224 command_argv = command
1225 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1226 if command_argv in registry:
1227 subprocess_double = registry[command_argv]
1228 result = subprocess_double.subprocess_call_scenario.call_hook(
1229 command)
1230 else:
1231 result = orig_subprocess_call(command, *posargs, **kwargs)
1232 return result
1234 func_patcher = mock.patch.object(
1235 subprocess, "call", autospec=True,
1236 side_effect=fake_subprocess_call)
1237 func_patcher.start()
1238 testcase.addCleanup(func_patcher.stop)
1241 class subprocess_check_call_scenario(TestDoubleFunctionScenario):
1242 """ Scenario for `subprocess.check_call` behaviour. """
1244 def _hook_success(self, command):
1245 return None
1247 def _hook_failure(self, command):
1248 result = EXIT_STATUS_FAILURE
1249 error = subprocess.CalledProcessError(result, command)
1250 raise error
1252 def _hook_not_found(self, command):
1253 result = EXIT_STATUS_COMMAND_NOT_FOUND
1254 error = subprocess.CalledProcessError(result, command)
1255 raise error
1258 def patch_subprocess_check_call(testcase):
1259 """ Patch `subprocess.check_call` function for this test case.
1261 :param testcase: The `TestCase` instance to modify.
1262 :return: None.
1264 When the patched function is called, the registry of
1265 `SubprocessDouble` instances for this test case will be used
1266 to get the instance for the program path specified.
1269 orig_subprocess_check_call = subprocess.check_call
1271 def fake_subprocess_check_call(command, *posargs, **kwargs):
1272 if kwargs.get('shell', False):
1273 command_argv = shlex.split(command)
1274 else:
1275 command_argv = command
1276 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1277 if command_argv in registry:
1278 subprocess_double = registry[command_argv]
1279 scenario = subprocess_double.subprocess_check_call_scenario
1280 result = scenario.call_hook(command)
1281 else:
1282 result = orig_subprocess_check_call(command, *posargs, **kwargs)
1283 return result
1285 func_patcher = mock.patch.object(
1286 subprocess, "check_call", autospec=True,
1287 side_effect=fake_subprocess_check_call)
1288 func_patcher.start()
1289 testcase.addCleanup(func_patcher.stop)
1292 class SubprocessDoubleRegistry(collections_abc.MutableMapping):
1293 """ Registry of `SubprocessDouble` instances by `argv`. """
1295 def __init__(self, *args, **kwargs):
1296 items = []
1297 if args:
1298 if isinstance(args[0], collections_abc.Mapping):
1299 items = args[0].items()
1300 if isinstance(args[0], collections_abc.Iterable):
1301 items = args[0]
1302 self._mapping = dict(items)
1304 def __repr__(self):
1305 text = "<{class_name} object: {mapping}>".format(
1306 class_name=self.__class__.__name__, mapping=self._mapping)
1307 return text
1309 def _match_argv(self, argv):
1310 """ Match the specified `argv` with our registered keys. """
1311 match = None
1312 if not isinstance(argv, collections_abc.Sequence):
1313 return match
1314 candidates = iter(self._mapping)
1315 while match is None:
1316 try:
1317 candidate = next(candidates)
1318 except StopIteration:
1319 break
1320 found = None
1321 if candidate == argv:
1322 # An exact match.
1323 found = True
1324 word_iter = enumerate(candidate)
1325 while found is None:
1326 try:
1327 (word_index, candidate_word) = next(word_iter)
1328 except StopIteration:
1329 break
1330 if candidate_word is ARG_MORE:
1331 # Candiate matches any remaining words. We have a match.
1332 found = True
1333 elif word_index > len(argv):
1334 # Candidate is too long for the specified argv.
1335 found = False
1336 elif candidate_word is ARG_ANY:
1337 # Candidate matches any word at this position.
1338 continue
1339 elif candidate_word == argv[word_index]:
1340 # Candidate matches the word at this position.
1341 continue
1342 else:
1343 # This candidate does not match.
1344 found = False
1345 if found is None:
1346 # Reached the end of the candidate without a mismatch.
1347 found = True
1348 if found:
1349 match = candidate
1350 return match
1352 def __getitem__(self, key):
1353 match = self._match_argv(key)
1354 if match is None:
1355 raise KeyError(key)
1356 result = self._mapping[match]
1357 return result
1359 def __setitem__(self, key, value):
1360 if key in self:
1361 del self[key]
1362 self._mapping[key] = value
1364 def __delitem__(self, key):
1365 match = self._match_argv(key)
1366 if match is not None:
1367 del self._mapping[match]
1369 def __iter__(self):
1370 return self._mapping.__iter__()
1372 def __len__(self):
1373 return self._mapping.__len__()
1376 class SubprocessDouble(TestDoubleWithRegistry):
1377 """ A testing double for a subprocess. """
1379 registry_class = SubprocessDoubleRegistry
1380 registries = {}
1382 double_by_pid = weakref.WeakValueDictionary()
1384 function_scenario_params_by_class = {
1385 subprocess_popen_scenario: {
1386 'default_scenario_name': 'success',
1387 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1389 subprocess_call_scenario: {
1390 'default_scenario_name': 'success',
1391 'set_scenario_method_name': 'set_subprocess_call_scenario',
1393 subprocess_check_call_scenario: {
1394 'default_scenario_name': 'success',
1395 'set_scenario_method_name':
1396 'set_subprocess_check_call_scenario',
1398 os_popen_scenario: {
1399 'default_scenario_name': 'success',
1400 'set_scenario_method_name': 'set_os_popen_scenario',
1402 os_waitpid_scenario: {
1403 'default_scenario_name': 'success',
1404 'set_scenario_method_name': 'set_os_waitpid_scenario',
1406 os_system_scenario: {
1407 'default_scenario_name': 'success',
1408 'set_scenario_method_name': 'set_os_system_scenario',
1410 os_spawnv_scenario: {
1411 'default_scenario_name': 'success',
1412 'set_scenario_method_name': 'set_os_spawnv_scenario',
1416 def __init__(self, path=None, argv=None, *args, **kwargs):
1417 if path is None:
1418 path = tempfile.mktemp()
1419 self.path = path
1421 if argv is None:
1422 command_name = os.path.basename(path)
1423 argv = [command_name]
1424 self.argv = argv
1426 self.pid = self._make_pid()
1427 self._register_by_pid()
1429 self.set_popen_double()
1431 stream_class = SubprocessDouble.stream_class
1432 for stream_name in ['stdin', 'stdout', 'stderr']:
1433 fake_file = stream_class()
1434 file_double = FileDouble(fake_file=fake_file)
1435 stream_double_name = '{name}_double'.format(name=stream_name)
1436 setattr(self, stream_double_name, file_double)
1438 super(SubprocessDouble, self).__init__(*args, **kwargs)
1440 def set_popen_double(self):
1441 """ Set the `PopenDouble` for this instance. """
1442 double = PopenDouble(self.argv)
1443 double.pid = self.pid
1445 self.popen_double = double
1447 def __repr__(self):
1448 text = (
1449 "<SubprocessDouble instance: {id}"
1450 " path: {path!r},"
1451 " argv: {argv!r}"
1452 " stdin_double: {stdin_double!r}"
1453 " stdout_double: {stdout_double!r}"
1454 " stderr_double: {stderr_double!r}"
1455 ">").format(
1456 id=id(self),
1457 path=self.path, argv=self.argv,
1458 stdin_double=self.stdin_double,
1459 stdout_double=self.stdout_double,
1460 stderr_double=self.stderr_double)
1461 return text
1463 @classmethod
1464 def _make_pid(cls):
1465 """ Make a unique PID for a subprocess. """
1466 for pid in itertools.count(1):
1467 yield pid
1469 def _register_by_pid(self):
1470 """ Register this subprocess by its PID. """
1471 self.__class__.double_by_pid[self.pid] = self
1473 def get_registry_key(self):
1474 """ Get the registry key for this double. """
1475 result = tuple(self.argv)
1476 return result
1478 stream_class = io.BytesIO
1479 stream_encoding = "utf-8"
1481 def set_stdin_content(self, text, bytes_encoding=stream_encoding):
1482 """ Set the content of the `stdin` stream for this double. """
1483 content = text.encode(bytes_encoding)
1484 fake_file = self.stream_class(content)
1485 self.stdin_double.fake_file = fake_file
1487 def set_stdout_content(self, text, bytes_encoding=stream_encoding):
1488 """ Set the content of the `stdout` stream for this double. """
1489 content = text.encode(bytes_encoding)
1490 fake_file = self.stream_class(content)
1491 self.stdout_double.fake_file = fake_file
1493 def set_stderr_content(self, text, bytes_encoding=stream_encoding):
1494 """ Set the content of the `stderr` stream for this double. """
1495 content = text.encode(bytes_encoding)
1496 fake_file = self.stream_class(content)
1497 self.stderr_double.fake_file = fake_file
1500 def make_fake_subprocess_scenarios(path=None):
1501 """ Make a collection of scenarios for testing with fake files.
1503 :path: The filesystem path of the fake program. If not specified,
1504 a valid random path will be generated.
1505 :return: A collection of scenarios for tests involving subprocesses.
1507 The collection is a mapping from scenario name to a dictionary of
1508 scenario attributes.
1511 if path is None:
1512 file_path = tempfile.mktemp()
1513 else:
1514 file_path = path
1516 default_scenario_params = {
1517 'return_value': EXIT_STATUS_SUCCESS,
1518 'program_path': file_path,
1519 'argv_after_command_name': [],
1522 scenarios = {
1523 'default': {},
1524 'not-found': {
1525 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
1529 for (name, scenario) in scenarios.items():
1530 params = default_scenario_params.copy()
1531 params.update(scenario)
1532 scenario.update(params)
1533 program_path = params['program_path']
1534 program_name = os.path.basename(params['program_path'])
1535 argv = [program_name]
1536 argv.extend(params['argv_after_command_name'])
1537 subprocess_double_params = dict(
1538 path=program_path,
1539 argv=argv,
1541 subprocess_double = SubprocessDouble(**subprocess_double_params)
1542 scenario['subprocess_double'] = subprocess_double
1543 scenario['fake_file_scenario_name'] = name
1545 return scenarios
1548 def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
1549 """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
1551 :param scenarios: Collection of fake subprocess scenarios.
1552 :return: Collection of `SubprocessDouble` instances.
1555 doubles = set(
1556 scenario['subprocess_double']
1557 for scenario in scenarios
1558 if scenario['subprocess_double'] is not None)
1560 return doubles
1563 def setup_subprocess_double_behaviour(testcase, doubles=None):
1564 """ Set up subprocess double instances and behaviour.
1566 :param testcase: The `TestCase` instance to modify.
1567 :param doubles: Collection of `SubprocessDouble` instances.
1568 :return: None.
1570 If `doubles` is ``None``, a default collection will be made
1571 from the return value of `make_fake_subprocess_scenarios`.
1574 if doubles is None:
1575 scenarios = make_fake_subprocess_scenarios()
1576 doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
1577 scenarios.values())
1579 for double in doubles:
1580 double.register_for_testcase(testcase)
1583 def setup_fake_subprocess_fixtures(testcase):
1584 """ Set up fixtures for fake subprocess doubles.
1586 :param testcase: The `TestCase` instance to modify.
1587 :return: None.
1590 scenarios = make_fake_subprocess_scenarios()
1591 testcase.fake_subprocess_scenarios = scenarios
1593 doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
1594 scenarios.values())
1595 setup_subprocess_double_behaviour(testcase, doubles)
1598 def patch_os_popen(testcase):
1599 """ Patch `os.popen` behaviour for this test case.
1601 :param testcase: The `TestCase` instance to modify.
1602 :return: None.
1604 When the patched function is called, the registry of
1605 `SubprocessDouble` instances for this test case will be used
1606 to get the instance for the program path specified.
1609 orig_os_popen = os.popen
1611 def fake_os_popen(cmd, mode='r', buffering=-1):
1612 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1613 if isinstance(cmd, basestring):
1614 command_argv = shlex.split(cmd)
1615 else:
1616 command_argv = cmd
1617 if command_argv in registry:
1618 subprocess_double = registry[command_argv]
1619 result = subprocess_double.os_popen_scenario.call_hook(
1620 command_argv, mode, buffering)
1621 else:
1622 result = orig_os_popen(cmd, mode, buffering)
1623 return result
1625 func_patcher = mock.patch.object(
1626 os, "popen", autospec=True,
1627 side_effect=fake_os_popen)
1628 func_patcher.start()
1629 testcase.addCleanup(func_patcher.stop)
1632 def patch_os_waitpid(testcase):
1633 """ Patch `os.waitpid` behaviour for this test case.
1635 :param testcase: The `TestCase` instance to modify.
1636 :return: None.
1638 When the patched function is called, the registry of
1639 `SubprocessDouble` instances for this test case will be used
1640 to get the instance for the program path specified.
1643 orig_os_waitpid = os.waitpid
1645 def fake_os_waitpid(pid, options):
1646 registry = SubprocessDouble.double_by_pid
1647 if pid in registry:
1648 subprocess_double = registry[pid]
1649 result = subprocess_double.os_waitpid_scenario.call_hook(
1650 pid, options)
1651 else:
1652 result = orig_os_waitpid(pid, options)
1653 return result
1655 func_patcher = mock.patch.object(
1656 os, "waitpid", autospec=True,
1657 side_effect=fake_os_waitpid)
1658 func_patcher.start()
1659 testcase.addCleanup(func_patcher.stop)
1662 def patch_os_system(testcase):
1663 """ Patch `os.system` behaviour for this test case.
1665 :param testcase: The `TestCase` instance to modify.
1666 :return: None.
1668 When the patched function is called, the registry of
1669 `SubprocessDouble` instances for this test case will be used
1670 to get the instance for the program path specified.
1673 orig_os_system = os.system
1675 def fake_os_system(command):
1676 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1677 command_argv = shlex.split(command)
1678 if command_argv in registry:
1679 subprocess_double = registry[command_argv]
1680 result = subprocess_double.os_system_scenario.call_hook(
1681 command)
1682 else:
1683 result = orig_os_system(command)
1684 return result
1686 func_patcher = mock.patch.object(
1687 os, "system", autospec=True,
1688 side_effect=fake_os_system)
1689 func_patcher.start()
1690 testcase.addCleanup(func_patcher.stop)
1693 def patch_os_spawnv(testcase):
1694 """ Patch `os.spawnv` behaviour for this test case.
1696 :param testcase: The `TestCase` instance to modify.
1697 :return: None.
1699 When the patched function is called, the registry of
1700 `SubprocessDouble` instances for this test case will be used
1701 to get the instance for the program path specified.
1704 orig_os_spawnv = os.spawnv
1706 def fake_os_spawnv(mode, file, args):
1707 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1708 registry_key = tuple(args)
1709 if registry_key in registry:
1710 subprocess_double = registry[registry_key]
1711 result = subprocess_double.os_spawnv_scenario.call_hook(
1712 mode, file, args)
1713 else:
1714 result = orig_os_spawnv(mode, file, args)
1715 return result
1717 func_patcher = mock.patch.object(
1718 os, "spawnv", autospec=True,
1719 side_effect=fake_os_spawnv)
1720 func_patcher.start()
1721 testcase.addCleanup(func_patcher.stop)
1724 # Local variables:
1725 # coding: utf-8
1726 # mode: python
1727 # End:
1728 # vim: fileencoding=utf-8 filetype=python :