Improve wording of what the tutorial is demonstrating.
[python-gajja.git] / gajja / __init__.py
blobb7daef8987291857abfe4234f4afa08aec561f7c
1 # -*- coding: utf-8; -*-
3 # gajja/__init__.py
4 # Part of Gajja, a Python test double library.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Gajja: Fake objects for real tests.
15 The `gajja` library provides a system of Python test double classes
16 for specific system objects:
18 * Filesystem entries
20 * Subprocesses
22 The Korean word 가짜 (*gajja*; IPA ˈkaːt͡ɕ̤a) means “fake thing”.
24 """
26 from __future__ import (absolute_import, unicode_literals)
28 import sys
30 if sys.version_info >= (3, 3):
31 import builtins
32 import unittest
33 import unittest.mock as mock
34 from io import StringIO as StringIO
35 import configparser
36 import collections.abc as collections_abc
37 elif sys.version_info >= (3, 0):
38 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
39 elif sys.version_info >= (2, 7):
40 # Python 2 standard library.
41 import __builtin__ as builtins
42 # Third-party backport of Python 3 unittest improvements.
43 import unittest2 as unittest
44 # Third-party mock library.
45 import mock
46 # Python 2 standard library.
47 from StringIO import StringIO as BaseStringIO
48 import ConfigParser as configparser
49 import collections as collections_abc
50 else:
51 raise RuntimeError("Python earlier than 2.7 is not supported.")
53 import os
54 import os.path
55 import io
56 import shutil
57 import tempfile
58 import errno
59 import time
60 import signal
61 import subprocess
62 import functools
63 import itertools
64 import base64
65 import collections
66 import weakref
67 import shlex
69 try:
70 import pwd
71 except ImportError:
72 # The ‘pwd’ module is not available on platforms other than Unix.
73 pwd = NotImplemented
75 __package__ = str("gajja")
76 __import__(__package__)
78 __metaclass__ = type
80 try:
81 # Python 2 types.
82 basestring
83 unicode
84 except NameError:
85 # Alias for Python 3 types.
86 basestring = str
87 unicode = str
90 def make_unique_slug(testcase):
91 """ Make a unique slug for the test case. """
92 text = base64.b64encode(
93 testcase.getUniqueString().encode('utf-8')
94 ).decode('utf-8')
95 result = text[-30:]
96 return result
99 try:
100 StringIO
101 except NameError:
102 # We don't yet have the StringIO we want. Create it.
104 class StringIO(BaseStringIO, object):
105 """ StringIO with a context manager. """
107 def __enter__(self):
108 return self
110 def __exit__(self, *args):
111 self.close()
112 return False
114 def readable(self):
115 return True
117 def writable(self):
118 return True
120 def seekable(self):
121 return True
124 def patch_stdout(testcase):
125 """ Patch `sys.stdout` for the specified test case. """
126 patcher = mock.patch.object(
127 sys, "stdout", wraps=StringIO())
128 patcher.start()
129 testcase.addCleanup(patcher.stop)
132 def patch_stderr(testcase):
133 """ Patch `sys.stderr` for the specified test case. """
134 patcher = mock.patch.object(
135 sys, "stderr", wraps=StringIO())
136 patcher.start()
137 testcase.addCleanup(patcher.stop)
140 def patch_signal_signal(testcase):
141 """ Patch `signal.signal` for the specified test case. """
142 func_patcher = mock.patch.object(signal, "signal", autospec=True)
143 func_patcher.start()
144 testcase.addCleanup(func_patcher.stop)
147 class FakeSystemExit(Exception):
148 """ Fake double for `SystemExit` exception. """
151 EXIT_STATUS_SUCCESS = 0
152 EXIT_STATUS_FAILURE = 1
153 EXIT_STATUS_COMMAND_NOT_FOUND = 127
156 def patch_sys_exit(testcase):
157 """ Patch `sys.exit` for the specified test case. """
158 func_patcher = mock.patch.object(
159 sys, "exit", autospec=True,
160 side_effect=FakeSystemExit())
161 func_patcher.start()
162 testcase.addCleanup(func_patcher.stop)
165 def patch_sys_argv(testcase):
166 """ Patch the `sys.argv` sequence for the test case. """
167 if not hasattr(testcase, 'progname'):
168 testcase.progname = make_unique_slug(testcase)
169 if not hasattr(testcase, 'sys_argv'):
170 testcase.sys_argv = [testcase.progname]
171 patcher = mock.patch.object(
172 sys, "argv",
173 new=list(testcase.sys_argv))
174 patcher.start()
175 testcase.addCleanup(patcher.stop)
178 def patch_system_interfaces(testcase):
179 """ Patch system interfaces that are disruptive to the test runner. """
180 patch_stdout(testcase)
181 patch_stderr(testcase)
182 patch_sys_exit(testcase)
183 patch_sys_argv(testcase)
186 def patch_time_time(testcase, values=None):
187 """ Patch the `time.time` function for the specified test case.
189 :param testcase: The `TestCase` instance for binding to the patch.
190 :param values: An iterable to provide return values.
191 :return: None.
194 if values is None:
195 values = itertools.count()
197 def generator_fake_time():
198 while True:
199 yield next(values)
201 func_patcher = mock.patch.object(time, "time", autospec=True)
202 func_patcher.start()
203 testcase.addCleanup(func_patcher.stop)
205 time.time.side_effect = generator_fake_time()
208 def patch_os_environ(testcase):
209 """ Patch the `os.environ` mapping. """
210 if not hasattr(testcase, 'os_environ'):
211 testcase.os_environ = {}
212 patcher = mock.patch.object(os, "environ", new=testcase.os_environ)
213 patcher.start()
214 testcase.addCleanup(patcher.stop)
217 def patch_os_getpid(testcase):
218 """ Patch `os.getpid` for the specified test case. """
219 func_patcher = mock.patch.object(os, "getpid", autospec=True)
220 func_patcher.start()
221 testcase.addCleanup(func_patcher.stop)
224 def patch_os_getuid(testcase):
225 """ Patch the `os.getuid` function. """
226 if not hasattr(testcase, 'os_getuid_return_value'):
227 testcase.os_getuid_return_value = testcase.getUniqueInteger()
228 func_patcher = mock.patch.object(
229 os, "getuid", autospec=True,
230 return_value=testcase.os_getuid_return_value)
231 func_patcher.start()
232 testcase.addCleanup(func_patcher.stop)
235 PasswdEntry = collections.namedtuple(
236 "PasswdEntry",
237 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
240 def patch_pwd_getpwuid(testcase):
241 """ Patch the `pwd.getpwuid` function. """
242 if not hasattr(testcase, 'pwd_getpwuid_return_value'):
243 testcase.pwd_getpwuid_return_value = PasswdEntry(
244 pw_name=make_unique_slug(testcase),
245 pw_passwd=make_unique_slug(testcase),
246 pw_uid=testcase.getUniqueInteger(),
247 pw_gid=testcase.getUniqueInteger(),
248 pw_gecos=testcase.getUniqueString(),
249 pw_dir=tempfile.mktemp(),
250 pw_shell=tempfile.mktemp())
251 if not isinstance(testcase.pwd_getpwuid_return_value, pwd.struct_passwd):
252 pwent = pwd.struct_passwd(testcase.pwd_getpwuid_return_value)
253 else:
254 pwent = testcase.pwd_getpwuid_return_value
255 func_patcher = mock.patch.object(
256 pwd, "getpwuid", autospec=True,
257 return_value=pwent)
258 func_patcher.start()
259 testcase.addCleanup(func_patcher.stop)
261 if pwd is NotImplemented:
262 # The ‘pwd’ module is not available on some platforms.
263 del patch_pwd_getpwuid
266 def patch_os_path_exists(testcase):
267 """ Patch `os.path.exists` behaviour for this test case.
269 When the patched function is called, the registry of
270 `FileDouble` instances for this test case will be used to get
271 the instance for the path specified.
274 orig_os_path_exists = os.path.exists
276 def fake_os_path_exists(path):
277 registry = FileDouble.get_registry_for_testcase(testcase)
278 if path in registry:
279 file_double = registry[path]
280 result = file_double.os_path_exists_scenario.call_hook()
281 else:
282 result = orig_os_path_exists(path)
283 return result
285 func_patcher = mock.patch.object(
286 os.path, "exists", autospec=True,
287 side_effect=fake_os_path_exists)
288 func_patcher.start()
289 testcase.addCleanup(func_patcher.stop)
292 def patch_os_access(testcase):
293 """ Patch `os.access` behaviour for this test case.
295 When the patched function is called, the registry of
296 `FileDouble` instances for this test case will be used to get
297 the instance for the path specified.
300 orig_os_access = os.access
302 def fake_os_access(path, mode):
303 registry = FileDouble.get_registry_for_testcase(testcase)
304 if path in registry:
305 file_double = registry[path]
306 result = file_double.os_access_scenario.call_hook(mode)
307 else:
308 result = orig_os_access(path, mode)
309 return result
311 func_patcher = mock.patch.object(
312 os, "access", autospec=True,
313 side_effect=fake_os_access)
314 func_patcher.start()
315 testcase.addCleanup(func_patcher.stop)
318 StatResult = collections.namedtuple(
319 'StatResult', [
320 'st_mode',
321 'st_ino', 'st_dev', 'st_nlink',
322 'st_uid', 'st_gid',
323 'st_size',
324 'st_atime', 'st_mtime', 'st_ctime',
328 def patch_os_stat(testcase):
329 """ Patch `os.stat` behaviour for this test case.
331 When the patched function is called, the registry of
332 `FileDouble` instances for this test case will be used to get
333 the instance for the path specified.
336 orig_os_stat = os.stat
338 def fake_os_stat(path):
339 registry = FileDouble.get_registry_for_testcase(testcase)
340 if path in registry:
341 file_double = registry[path]
342 result = file_double.os_stat_scenario.call_hook()
343 else:
344 result = orig_os_stat(path)
345 return result
347 func_patcher = mock.patch.object(
348 os, "stat", autospec=True,
349 side_effect=fake_os_stat)
350 func_patcher.start()
351 testcase.addCleanup(func_patcher.stop)
354 def patch_os_lstat(testcase):
355 """ Patch `os.lstat` behaviour for this test case.
357 When the patched function is called, the registry of
358 `FileDouble` instances for this test case will be used to get
359 the instance for the path specified.
362 orig_os_lstat = os.lstat
364 def fake_os_lstat(path):
365 registry = FileDouble.get_registry_for_testcase(testcase)
366 if path in registry:
367 file_double = registry[path]
368 result = file_double.os_lstat_scenario.call_hook()
369 else:
370 result = orig_os_lstat(path)
371 return result
373 func_patcher = mock.patch.object(
374 os, "lstat", autospec=True,
375 side_effect=fake_os_lstat)
376 func_patcher.start()
377 testcase.addCleanup(func_patcher.stop)
380 def patch_os_unlink(testcase):
381 """ Patch `os.unlink` behaviour for this test case.
383 When the patched function is called, the registry of
384 `FileDouble` instances for this test case will be used to get
385 the instance for the path specified.
388 orig_os_unlink = os.unlink
390 def fake_os_unlink(path):
391 registry = FileDouble.get_registry_for_testcase(testcase)
392 if path in registry:
393 file_double = registry[path]
394 result = file_double.os_unlink_scenario.call_hook()
395 else:
396 result = orig_os_unlink(path)
397 return result
399 func_patcher = mock.patch.object(
400 os, "unlink", autospec=True,
401 side_effect=fake_os_unlink)
402 func_patcher.start()
403 testcase.addCleanup(func_patcher.stop)
406 def patch_os_rmdir(testcase):
407 """ Patch `os.rmdir` behaviour for this test case.
409 When the patched function is called, the registry of
410 `FileDouble` instances for this test case will be used to get
411 the instance for the path specified.
414 orig_os_rmdir = os.rmdir
416 def fake_os_rmdir(path):
417 registry = FileDouble.get_registry_for_testcase(testcase)
418 if path in registry:
419 file_double = registry[path]
420 result = file_double.os_rmdir_scenario.call_hook()
421 else:
422 result = orig_os_rmdir(path)
423 return result
425 func_patcher = mock.patch.object(
426 os, "rmdir", autospec=True,
427 side_effect=fake_os_rmdir)
428 func_patcher.start()
429 testcase.addCleanup(func_patcher.stop)
432 def patch_shutil_rmtree(testcase):
433 """ Patch `shutil.rmtree` behaviour for this test case.
435 When the patched function is called, the registry of
436 `FileDouble` instances for this test case will be used to get
437 the instance for the path specified.
440 orig_shutil_rmtree = os.rmdir
442 def fake_shutil_rmtree(path, ignore_errors=False, onerror=None):
443 registry = FileDouble.get_registry_for_testcase(testcase)
444 if path in registry:
445 file_double = registry[path]
446 result = file_double.shutil_rmtree_scenario.call_hook()
447 else:
448 result = orig_shutil_rmtree(path)
449 return result
451 func_patcher = mock.patch.object(
452 shutil, "rmtree", autospec=True,
453 side_effect=fake_shutil_rmtree)
454 func_patcher.start()
455 testcase.addCleanup(func_patcher.stop)
458 def patch_tempfile_mkdtemp(testcase):
459 """ Patch the `tempfile.mkdtemp` function for this test case. """
460 if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
461 testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())
463 double = testcase.tempfile_mkdtemp_file_double
464 double.set_os_unlink_scenario('okay')
465 double.set_os_rmdir_scenario('okay')
466 double.register_for_testcase(testcase)
468 func_patcher = mock.patch.object(tempfile, "mkdtemp", autospec=True)
469 func_patcher.start()
470 testcase.addCleanup(func_patcher.stop)
472 tempfile.mkdtemp.return_value = testcase.tempfile_mkdtemp_file_double.path
475 try:
476 FileNotFoundError
477 FileExistsError
478 PermissionError
479 except NameError:
480 # Python 2 uses IOError.
481 def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
482 result_kwargs = init_kwargs
483 result_errno = errno_value
484 result_strerror = os.strerror(errno_value)
485 result_filename = None
486 if len(init_args) >= 3:
487 result_errno = init_args[0]
488 result_filename = init_args[2]
489 if 'errno' in init_kwargs:
490 result_errno = init_kwargs['errno']
491 del result_kwargs['errno']
492 if 'filename' in init_kwargs:
493 result_filename = init_kwargs['filename']
494 del result_kwargs['filename']
495 if len(init_args) >= 2:
496 result_strerror = init_args[1]
497 if 'strerror' in init_kwargs:
498 result_strerror = init_kwargs['strerror']
499 del result_kwargs['strerror']
500 result_args = (result_errno, result_strerror, result_filename)
501 return (result_args, result_kwargs)
503 class FileNotFoundError(IOError):
504 def __init__(self, *args, **kwargs):
505 (args, kwargs) = _ensure_ioerror_args(
506 args, kwargs, errno_value=errno.ENOENT)
507 super(FileNotFoundError, self).__init__(*args, **kwargs)
509 class FileExistsError(IOError):
510 def __init__(self, *args, **kwargs):
511 (args, kwargs) = _ensure_ioerror_args(
512 args, kwargs, errno_value=errno.EEXIST)
513 super(FileExistsError, self).__init__(*args, **kwargs)
515 class PermissionError(IOError):
516 def __init__(self, *args, **kwargs):
517 (args, kwargs) = _ensure_ioerror_args(
518 args, kwargs, errno_value=errno.EPERM)
519 super(PermissionError, self).__init__(*args, **kwargs)
522 def make_fake_file_scenarios(path=None):
523 """ Make a collection of scenarios for testing with fake files.
525 :path: The filesystem path of the fake file. If not specified,
526 a valid random path will be generated.
527 :return: A collection of scenarios for tests involving input files.
529 The collection is a mapping from scenario name to a dictionary of
530 scenario attributes.
534 if path is None:
535 file_path = tempfile.mktemp()
536 else:
537 file_path = path
539 fake_file_empty = StringIO()
540 fake_file_minimal = StringIO("Lorem ipsum.")
541 fake_file_large = StringIO("\n".join(
542 "ABCDEFGH" * 100
543 for __ in range(1000)))
545 default_scenario_params = {
546 'open_scenario_name': 'okay',
547 'file_double_params': dict(
548 path=file_path, fake_file=fake_file_minimal),
551 scenarios = {
552 'default': {},
553 'error-not-exist': {
554 'open_scenario_name': 'nonexist',
556 'error-exist': {
557 'open_scenario_name': 'exist_error',
559 'error-read-denied': {
560 'open_scenario_name': 'read_denied',
562 'not-found': {
563 'file_double_params': dict(
564 path=file_path, fake_file=fake_file_empty),
566 'exist-empty': {
567 'file_double_params': dict(
568 path=file_path, fake_file=fake_file_empty),
570 'exist-minimal': {
571 'file_double_params': dict(
572 path=file_path, fake_file=fake_file_minimal),
574 'exist-large': {
575 'file_double_params': dict(
576 path=file_path, fake_file=fake_file_large),
580 for (name, scenario) in scenarios.items():
581 params = default_scenario_params.copy()
582 params.update(scenario)
583 scenario.update(params)
584 scenario['file_double'] = FileDouble(**scenario['file_double_params'])
585 scenario['file_double'].set_open_scenario(params['open_scenario_name'])
586 scenario['fake_file_scenario_name'] = name
588 return scenarios
591 def get_file_doubles_from_fake_file_scenarios(scenarios):
592 """ Get the `FileDouble` instances from fake file scenarios.
594 :param scenarios: Collection of fake file scenarios.
595 :return: Collection of `FileDouble` instances.
598 doubles = set(
599 scenario['file_double']
600 for scenario in scenarios
601 if scenario['file_double'] is not None)
603 return doubles
606 def setup_file_double_behaviour(testcase, doubles=None):
607 """ Set up file double instances and behaviour.
609 :param testcase: The `TestCase` instance to modify.
610 :param doubles: Collection of `FileDouble` instances.
611 :return: None.
613 If `doubles` is ``None``, a default collection will be made
614 from the result of `make_fake_file_scenarios` result.
617 if doubles is None:
618 scenarios = make_fake_file_scenarios()
619 doubles = get_file_doubles_from_fake_file_scenarios(
620 scenarios.values())
622 for file_double in doubles:
623 file_double.register_for_testcase(testcase)
625 orig_open = builtins.open
627 def fake_open(path, mode='rt', buffering=-1):
628 registry = FileDouble.get_registry_for_testcase(testcase)
629 if path in registry:
630 file_double = registry[path]
631 result = file_double.builtins_open_scenario.call_hook(
632 mode, buffering)
633 else:
634 result = orig_open(path, mode, buffering)
635 return result
637 mock_open = mock.mock_open()
638 mock_open.side_effect = fake_open
640 func_patcher = mock.patch.object(
641 builtins, "open", new=mock_open)
642 func_patcher.start()
643 testcase.addCleanup(func_patcher.stop)
646 def setup_fake_file_fixtures(testcase):
647 """ Set up fixtures for fake file doubles.
649 :param testcase: The `TestCase` instance to modify.
650 :return: None.
653 scenarios = make_fake_file_scenarios()
654 testcase.fake_file_scenarios = scenarios
656 file_doubles = get_file_doubles_from_fake_file_scenarios(
657 scenarios.values())
658 setup_file_double_behaviour(testcase, file_doubles)
661 def set_fake_file_scenario(testcase, name):
662 """ Set the named fake file scenario for the test case. """
663 scenario = testcase.fake_file_scenarios[name]
664 testcase.fake_file_scenario = scenario
665 testcase.file_double = scenario['file_double']
666 testcase.file_double.register_for_testcase(testcase)
669 class TestDoubleFunctionScenario:
670 """ Scenario for fake behaviour of a specific function. """
672 def __init__(self, scenario_name, double):
673 self.scenario_name = scenario_name
674 self.double = double
676 self.call_hook = getattr(
677 self, "_hook_{name}".format(name=self.scenario_name))
679 def __repr__(self):
680 text = (
681 "<{class_name} instance: {id}"
682 " name: {name!r},"
683 " call_hook name: {hook_name!r}"
684 " double: {double!r}"
685 ">").format(
686 class_name=self.__class__.__name__, id=id(self),
687 name=self.scenario_name, double=self.double,
688 hook_name=self.call_hook.__name__)
689 return text
691 def __eq__(self, other):
692 result = True
693 if not self.scenario_name == other.scenario_name:
694 result = False
695 if not self.double == other.double:
696 result = False
697 if not self.call_hook.__name__ == other.call_hook.__name__:
698 result = False
699 return result
701 def __ne__(self, other):
702 result = not self.__eq__(other)
703 return result
706 class os_path_exists_scenario(TestDoubleFunctionScenario):
707 """ Scenario for `os.path.exists` behaviour. """
709 def _hook_exist(self):
710 return True
712 def _hook_not_exist(self):
713 return False
716 class os_access_scenario(TestDoubleFunctionScenario):
717 """ Scenario for `os.access` behaviour. """
719 def _hook_okay(self, mode):
720 return True
722 def _hook_not_exist(self, mode):
723 return False
725 def _hook_read_only(self, mode):
726 if mode & (os.W_OK | os.X_OK):
727 result = False
728 else:
729 result = True
730 return result
732 def _hook_denied(self, mode):
733 if mode & (os.R_OK | os.W_OK | os.X_OK):
734 result = False
735 else:
736 result = True
737 return result
740 class os_stat_scenario(TestDoubleFunctionScenario):
741 """ Scenario for `os.stat` behaviour. """
743 def _hook_okay(self):
744 return self.double.stat_result
746 def _hook_notfound_error(self):
747 raise FileNotFoundError(
748 self.double.path,
749 "No such file or directory: {path!r}".format(
750 path=self.double.path))
752 def _hook_denied_error(self):
753 raise PermissionError(
754 self.double.path,
755 "Permission denied")
758 class os_lstat_scenario(os_stat_scenario):
759 """ Scenario for `os.lstat` behaviour. """
762 class os_unlink_scenario(TestDoubleFunctionScenario):
763 """ Scenario for `os.unlink` behaviour. """
765 def _hook_okay(self):
766 return None
768 def _hook_nonexist(self):
769 error = FileNotFoundError(
770 self.double.path,
771 "No such file or directory: {path!r}".format(
772 path=self.double.path))
773 raise error
775 def _hook_denied(self):
776 error = PermissionError(
777 self.double.path,
778 "Permission denied")
779 raise error
782 class os_rmdir_scenario(TestDoubleFunctionScenario):
783 """ Scenario for `os.rmdir` behaviour. """
785 def _hook_okay(self):
786 return None
788 def _hook_nonexist(self):
789 error = FileNotFoundError(
790 self.double.path,
791 "No such file or directory: {path!r}".format(
792 path=self.double.path))
793 raise error
795 def _hook_denied(self):
796 error = PermissionError(
797 self.double.path,
798 "Permission denied")
799 raise error
802 class shutil_rmtree_scenario(TestDoubleFunctionScenario):
803 """ Scenario for `shutil.rmtree` behaviour. """
805 def _hook_okay(self):
806 return None
808 def _hook_nonexist(self):
809 error = FileNotFoundError(
810 self.double.path,
811 "No such file or directory: {path!r}".format(
812 path=self.double.path))
813 raise error
815 def _hook_denied(self):
816 error = PermissionError(
817 self.double.path,
818 "Permission denied")
819 raise error
822 class builtins_open_scenario(TestDoubleFunctionScenario):
823 """ Scenario for `builtins.open` behaviour. """
825 def _hook_okay(self, mode, buffering):
826 result = self.double.fake_file
827 return result
829 def _hook_nonexist(self, mode, buffering):
830 if mode.startswith('r'):
831 error = FileNotFoundError(
832 self.double.path,
833 "No such file or directory: {path!r}".format(
834 path=self.double.path))
835 raise error
836 result = self.double.fake_file
837 return result
839 def _hook_exist_error(self, mode, buffering):
840 if mode.startswith('w') or mode.startswith('a'):
841 error = FileExistsError(
842 self.double.path,
843 "File already exists: {path!r}".format(
844 path=self.double.path))
845 raise error
846 result = self.double.fake_file
847 return result
849 def _hook_read_denied(self, mode, buffering):
850 if mode.startswith('r'):
851 error = PermissionError(
852 self.double.path,
853 "Read denied on {path!r}".format(
854 path=self.double.path))
855 raise error
856 result = self.double.fake_file
857 return result
859 def _hook_write_denied(self, mode, buffering):
860 if mode.startswith('w') or mode.startswith('a'):
861 error = PermissionError(
862 self.double.path,
863 "Write denied on {path!r}".format(
864 path=self.double.path))
865 raise error
866 result = self.double.fake_file
867 return result
870 class TestDoubleWithRegistry:
871 """ Abstract base class for a test double with a test case registry. """
873 registry_class = NotImplemented
874 registries = NotImplemented
876 function_scenario_params_by_class = NotImplemented
878 def __new__(cls, *args, **kwargs):
879 superclass = super(TestDoubleWithRegistry, cls)
880 if superclass.__new__ is object.__new__:
881 # The ‘object’ implementation complains about extra arguments.
882 instance = superclass.__new__(cls)
883 else:
884 instance = superclass.__new__(cls, *args, **kwargs)
885 instance.make_set_scenario_methods()
887 return instance
889 def __init__(self, *args, **kwargs):
890 super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
891 self._set_method_per_scenario()
893 def _make_set_scenario_method(self, scenario_class, params):
894 def method(self, name):
895 scenario = scenario_class(name, double=self)
896 setattr(self, scenario_class.__name__, scenario)
897 method.__doc__ = (
898 """ Set the scenario for `{name}` behaviour. """
899 ).format(name=scenario_class.__name__)
900 method.__name__ = str(params['set_scenario_method_name'])
901 return method
903 def make_set_scenario_methods(self):
904 """ Make `set_<scenario_class_name>` methods on this class. """
905 for (function_scenario_class, function_scenario_params) in (
906 self.function_scenario_params_by_class.items()):
907 method = self._make_set_scenario_method(
908 function_scenario_class, function_scenario_params)
909 setattr(self.__class__, method.__name__, method)
910 function_scenario_params['set_scenario_method'] = method
912 def _set_method_per_scenario(self):
913 """ Set the method to be called for each scenario. """
914 for function_scenario_params in (
915 self.function_scenario_params_by_class.values()):
916 function_scenario_params['set_scenario_method'](
917 self, function_scenario_params['default_scenario_name'])
919 @classmethod
920 def get_registry_for_testcase(cls, testcase):
921 """ Get the FileDouble registry for the specified test case. """
922 # Key in a dict must be hashable.
923 key = (testcase.__class__, id(testcase))
924 registry = cls.registries.setdefault(key, cls.registry_class())
925 return registry
927 def get_registry_key(self):
928 """ Get the registry key for this double. """
929 raise NotImplementedError
931 def register_for_testcase(self, testcase):
932 """ Add this instance to registry for the specified testcase. """
933 registry = self.get_registry_for_testcase(testcase)
934 key = self.get_registry_key()
935 registry[key] = self
936 unregister_func = functools.partial(
937 self.unregister_for_testcase, testcase)
938 testcase.addCleanup(unregister_func)
940 def unregister_for_testcase(self, testcase):
941 """ Remove this instance from registry for the specified testcase. """
942 registry = self.get_registry_for_testcase(testcase)
943 key = self.get_registry_key()
944 if key in registry:
945 registry.pop(key)
948 def copy_fake_file(fake_file):
949 """ Make a copy of the StringIO instance. """
950 fake_file_type = StringIO
951 content = ""
952 if fake_file is not None:
953 fake_file_type = type(fake_file)
954 content = fake_file.getvalue()
955 assert issubclass(fake_file_type, object)
956 result = fake_file_type(content)
957 if hasattr(fake_file, 'encoding'):
958 if not hasattr(result, 'encoding'):
959 result.encoding = fake_file.encoding
960 return result
963 class FileDouble(TestDoubleWithRegistry):
964 """ A testing double for a file. """
966 registry_class = dict
967 registries = {}
969 function_scenario_params_by_class = {
970 os_path_exists_scenario: {
971 'default_scenario_name': 'not_exist',
972 'set_scenario_method_name': 'set_os_path_exists_scenario',
974 os_access_scenario: {
975 'default_scenario_name': 'okay',
976 'set_scenario_method_name': 'set_os_access_scenario',
978 os_stat_scenario: {
979 'default_scenario_name': 'okay',
980 'set_scenario_method_name': 'set_os_stat_scenario',
982 os_lstat_scenario: {
983 'default_scenario_name': 'okay',
984 'set_scenario_method_name': 'set_os_lstat_scenario',
986 builtins_open_scenario: {
987 'default_scenario_name': 'okay',
988 'set_scenario_method_name': 'set_open_scenario',
990 os_unlink_scenario: {
991 'default_scenario_name': 'okay',
992 'set_scenario_method_name': 'set_os_unlink_scenario',
994 os_rmdir_scenario: {
995 'default_scenario_name': 'okay',
996 'set_scenario_method_name': 'set_os_rmdir_scenario',
998 shutil_rmtree_scenario: {
999 'default_scenario_name': 'okay',
1000 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
1004 def __init__(self, path=None, fake_file=None, *args, **kwargs):
1005 self.path = path
1006 self.fake_file = copy_fake_file(fake_file)
1007 self.fake_file.name = path
1009 self._set_stat_result()
1011 super(FileDouble, self).__init__(*args, **kwargs)
1013 def _set_stat_result(self):
1014 """ Set the `os.stat` result for this file. """
1015 size = len(self.fake_file.getvalue())
1016 self.stat_result = StatResult(
1017 st_mode=0,
1018 st_ino=None, st_dev=None, st_nlink=None,
1019 st_uid=0, st_gid=0,
1020 st_size=size,
1021 st_atime=None, st_mtime=None, st_ctime=None,
1024 def __repr__(self):
1025 text = "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
1026 path=self.path, fake_file=self.fake_file)
1027 return text
1029 def get_registry_key(self):
1030 """ Get the registry key for this double. """
1031 result = self.path
1032 return result
1035 class os_popen_scenario(TestDoubleFunctionScenario):
1036 """ Scenario for `os.popen` behaviour. """
1038 stream_name_by_mode = {
1039 'w': 'stdin',
1040 'r': 'stdout',
1043 def _hook_success(self, argv, mode, buffering):
1044 stream_name = self.stream_name_by_mode[mode]
1045 stream_double = getattr(
1046 self.double, stream_name + '_double')
1047 result = stream_double.fake_file
1048 return result
1050 def _hook_failure(self, argv, mode, buffering):
1051 result = StringIO()
1052 return result
1054 def _hook_not_found(self, argv, mode, buffering):
1055 result = StringIO()
1056 return result
1059 class os_waitpid_scenario(TestDoubleFunctionScenario):
1060 """ Scenario for `os.waitpid` behaviour. """
1062 def _hook_success(self, pid, options):
1063 result = (pid, EXIT_STATUS_SUCCESS)
1064 return result
1066 def _hook_failure(self, pid, options):
1067 result = (pid, EXIT_STATUS_FAILURE)
1068 return result
1070 def _hook_not_found(self, pid, options):
1071 error = OSError(errno.ECHILD)
1072 raise error
1075 class os_system_scenario(TestDoubleFunctionScenario):
1076 """ Scenario for `os.system` behaviour. """
1078 def _hook_success(self, command):
1079 result = EXIT_STATUS_SUCCESS
1080 return result
1082 def _hook_failure(self, command):
1083 result = EXIT_STATUS_FAILURE
1084 return result
1086 def _hook_not_found(self, command):
1087 result = EXIT_STATUS_COMMAND_NOT_FOUND
1088 return result
1091 class os_spawnv_scenario(TestDoubleFunctionScenario):
1092 """ Scenario for `os.spawnv` behaviour. """
1094 def _hook_success(self, mode, file, args):
1095 result = EXIT_STATUS_SUCCESS
1096 return result
1098 def _hook_failure(self, mode, file, args):
1099 result = EXIT_STATUS_FAILURE
1100 return result
1102 def _hook_not_found(self, mode, file, args):
1103 result = EXIT_STATUS_COMMAND_NOT_FOUND
1104 return result
1107 ARG_ANY = object()
1108 ARG_MORE = object()
1111 class PopenDouble:
1112 """ A testing double for `subprocess.Popen`. """
1114 def __init__(self, args, *posargs, **kwargs):
1115 self.stdin = None
1116 self.stdout = None
1117 self.stderr = None
1118 self.pid = None
1119 self.returncode = None
1121 if kwargs.get('shell', False):
1122 self.argv = shlex.split(args)
1123 else:
1124 # The paramter is already a sequence of command-line arguments.
1125 self.argv = args
1127 def set_streams(self, subprocess_double, popen_kwargs):
1128 """ Set the streams on the `PopenDouble`.
1130 :param subprocess_double: The `SubprocessDouble` from
1131 which to get existing stream doubles.
1132 :param popen_kwargs: The keyword arguments to the
1133 `subprocess.Popen` call.
1134 :return: ``None``.
1137 for stream_name in (
1138 name for name in ['stdin', 'stdout', 'stderr']
1139 if name in popen_kwargs):
1140 stream_spec = popen_kwargs[stream_name]
1141 if stream_spec is subprocess.PIPE:
1142 stream_double = getattr(
1143 subprocess_double,
1144 "{name}_double".format(name=stream_name))
1145 stream_file = stream_double.fake_file
1146 elif stream_spec is subprocess.STDOUT:
1147 stream_file = subprocess_double.stdout_double.fake_file
1148 else:
1149 stream_file = stream_spec
1150 setattr(self, stream_name, stream_file)
1152 def wait(self):
1153 """ Wait for subprocess to terminate. """
1154 return self.returncode
1157 class subprocess_popen_scenario(TestDoubleFunctionScenario):
1158 """ Scenario for `subprocess.Popen` behaviour. """
1160 def _hook_success(self, testcase, args, *posargs, **kwargs):
1161 double = self.double.popen_double
1162 double.set_streams(self.double, kwargs)
1163 return double
1166 def patch_subprocess_popen(testcase):
1167 """ Patch `subprocess.Popen` constructor for this test case.
1169 :param testcase: The `TestCase` instance to modify.
1170 :return: None.
1172 When the patched function is called, the registry of
1173 `SubprocessDouble` instances for this test case will be used
1174 to get the instance for the program path specified.
1177 orig_subprocess_popen = subprocess.Popen
1179 def fake_subprocess_popen(args, *posargs, **kwargs):
1180 if kwargs.get('shell', False):
1181 argv = shlex.split(args)
1182 else:
1183 argv = args
1184 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1185 if argv in registry:
1186 subprocess_double = registry[argv]
1187 result = subprocess_double.subprocess_popen_scenario.call_hook(
1188 testcase, args, *posargs, **kwargs)
1189 else:
1190 result = orig_subprocess_popen(args, *posargs, **kwargs)
1191 return result
1193 func_patcher = mock.patch.object(
1194 subprocess, "Popen", autospec=True,
1195 side_effect=fake_subprocess_popen)
1196 func_patcher.start()
1197 testcase.addCleanup(func_patcher.stop)
1200 class subprocess_call_scenario(TestDoubleFunctionScenario):
1201 """ Scenario for `subprocess.call` behaviour. """
1203 def _hook_success(self, command):
1204 result = EXIT_STATUS_SUCCESS
1205 return result
1207 def _hook_failure(self, command):
1208 result = EXIT_STATUS_FAILURE
1209 return result
1211 def _hook_not_found(self, command):
1212 result = EXIT_STATUS_COMMAND_NOT_FOUND
1213 return result
1216 def patch_subprocess_call(testcase):
1217 """ Patch `subprocess.call` function for this test case.
1219 :param testcase: The `TestCase` instance to modify.
1220 :return: None.
1222 When the patched function is called, the registry of
1223 `SubprocessDouble` instances for this test case will be used
1224 to get the instance for the program path specified.
1227 orig_subprocess_call = subprocess.call
1229 def fake_subprocess_call(command, *posargs, **kwargs):
1230 if kwargs.get('shell', False):
1231 command_argv = shlex.split(command)
1232 else:
1233 command_argv = command
1234 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1235 if command_argv in registry:
1236 subprocess_double = registry[command_argv]
1237 result = subprocess_double.subprocess_call_scenario.call_hook(
1238 command)
1239 else:
1240 result = orig_subprocess_call(command, *posargs, **kwargs)
1241 return result
1243 func_patcher = mock.patch.object(
1244 subprocess, "call", autospec=True,
1245 side_effect=fake_subprocess_call)
1246 func_patcher.start()
1247 testcase.addCleanup(func_patcher.stop)
1250 class subprocess_check_call_scenario(TestDoubleFunctionScenario):
1251 """ Scenario for `subprocess.check_call` behaviour. """
1253 def _hook_success(self, command):
1254 return None
1256 def _hook_failure(self, command):
1257 result = EXIT_STATUS_FAILURE
1258 error = subprocess.CalledProcessError(result, command)
1259 raise error
1261 def _hook_not_found(self, command):
1262 result = EXIT_STATUS_COMMAND_NOT_FOUND
1263 error = subprocess.CalledProcessError(result, command)
1264 raise error
1267 def patch_subprocess_check_call(testcase):
1268 """ Patch `subprocess.check_call` function for this test case.
1270 :param testcase: The `TestCase` instance to modify.
1271 :return: None.
1273 When the patched function is called, the registry of
1274 `SubprocessDouble` instances for this test case will be used
1275 to get the instance for the program path specified.
1278 orig_subprocess_check_call = subprocess.check_call
1280 def fake_subprocess_check_call(command, *posargs, **kwargs):
1281 if kwargs.get('shell', False):
1282 command_argv = shlex.split(command)
1283 else:
1284 command_argv = command
1285 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1286 if command_argv in registry:
1287 subprocess_double = registry[command_argv]
1288 scenario = subprocess_double.subprocess_check_call_scenario
1289 result = scenario.call_hook(command)
1290 else:
1291 result = orig_subprocess_check_call(command, *posargs, **kwargs)
1292 return result
1294 func_patcher = mock.patch.object(
1295 subprocess, "check_call", autospec=True,
1296 side_effect=fake_subprocess_check_call)
1297 func_patcher.start()
1298 testcase.addCleanup(func_patcher.stop)
1301 class SubprocessDoubleRegistry(collections_abc.MutableMapping):
1302 """ Registry of `SubprocessDouble` instances by `argv`. """
1304 def __init__(self, *args, **kwargs):
1305 items = []
1306 if args:
1307 if isinstance(args[0], collections_abc.Mapping):
1308 items = args[0].items()
1309 if isinstance(args[0], collections_abc.Iterable):
1310 items = args[0]
1311 self._mapping = dict(items)
1313 def __repr__(self):
1314 text = "<{class_name} object: {mapping}>".format(
1315 class_name=self.__class__.__name__, mapping=self._mapping)
1316 return text
1318 def _match_argv(self, argv):
1319 """ Match the specified `argv` with our registered keys. """
1320 match = None
1321 if not isinstance(argv, collections_abc.Sequence):
1322 return match
1323 candidates = iter(self._mapping)
1324 while match is None:
1325 try:
1326 candidate = next(candidates)
1327 except StopIteration:
1328 break
1329 found = None
1330 if candidate == argv:
1331 # An exact match.
1332 found = True
1333 word_iter = enumerate(candidate)
1334 while found is None:
1335 try:
1336 (word_index, candidate_word) = next(word_iter)
1337 except StopIteration:
1338 break
1339 if candidate_word is ARG_MORE:
1340 # Candiate matches any remaining words. We have a match.
1341 found = True
1342 elif word_index > len(argv):
1343 # Candidate is too long for the specified argv.
1344 found = False
1345 elif candidate_word is ARG_ANY:
1346 # Candidate matches any word at this position.
1347 continue
1348 elif candidate_word == argv[word_index]:
1349 # Candidate matches the word at this position.
1350 continue
1351 else:
1352 # This candidate does not match.
1353 found = False
1354 if found is None:
1355 # Reached the end of the candidate without a mismatch.
1356 found = True
1357 if found:
1358 match = candidate
1359 return match
1361 def __getitem__(self, key):
1362 match = self._match_argv(key)
1363 if match is None:
1364 raise KeyError(key)
1365 result = self._mapping[match]
1366 return result
1368 def __setitem__(self, key, value):
1369 if key in self:
1370 del self[key]
1371 self._mapping[key] = value
1373 def __delitem__(self, key):
1374 match = self._match_argv(key)
1375 if match is not None:
1376 del self._mapping[match]
1378 def __iter__(self):
1379 return self._mapping.__iter__()
1381 def __len__(self):
1382 return self._mapping.__len__()
1385 class SubprocessDouble(TestDoubleWithRegistry):
1386 """ A testing double for a subprocess. """
1388 registry_class = SubprocessDoubleRegistry
1389 registries = {}
1391 double_by_pid = weakref.WeakValueDictionary()
1393 function_scenario_params_by_class = {
1394 subprocess_popen_scenario: {
1395 'default_scenario_name': 'success',
1396 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1398 subprocess_call_scenario: {
1399 'default_scenario_name': 'success',
1400 'set_scenario_method_name': 'set_subprocess_call_scenario',
1402 subprocess_check_call_scenario: {
1403 'default_scenario_name': 'success',
1404 'set_scenario_method_name':
1405 'set_subprocess_check_call_scenario',
1407 os_popen_scenario: {
1408 'default_scenario_name': 'success',
1409 'set_scenario_method_name': 'set_os_popen_scenario',
1411 os_waitpid_scenario: {
1412 'default_scenario_name': 'success',
1413 'set_scenario_method_name': 'set_os_waitpid_scenario',
1415 os_system_scenario: {
1416 'default_scenario_name': 'success',
1417 'set_scenario_method_name': 'set_os_system_scenario',
1419 os_spawnv_scenario: {
1420 'default_scenario_name': 'success',
1421 'set_scenario_method_name': 'set_os_spawnv_scenario',
1425 def __init__(self, path=None, argv=None, *args, **kwargs):
1426 if path is None:
1427 path = tempfile.mktemp()
1428 self.path = path
1430 if argv is None:
1431 command_name = os.path.basename(path)
1432 argv = [command_name]
1433 self.argv = argv
1435 self.pid = self._make_pid()
1436 self._register_by_pid()
1438 self.set_popen_double()
1440 stream_class = SubprocessDouble.stream_class
1441 for stream_name in ['stdin', 'stdout', 'stderr']:
1442 fake_file = stream_class()
1443 file_double = FileDouble(fake_file=fake_file)
1444 stream_double_name = '{name}_double'.format(name=stream_name)
1445 setattr(self, stream_double_name, file_double)
1447 super(SubprocessDouble, self).__init__(*args, **kwargs)
1449 def set_popen_double(self):
1450 """ Set the `PopenDouble` for this instance. """
1451 double = PopenDouble(self.argv)
1452 double.pid = self.pid
1454 self.popen_double = double
1456 def __repr__(self):
1457 text = (
1458 "<SubprocessDouble instance: {id}"
1459 " path: {path!r},"
1460 " argv: {argv!r}"
1461 " stdin_double: {stdin_double!r}"
1462 " stdout_double: {stdout_double!r}"
1463 " stderr_double: {stderr_double!r}"
1464 ">").format(
1465 id=id(self),
1466 path=self.path, argv=self.argv,
1467 stdin_double=self.stdin_double,
1468 stdout_double=self.stdout_double,
1469 stderr_double=self.stderr_double)
1470 return text
1472 @classmethod
1473 def _make_pid(cls):
1474 """ Make a unique PID for a subprocess. """
1475 for pid in itertools.count(1):
1476 yield pid
1478 def _register_by_pid(self):
1479 """ Register this subprocess by its PID. """
1480 self.__class__.double_by_pid[self.pid] = self
1482 def get_registry_key(self):
1483 """ Get the registry key for this double. """
1484 result = tuple(self.argv)
1485 return result
1487 stream_class = io.BytesIO
1488 stream_encoding = "utf-8"
1490 def set_stdin_content(self, text, bytes_encoding=stream_encoding):
1491 """ Set the content of the `stdin` stream for this double. """
1492 content = text.encode(bytes_encoding)
1493 fake_file = self.stream_class(content)
1494 self.stdin_double.fake_file = fake_file
1496 def set_stdout_content(self, text, bytes_encoding=stream_encoding):
1497 """ Set the content of the `stdout` stream for this double. """
1498 content = text.encode(bytes_encoding)
1499 fake_file = self.stream_class(content)
1500 self.stdout_double.fake_file = fake_file
1502 def set_stderr_content(self, text, bytes_encoding=stream_encoding):
1503 """ Set the content of the `stderr` stream for this double. """
1504 content = text.encode(bytes_encoding)
1505 fake_file = self.stream_class(content)
1506 self.stderr_double.fake_file = fake_file
1509 def make_fake_subprocess_scenarios(path=None):
1510 """ Make a collection of scenarios for testing with fake files.
1512 :path: The filesystem path of the fake program. If not specified,
1513 a valid random path will be generated.
1514 :return: A collection of scenarios for tests involving subprocesses.
1516 The collection is a mapping from scenario name to a dictionary of
1517 scenario attributes.
1520 if path is None:
1521 file_path = tempfile.mktemp()
1522 else:
1523 file_path = path
1525 default_scenario_params = {
1526 'return_value': EXIT_STATUS_SUCCESS,
1527 'program_path': file_path,
1528 'argv_after_command_name': [],
1531 scenarios = {
1532 'default': {},
1533 'not-found': {
1534 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
1538 for (name, scenario) in scenarios.items():
1539 params = default_scenario_params.copy()
1540 params.update(scenario)
1541 scenario.update(params)
1542 program_path = params['program_path']
1543 program_name = os.path.basename(params['program_path'])
1544 argv = [program_name]
1545 argv.extend(params['argv_after_command_name'])
1546 subprocess_double_params = dict(
1547 path=program_path,
1548 argv=argv,
1550 subprocess_double = SubprocessDouble(**subprocess_double_params)
1551 scenario['subprocess_double'] = subprocess_double
1552 scenario['fake_file_scenario_name'] = name
1554 return scenarios
1557 def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
1558 """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
1560 :param scenarios: Collection of fake subprocess scenarios.
1561 :return: Collection of `SubprocessDouble` instances.
1564 doubles = set(
1565 scenario['subprocess_double']
1566 for scenario in scenarios
1567 if scenario['subprocess_double'] is not None)
1569 return doubles
1572 def setup_subprocess_double_behaviour(testcase, doubles=None):
1573 """ Set up subprocess double instances and behaviour.
1575 :param testcase: The `TestCase` instance to modify.
1576 :param doubles: Collection of `SubprocessDouble` instances.
1577 :return: None.
1579 If `doubles` is ``None``, a default collection will be made
1580 from the return value of `make_fake_subprocess_scenarios`.
1583 if doubles is None:
1584 scenarios = make_fake_subprocess_scenarios()
1585 doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
1586 scenarios.values())
1588 for double in doubles:
1589 double.register_for_testcase(testcase)
1592 def setup_fake_subprocess_fixtures(testcase):
1593 """ Set up fixtures for fake subprocess doubles.
1595 :param testcase: The `TestCase` instance to modify.
1596 :return: None.
1599 scenarios = make_fake_subprocess_scenarios()
1600 testcase.fake_subprocess_scenarios = scenarios
1602 doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
1603 scenarios.values())
1604 setup_subprocess_double_behaviour(testcase, doubles)
1607 def patch_os_popen(testcase):
1608 """ Patch `os.popen` behaviour for this test case.
1610 :param testcase: The `TestCase` instance to modify.
1611 :return: None.
1613 When the patched function is called, the registry of
1614 `SubprocessDouble` instances for this test case will be used
1615 to get the instance for the program path specified.
1618 orig_os_popen = os.popen
1620 def fake_os_popen(cmd, mode='r', buffering=-1):
1621 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1622 if isinstance(cmd, basestring):
1623 command_argv = shlex.split(cmd)
1624 else:
1625 command_argv = cmd
1626 if command_argv in registry:
1627 subprocess_double = registry[command_argv]
1628 result = subprocess_double.os_popen_scenario.call_hook(
1629 command_argv, mode, buffering)
1630 else:
1631 result = orig_os_popen(cmd, mode, buffering)
1632 return result
1634 func_patcher = mock.patch.object(
1635 os, "popen", autospec=True,
1636 side_effect=fake_os_popen)
1637 func_patcher.start()
1638 testcase.addCleanup(func_patcher.stop)
1641 def patch_os_waitpid(testcase):
1642 """ Patch `os.waitpid` behaviour for this test case.
1644 :param testcase: The `TestCase` instance to modify.
1645 :return: None.
1647 When the patched function is called, the registry of
1648 `SubprocessDouble` instances for this test case will be used
1649 to get the instance for the program path specified.
1652 orig_os_waitpid = os.waitpid
1654 def fake_os_waitpid(pid, options):
1655 registry = SubprocessDouble.double_by_pid
1656 if pid in registry:
1657 subprocess_double = registry[pid]
1658 result = subprocess_double.os_waitpid_scenario.call_hook(
1659 pid, options)
1660 else:
1661 result = orig_os_waitpid(pid, options)
1662 return result
1664 func_patcher = mock.patch.object(
1665 os, "waitpid", autospec=True,
1666 side_effect=fake_os_waitpid)
1667 func_patcher.start()
1668 testcase.addCleanup(func_patcher.stop)
1671 def patch_os_system(testcase):
1672 """ Patch `os.system` behaviour for this test case.
1674 :param testcase: The `TestCase` instance to modify.
1675 :return: None.
1677 When the patched function is called, the registry of
1678 `SubprocessDouble` instances for this test case will be used
1679 to get the instance for the program path specified.
1682 orig_os_system = os.system
1684 def fake_os_system(command):
1685 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1686 command_argv = shlex.split(command)
1687 if command_argv in registry:
1688 subprocess_double = registry[command_argv]
1689 result = subprocess_double.os_system_scenario.call_hook(
1690 command)
1691 else:
1692 result = orig_os_system(command)
1693 return result
1695 func_patcher = mock.patch.object(
1696 os, "system", autospec=True,
1697 side_effect=fake_os_system)
1698 func_patcher.start()
1699 testcase.addCleanup(func_patcher.stop)
1702 def patch_os_spawnv(testcase):
1703 """ Patch `os.spawnv` behaviour for this test case.
1705 :param testcase: The `TestCase` instance to modify.
1706 :return: None.
1708 When the patched function is called, the registry of
1709 `SubprocessDouble` instances for this test case will be used
1710 to get the instance for the program path specified.
1713 orig_os_spawnv = os.spawnv
1715 def fake_os_spawnv(mode, file, args):
1716 registry = SubprocessDouble.get_registry_for_testcase(testcase)
1717 registry_key = tuple(args)
1718 if registry_key in registry:
1719 subprocess_double = registry[registry_key]
1720 result = subprocess_double.os_spawnv_scenario.call_hook(
1721 mode, file, args)
1722 else:
1723 result = orig_os_spawnv(mode, file, args)
1724 return result
1726 func_patcher = mock.patch.object(
1727 os, "spawnv", autospec=True,
1728 side_effect=fake_os_spawnv)
1729 func_patcher.start()
1730 testcase.addCleanup(func_patcher.stop)
1733 # Local variables:
1734 # coding: utf-8
1735 # mode: python
1736 # End:
1737 # vim: fileencoding=utf-8 filetype=python :