Be explicit about why we use ‘io.BytesIO’ for the fake file.
[python-gajja.git] / gajja / __init__.py
blob713b097834603d62e224f2da3bc9546ebd5466a6
1 # -*- coding: utf-8; -*-
3 # gajja/__init__.py
4 # Part of Gajja, a Python test double library.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Gajja: Fake objects for real tests.
15 The `gajja` library provides a system of Python test double classes
16 for specific system objects:
18 * Filesystem entries
20 * Subprocesses
22 The Korean word 가짜 (*gajja*; IPA ˈkaːt͡ɕ̤a) means “fake thing”.
24 """
26 from __future__ import (absolute_import, unicode_literals)
28 import sys
30 if sys.version_info >= (3, 3):
31 import builtins
32 import unittest
33 import unittest.mock as mock
34 from io import StringIO as StringIO
35 import configparser
36 import collections.abc as collections_abc
37 elif sys.version_info >= (3, 0):
38 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
39 elif sys.version_info >= (2, 7):
40 # Python 2 standard library.
41 import __builtin__ as builtins
42 # Third-party backport of Python 3 unittest improvements.
43 import unittest2 as unittest
44 # Third-party mock library.
45 import mock
46 # Python 2 standard library.
47 from StringIO import StringIO as BaseStringIO
48 import ConfigParser as configparser
49 import collections as collections_abc
50 else:
51 raise RuntimeError("Python earlier than 2.7 is not supported.")
53 import os
54 import os.path
55 import io
56 import shutil
57 import tempfile
58 import errno
59 import time
60 import signal
61 import subprocess
62 import functools
63 import itertools
64 import base64
65 import collections
66 import weakref
67 import shlex
69 try:
70 import pwd
71 except ImportError:
72 # The ‘pwd’ module is not available on platforms other than Unix.
73 pwd = NotImplemented
75 __package__ = str("gajja")
76 __import__(__package__)
78 __metaclass__ = type
try:
    # These names exist as built-ins only on Python 2.
    (basestring, unicode)
except NameError:
    # Python 3 has a single text type; alias both names to it.
    basestring = unicode = str
def make_unique_slug(testcase):
    """ Make a unique slug for the test case.

        :param testcase: The test case; its `getUniqueString` method
            supplies the text to encode.
        :return: The final 30 characters (or fewer) of the Base64
            encoding of the unique string.

        """
    unique_bytes = testcase.getUniqueString().encode('utf-8')
    encoded_text = base64.b64encode(unique_bytes).decode('utf-8')
    return encoded_text[-30:]
try:
    StringIO
except NameError:
    # No `StringIO` name was imported (the Python 2 branch imports only
    # `BaseStringIO`), so build one with the extra behaviour we need.

    class StringIO(BaseStringIO, object):
        """ StringIO usable as a context manager.

            Also answers the `readable`, `writable` and `seekable`
            queries, always affirmatively.

            """

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            # Close the stream; never suppress an in-flight exception.
            self.close()
            return False

        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True
def patch_stdout(testcase):
    """ Patch `sys.stdout` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a fresh `StringIO`; the
        patch is undone by a cleanup registered on `testcase`.

        """
    fake_stream = StringIO()
    stdout_patcher = mock.patch.object(sys, "stdout", wraps=fake_stream)
    stdout_patcher.start()
    testcase.addCleanup(stdout_patcher.stop)
def patch_stderr(testcase):
    """ Patch `sys.stderr` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is a mock wrapping a fresh `StringIO`; the
        patch is undone by a cleanup registered on `testcase`.

        """
    fake_stream = StringIO()
    stderr_patcher = mock.patch.object(sys, "stderr", wraps=fake_stream)
    stderr_patcher.start()
    testcase.addCleanup(stderr_patcher.stop)
def patch_signal_signal(testcase):
    """ Patch `signal.signal` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    signal_patcher = mock.patch.object(signal, "signal", autospec=True)
    signal_patcher.start()
    # Restore the real function when the test case finishes.
    testcase.addCleanup(signal_patcher.stop)
class FakeSystemExit(Exception):
    """ Fake double for `SystemExit` exception.

        Derives from `Exception`, so ordinary ``except Exception``
        handlers can catch it.

        """
# Exit status values returned by the subprocess-related scenario hooks.
EXIT_STATUS_SUCCESS = 0
EXIT_STATUS_FAILURE = 1
EXIT_STATUS_COMMAND_NOT_FOUND = 127
def patch_sys_exit(testcase):
    """ Patch `sys.exit` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        While patched, calling `sys.exit` raises a `FakeSystemExit`
        instance instead of terminating the process.

        """
    fake_exit_error = FakeSystemExit()
    exit_patcher = mock.patch.object(
            sys, "exit", autospec=True, side_effect=fake_exit_error)
    exit_patcher.start()
    testcase.addCleanup(exit_patcher.stop)
def patch_sys_argv(testcase):
    """ Patch the `sys.argv` sequence for the test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        Missing `progname` and `sys_argv` attributes are created on
        `testcase`. The patched `sys.argv` is a copy of
        `testcase.sys_argv`, not the same list.

        """
    if not hasattr(testcase, 'progname'):
        testcase.progname = make_unique_slug(testcase)
    if not hasattr(testcase, 'sys_argv'):
        testcase.sys_argv = [testcase.progname]

    fake_argv = list(testcase.sys_argv)
    argv_patcher = mock.patch.object(sys, "argv", new=fake_argv)
    argv_patcher.start()
    testcase.addCleanup(argv_patcher.stop)
def patch_system_interfaces(testcase):
    """ Patch system interfaces that are disruptive to the test runner.

        :param testcase: The `TestCase` instance for binding to the patches.
        :return: None.

        Applies, in order, the patches for `sys.stdout`, `sys.stderr`,
        `sys.exit` and `sys.argv`.

        """
    for apply_patch in (
            patch_stdout, patch_stderr, patch_sys_exit, patch_sys_argv):
        apply_patch(testcase)
def patch_time_time(testcase, values=None):
    """ Patch the `time.time` function for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :param values: An iterable to provide return values. Any
            iterable is accepted (list, tuple, generator, …). If not
            specified, successive integers counting from 0 are used.
        :return: None.

        Each call to the patched `time.time` returns the next item
        from `values`.

        """
    if values is None:
        values = itertools.count()

    func_patcher = mock.patch.object(time, "time", autospec=True)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)

    # Convert explicitly to an iterator so that plain sequences work.
    # An iterator side effect makes the mock return one item per call.
    time.time.side_effect = iter(values)
def patch_os_environ(testcase):
    """ Patch the `os.environ` mapping.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The replacement is `testcase.os_environ`; if that attribute
        is absent it is first set to an empty dictionary.

        """
    try:
        fake_environ = testcase.os_environ
    except AttributeError:
        fake_environ = testcase.os_environ = {}

    environ_patcher = mock.patch.object(os, "environ", new=fake_environ)
    environ_patcher.start()
    testcase.addCleanup(environ_patcher.stop)
def patch_os_getpid(testcase):
    """ Patch `os.getpid` for the specified test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        """
    getpid_patcher = mock.patch.object(os, "getpid", autospec=True)
    getpid_patcher.start()
    # Restore the real function when the test case finishes.
    testcase.addCleanup(getpid_patcher.stop)
def patch_os_getuid(testcase):
    """ Patch the `os.getuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The patched function returns `testcase.os_getuid_return_value`;
        if that attribute is absent it is first set from
        `testcase.getUniqueInteger()`.

        """
    if not hasattr(testcase, 'os_getuid_return_value'):
        testcase.os_getuid_return_value = testcase.getUniqueInteger()

    fake_uid = testcase.os_getuid_return_value
    getuid_patcher = mock.patch.object(
            os, "getuid", autospec=True, return_value=fake_uid)
    getuid_patcher.start()
    testcase.addCleanup(getuid_patcher.stop)
# Record with the same seven field names as a `pwd` database entry.
PasswdEntry = collections.namedtuple(
        "PasswdEntry", [
            "pw_name",
            "pw_passwd",
            "pw_uid",
            "pw_gid",
            "pw_gecos",
            "pw_dir",
            "pw_shell",
            ])
def patch_pwd_getpwuid(testcase):
    """ Patch the `pwd.getpwuid` function.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The patched function returns a `pwd.struct_passwd` built from
        `testcase.pwd_getpwuid_return_value`. If that attribute is
        absent, a `PasswdEntry` of generated values is set first.

        """
    if not hasattr(testcase, 'pwd_getpwuid_return_value'):
        testcase.pwd_getpwuid_return_value = PasswdEntry(
                pw_name=make_unique_slug(testcase),
                pw_passwd=make_unique_slug(testcase),
                pw_uid=testcase.getUniqueInteger(),
                pw_gid=testcase.getUniqueInteger(),
                pw_gecos=testcase.getUniqueString(),
                pw_dir=tempfile.mktemp(),
                pw_shell=tempfile.mktemp())

    # Convert to the type the real function returns, unless the test
    # case already supplied exactly that type.
    entry = testcase.pwd_getpwuid_return_value
    if isinstance(entry, pwd.struct_passwd):
        pwent = entry
    else:
        pwent = pwd.struct_passwd(entry)

    getpwuid_patcher = mock.patch.object(
            pwd, "getpwuid", autospec=True, return_value=pwent)
    getpwuid_patcher.start()
    testcase.addCleanup(getpwuid_patcher.stop)


if pwd is NotImplemented:
    # The 'pwd' module is not available on some platforms.
    del patch_pwd_getpwuid
def patch_os_path_exists(testcase):
    """ Patch `os.path.exists` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path with a `FileDouble` registered for this test case is
        answered by that double's `os_path_exists_scenario`; any other
        path is passed to the real `os.path.exists`.

        """
    real_os_path_exists = os.path.exists

    def fake_os_path_exists(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_path_exists(path)
        return doubles[path].os_path_exists_scenario.call_hook()

    exists_patcher = mock.patch.object(
            os.path, "exists", autospec=True,
            side_effect=fake_os_path_exists)
    exists_patcher.start()
    testcase.addCleanup(exists_patcher.stop)
def patch_os_access(testcase):
    """ Patch `os.access` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path with a `FileDouble` registered for this test case is
        answered by that double's `os_access_scenario`, which receives
        the requested mode; any other path is passed to the real
        `os.access`.

        """
    real_os_access = os.access

    def fake_os_access(path, mode):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_access(path, mode)
        return doubles[path].os_access_scenario.call_hook(mode)

    access_patcher = mock.patch.object(
            os, "access", autospec=True,
            side_effect=fake_os_access)
    access_patcher.start()
    testcase.addCleanup(access_patcher.stop)
# Record with the ten basic field names of an `os.stat` result.
StatResult = collections.namedtuple(
        'StatResult',
        "st_mode"
        " st_ino st_dev st_nlink"
        " st_uid st_gid"
        " st_size"
        " st_atime st_mtime st_ctime")
def patch_os_stat(testcase):
    """ Patch `os.stat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path with a `FileDouble` registered for this test case is
        answered by that double's `os_stat_scenario`; any other path
        is passed to the real `os.stat`.

        """
    real_os_stat = os.stat

    def fake_os_stat(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_stat(path)
        return doubles[path].os_stat_scenario.call_hook()

    stat_patcher = mock.patch.object(
            os, "stat", autospec=True,
            side_effect=fake_os_stat)
    stat_patcher.start()
    testcase.addCleanup(stat_patcher.stop)
def patch_os_lstat(testcase):
    """ Patch `os.lstat` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path with a `FileDouble` registered for this test case is
        answered by that double's `os_lstat_scenario`; any other path
        is passed to the real `os.lstat`.

        """
    real_os_lstat = os.lstat

    def fake_os_lstat(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_lstat(path)
        return doubles[path].os_lstat_scenario.call_hook()

    lstat_patcher = mock.patch.object(
            os, "lstat", autospec=True,
            side_effect=fake_os_lstat)
    lstat_patcher.start()
    testcase.addCleanup(lstat_patcher.stop)
def patch_builtins_open(testcase):
    """ Patch `builtins.open` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path with a `FileDouble` registered for this test case is
        answered by that double's `builtins_open_scenario`, which
        receives the mode and buffering arguments; any other path is
        passed to the real `open`.

        """
    real_open = builtins.open

    def fake_open(path, mode='rt', buffering=-1):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_open(path, mode, buffering)
        return doubles[path].builtins_open_scenario.call_hook(
                mode, buffering)

    open_double = mock.mock_open()
    open_double.side_effect = fake_open

    open_patcher = mock.patch.object(builtins, "open", new=open_double)
    open_patcher.start()
    testcase.addCleanup(open_patcher.stop)
def patch_os_unlink(testcase):
    """ Patch `os.unlink` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path with a `FileDouble` registered for this test case is
        handled by that double's `os_unlink_scenario`; any other path
        is passed to the real `os.unlink`.

        """
    real_os_unlink = os.unlink

    def fake_os_unlink(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_unlink(path)
        return doubles[path].os_unlink_scenario.call_hook()

    unlink_patcher = mock.patch.object(
            os, "unlink", autospec=True,
            side_effect=fake_os_unlink)
    unlink_patcher.start()
    testcase.addCleanup(unlink_patcher.stop)
def patch_os_rmdir(testcase):
    """ Patch `os.rmdir` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path with a `FileDouble` registered for this test case is
        handled by that double's `os_rmdir_scenario`; any other path
        is passed to the real `os.rmdir`.

        """
    real_os_rmdir = os.rmdir

    def fake_os_rmdir(path):
        doubles = FileDouble.get_registry_for_testcase(testcase)
        if path not in doubles:
            return real_os_rmdir(path)
        return doubles[path].os_rmdir_scenario.call_hook()

    rmdir_patcher = mock.patch.object(
            os, "rmdir", autospec=True,
            side_effect=fake_os_rmdir)
    rmdir_patcher.start()
    testcase.addCleanup(rmdir_patcher.stop)
def patch_shutil_rmtree(testcase):
    """ Patch `shutil.rmtree` behaviour for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        A path with a `FileDouble` registered for this test case is
        handled by that double's `shutil_rmtree_scenario`; any other
        path is passed, together with the `ignore_errors` and
        `onerror` arguments, to the real `shutil.rmtree`.

        """
    # Keep the real `shutil.rmtree` (not `os.rmdir`), so unregistered
    # paths still get a recursive removal.
    orig_shutil_rmtree = shutil.rmtree

    def fake_shutil_rmtree(path, ignore_errors=False, onerror=None):
        registry = FileDouble.get_registry_for_testcase(testcase)
        if path in registry:
            file_double = registry[path]
            result = file_double.shutil_rmtree_scenario.call_hook()
        else:
            # Forward the caller's error-handling options unchanged.
            result = orig_shutil_rmtree(path, ignore_errors, onerror)
        return result

    func_patcher = mock.patch.object(
            shutil, "rmtree", autospec=True,
            side_effect=fake_shutil_rmtree)
    func_patcher.start()
    testcase.addCleanup(func_patcher.stop)
def patch_tempfile_mkdtemp(testcase):
    """ Patch the `tempfile.mkdtemp` function for this test case.

        :param testcase: The `TestCase` instance for binding to the patch.
        :return: None.

        The patched function returns the path of
        `testcase.tempfile_mkdtemp_file_double`. If that attribute is
        absent, a `FileDouble` with a generated path is set first.
        The double is set to the 'okay' scenario for `os.unlink` and
        `os.rmdir`, and is registered for the test case.

        """
    if not hasattr(testcase, 'tempfile_mkdtemp_file_double'):
        testcase.tempfile_mkdtemp_file_double = FileDouble(tempfile.mktemp())

    directory_double = testcase.tempfile_mkdtemp_file_double
    for set_scenario in (
            directory_double.set_os_unlink_scenario,
            directory_double.set_os_rmdir_scenario):
        set_scenario('okay')
    directory_double.register_for_testcase(testcase)

    mkdtemp_patcher = mock.patch.object(tempfile, "mkdtemp", autospec=True)
    mkdtemp_patcher.start()
    testcase.addCleanup(mkdtemp_patcher.stop)

    tempfile.mkdtemp.return_value = directory_double.path
try:
    # These exception types are built in on recent Python 3.
    (FileNotFoundError, FileExistsError, PermissionError)
except NameError:
    # Python 2 uses IOError.
    def _ensure_ioerror_args(init_args, init_kwargs, errno_value):
        """ Normalise arguments for an `IOError` initialiser.

            :param init_args: Positional arguments given by the caller.
            :param init_kwargs: Keyword arguments given by the caller.
                The 'errno', 'strerror' and 'filename' items are
                removed from this mapping in place.
            :param errno_value: The errno to use when none is given.
            :return: Tuple (`args`, `kwargs`), where `args` is
                (errno, strerror, filename).

            The positional errno and filename are honoured only when
            at least three positional arguments are given; the
            positional strerror when at least two are given. Keyword
            values take precedence over positional ones.

            """
        (result_errno, result_strerror, result_filename) = (
                errno_value, os.strerror(errno_value), None)

        if len(init_args) >= 2:
            result_strerror = init_args[1]
        if len(init_args) >= 3:
            result_errno = init_args[0]
            result_filename = init_args[2]

        result_errno = init_kwargs.pop('errno', result_errno)
        result_strerror = init_kwargs.pop('strerror', result_strerror)
        result_filename = init_kwargs.pop('filename', result_filename)

        result_args = (result_errno, result_strerror, result_filename)
        return (result_args, init_kwargs)

    class FileNotFoundError(IOError):
        """ Stand-in for the Python 3 exception; defaults to ENOENT. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.ENOENT)
            super(FileNotFoundError, self).__init__(*args, **kwargs)

    class FileExistsError(IOError):
        """ Stand-in for the Python 3 exception; defaults to EEXIST. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EEXIST)
            super(FileExistsError, self).__init__(*args, **kwargs)

    class PermissionError(IOError):
        """ Stand-in for the Python 3 exception; defaults to EPERM. """

        def __init__(self, *args, **kwargs):
            (args, kwargs) = _ensure_ioerror_args(
                    args, kwargs, errno_value=errno.EPERM)
            super(PermissionError, self).__init__(*args, **kwargs)
def make_fake_file_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake files.

        :path: The filesystem path of the fake file. If not specified,
            a valid random path will be generated.
        :return: A collection of scenarios for tests involving input files.

        The collection is a mapping from scenario name to a dictionary of
        scenario attributes. Every scenario ends up with the items
        'open_scenario_name', 'file_double_params', 'file_double' and
        'fake_file_scenario_name'.

        """
    file_path = tempfile.mktemp() if path is None else path

    fake_file_empty = StringIO()
    fake_file_minimal = StringIO("Lorem ipsum.")
    fake_file_large = StringIO("\n".join(
            "ABCDEFGH" * 100
            for __ in range(1000)))

    def make_double_params(fake_file):
        return dict(path=file_path, fake_file=fake_file)

    default_scenario_params = {
            'open_scenario_name': 'okay',
            'file_double_params': make_double_params(fake_file_minimal),
            }

    scenarios = {
            'default': {},
            'error-not-exist': {
                'open_scenario_name': 'nonexist',
                },
            'error-exist': {
                'open_scenario_name': 'exist_error',
                },
            'error-read-denied': {
                'open_scenario_name': 'read_denied',
                },
            'not-found': {
                'file_double_params': make_double_params(fake_file_empty),
                },
            'exist-empty': {
                'file_double_params': make_double_params(fake_file_empty),
                },
            'exist-minimal': {
                'file_double_params': make_double_params(fake_file_minimal),
                },
            'exist-large': {
                'file_double_params': make_double_params(fake_file_large),
                },
            }

    for (scenario_name, scenario) in scenarios.items():
        # Fill in whatever the scenario did not specify itself.
        for (param_name, param_default) in default_scenario_params.items():
            scenario.setdefault(param_name, param_default)
        file_double = FileDouble(**scenario['file_double_params'])
        file_double.set_open_scenario(scenario['open_scenario_name'])
        scenario['file_double'] = file_double
        scenario['fake_file_scenario_name'] = scenario_name

    return scenarios
def get_file_doubles_from_fake_file_scenarios(scenarios):
    """ Get the `FileDouble` instances from fake file scenarios.

        :param scenarios: Collection of fake file scenarios.
        :return: Collection of `FileDouble` instances.

        The result is a set of each scenario's 'file_double' item,
        leaving out any that is ``None``.

        """
    doubles = set()
    for scenario in scenarios:
        file_double = scenario['file_double']
        if file_double is not None:
            doubles.add(file_double)
    return doubles
def setup_file_double_behaviour(testcase, doubles=None):
    """ Set up file double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `FileDouble` instances.
        :return: None.

        If `doubles` is ``None``, a default collection will be made
        from the result of `make_fake_file_scenarios` result.

        Each double is registered for the test case, then
        `builtins.open` is patched.

        """
    if doubles is None:
        default_scenarios = make_fake_file_scenarios()
        doubles = get_file_doubles_from_fake_file_scenarios(
                default_scenarios.values())

    for double in doubles:
        double.register_for_testcase(testcase)

    patch_builtins_open(testcase)
def setup_fake_file_fixtures(testcase):
    """ Set up fixtures for fake file doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The freshly made scenarios are stored as
        `testcase.fake_file_scenarios`, and their file doubles are
        set up for the test case.

        """
    testcase.fake_file_scenarios = make_fake_file_scenarios()

    scenario_doubles = get_file_doubles_from_fake_file_scenarios(
            testcase.fake_file_scenarios.values())
    setup_file_double_behaviour(testcase, scenario_doubles)
def set_fake_file_scenario(testcase, name):
    """ Set the named fake file scenario for the test case.

        :param testcase: The `TestCase` instance to modify; it must
            already have the `fake_file_scenarios` mapping.
        :param name: Key of the scenario in that mapping.
        :return: None.

        Sets `testcase.fake_file_scenario` and `testcase.file_double`,
        and registers the file double for the test case.

        """
    chosen_scenario = testcase.fake_file_scenarios[name]
    chosen_double = chosen_scenario['file_double']
    testcase.fake_file_scenario = chosen_scenario
    testcase.file_double = chosen_double
    testcase.file_double.register_for_testcase(testcase)
class TestDoubleFunctionScenario:
    """ Scenario for fake behaviour of a specific function.

        Subclasses define the available scenarios as methods named
        ``_hook_<scenario name>``. The hook for the chosen scenario is
        exposed as the `call_hook` attribute.

        """

    def __init__(self, scenario_name, double):
        """ Bind the named scenario to the test double.

            :param scenario_name: Name selecting the ``_hook_<name>``
                method of this class.
            :param double: The test double this scenario belongs to.
            :raises AttributeError: If no hook method exists for
                `scenario_name`.

            """
        self.scenario_name = scenario_name
        self.double = double

        self.call_hook = getattr(
                self, "_hook_{name}".format(name=self.scenario_name))

    def __repr__(self):
        text = (
                "<{class_name} instance: {id}"
                " name: {name!r},"
                " call_hook name: {hook_name!r}"
                " double: {double!r}"
                ">").format(
                    class_name=self.__class__.__name__, id=id(self),
                    name=self.scenario_name, double=self.double,
                    hook_name=self.call_hook.__name__)
        return text

    def __eq__(self, other):
        """ Compare by scenario name, double, and hook method name.

            An `other` lacking the compared attributes is not
            comparable: ``NotImplemented`` is returned, so ``==``
            evaluates to ``False`` instead of raising.

            """
        try:
            other_name = other.scenario_name
            other_double = other.double
            other_hook_name = other.call_hook.__name__
        except AttributeError:
            return NotImplemented

        result = True
        if not self.scenario_name == other_name:
            result = False
        if not self.double == other_double:
            result = False
        if not self.call_hook.__name__ == other_hook_name:
            result = False
        return result

    def __ne__(self, other):
        """ Inverse of `__eq__`, passing through ``NotImplemented``. """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
class os_path_exists_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.path.exists` behaviour.

        Scenario names: 'exist', 'not_exist'.

        """

    def _hook_exist(self):
        """ Report that the path exists. """
        return True

    def _hook_not_exist(self):
        """ Report that the path does not exist. """
        return False
class os_access_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.access` behaviour.

        Scenario names: 'okay', 'not_exist', 'read_only', 'denied'.
        Each hook receives the requested access mode.

        """

    def _hook_okay(self, mode):
        """ Grant every kind of access. """
        return True

    def _hook_not_exist(self, mode):
        """ Refuse every kind of access. """
        return False

    def _hook_read_only(self, mode):
        """ Refuse any request that includes write or execute. """
        refused_bits = os.W_OK | os.X_OK
        return not (mode & refused_bits)

    def _hook_denied(self, mode):
        """ Refuse any request that includes read, write or execute. """
        refused_bits = os.R_OK | os.W_OK | os.X_OK
        return not (mode & refused_bits)
class os_stat_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.stat` behaviour.

        Scenario names: 'okay', 'notfound_error', 'denied_error'.

        """

    def _hook_okay(self):
        """ Return the double's prepared `stat_result`. """
        return self.double.stat_result

    def _hook_notfound_error(self):
        """ Raise `FileNotFoundError` for the double's path. """
        path = self.double.path
        message = "No such file or directory: {path!r}".format(path=path)
        raise FileNotFoundError(path, message)

    def _hook_denied_error(self):
        """ Raise `PermissionError` for the double's path. """
        raise PermissionError(self.double.path, "Permission denied")
class os_lstat_scenario(os_stat_scenario):
    """ Scenario for `os.lstat` behaviour.

        Inherits every hook unchanged from `os_stat_scenario`.

        """
class os_unlink_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.unlink` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        """ Succeed, returning ``None``. """
        return None

    def _hook_nonexist(self):
        """ Raise `FileNotFoundError` for the double's path. """
        path = self.double.path
        message = "No such file or directory: {path!r}".format(path=path)
        raise FileNotFoundError(path, message)

    def _hook_denied(self):
        """ Raise `PermissionError` for the double's path. """
        raise PermissionError(self.double.path, "Permission denied")
class os_rmdir_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.rmdir` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        """ Succeed, returning ``None``. """
        return None

    def _hook_nonexist(self):
        """ Raise `FileNotFoundError` for the double's path. """
        path = self.double.path
        message = "No such file or directory: {path!r}".format(path=path)
        raise FileNotFoundError(path, message)

    def _hook_denied(self):
        """ Raise `PermissionError` for the double's path. """
        raise PermissionError(self.double.path, "Permission denied")
class shutil_rmtree_scenario(TestDoubleFunctionScenario):
    """ Scenario for `shutil.rmtree` behaviour.

        Scenario names: 'okay', 'nonexist', 'denied'.

        """

    def _hook_okay(self):
        """ Succeed, returning ``None``. """
        return None

    def _hook_nonexist(self):
        """ Raise `FileNotFoundError` for the double's path. """
        path = self.double.path
        message = "No such file or directory: {path!r}".format(path=path)
        raise FileNotFoundError(path, message)

    def _hook_denied(self):
        """ Raise `PermissionError` for the double's path. """
        raise PermissionError(self.double.path, "Permission denied")
class builtins_open_scenario(TestDoubleFunctionScenario):
    """ Scenario for `builtins.open` behaviour.

        Scenario names: 'okay', 'nonexist', 'exist_error',
        'read_denied', 'write_denied'. Each hook receives the `mode`
        and `buffering` arguments of the `open` call, and returns the
        double's `fake_file` unless it raises.

        """

    def _hook_okay(self, mode, buffering):
        """ Open succeeds in every mode. """
        return self.double.fake_file

    def _hook_nonexist(self, mode, buffering):
        """ Raise `FileNotFoundError` when the mode starts with 'r'. """
        if mode.startswith('r'):
            path = self.double.path
            raise FileNotFoundError(
                    path,
                    "No such file or directory: {path!r}".format(path=path))
        return self.double.fake_file

    def _hook_exist_error(self, mode, buffering):
        """ Raise `FileExistsError` when the mode starts with 'w' or 'a'. """
        if mode.startswith(('w', 'a')):
            path = self.double.path
            raise FileExistsError(
                    path,
                    "File already exists: {path!r}".format(path=path))
        return self.double.fake_file

    def _hook_read_denied(self, mode, buffering):
        """ Raise `PermissionError` when the mode starts with 'r'. """
        if mode.startswith('r'):
            path = self.double.path
            raise PermissionError(
                    path,
                    "Read denied on {path!r}".format(path=path))
        return self.double.fake_file

    def _hook_write_denied(self, mode, buffering):
        """ Raise `PermissionError` when the mode starts with 'w' or 'a'. """
        if mode.startswith(('w', 'a')):
            path = self.double.path
            raise PermissionError(
                    path,
                    "Write denied on {path!r}".format(path=path))
        return self.double.fake_file
class TestDoubleWithRegistry:
    """ Abstract base class for a test double with a test case registry.

        Concrete subclasses provide:

        * `registry_class`: Callable making a new, empty registry.
        * `registries`: Mapping that holds one registry per test case.
        * `function_scenario_params_by_class`: Mapping from scenario
          class to a dict with the items 'default_scenario_name' and
          'set_scenario_method_name'.
        * `get_registry_key`: Method giving this double's registry key.

        """

    registry_class = NotImplemented
    registries = NotImplemented

    function_scenario_params_by_class = NotImplemented

    def __new__(cls, *args, **kwargs):
        parent = super(TestDoubleWithRegistry, cls)
        if parent.__new__ is object.__new__:
            # The 'object' implementation complains about extra arguments.
            (new_args, new_kwargs) = ((), {})
        else:
            (new_args, new_kwargs) = (args, kwargs)
        instance = parent.__new__(cls, *new_args, **new_kwargs)

        # The setter methods must exist before `__init__` uses them.
        instance.make_set_scenario_methods()

        return instance

    def __init__(self, *args, **kwargs):
        super(TestDoubleWithRegistry, self).__init__(*args, **kwargs)
        self._set_method_per_scenario()

    def _make_set_scenario_method(self, scenario_class, params):
        """ Make the setter function for one scenario class.

            :param scenario_class: The scenario class to instantiate.
            :param params: Dict whose 'set_scenario_method_name' item
                names the resulting function.
            :return: Function taking (`self`, `name`), which stores a
                new `scenario_class` instance on `self` under the
                attribute named after the scenario class.

            """
        def method(self, name):
            scenario = scenario_class(name, double=self)
            setattr(self, scenario_class.__name__, scenario)

        method.__name__ = str(params['set_scenario_method_name'])
        doc_template = """ Set the scenario for `{name}` behaviour. """
        method.__doc__ = doc_template.format(name=scenario_class.__name__)
        return method

    def make_set_scenario_methods(self):
        """ Make `set_<scenario_class_name>` methods on this class. """
        params_by_class = self.function_scenario_params_by_class
        for (scenario_class, scenario_params) in params_by_class.items():
            setter = self._make_set_scenario_method(
                    scenario_class, scenario_params)
            setattr(self.__class__, setter.__name__, setter)
            # Remembered for use by `_set_method_per_scenario`.
            scenario_params['set_scenario_method'] = setter

    def _set_method_per_scenario(self):
        """ Set the method to be called for each scenario. """
        params_by_class = self.function_scenario_params_by_class
        for scenario_params in params_by_class.values():
            set_scenario = scenario_params['set_scenario_method']
            set_scenario(self, scenario_params['default_scenario_name'])

    @classmethod
    def get_registry_for_testcase(cls, testcase):
        """ Get the FileDouble registry for the specified test case. """
        # Key in a dict must be hashable.
        registry_key = (testcase.__class__, id(testcase))
        return cls.registries.setdefault(registry_key, cls.registry_class())

    def get_registry_key(self):
        """ Get the registry key for this double. """
        raise NotImplementedError

    def register_for_testcase(self, testcase):
        """ Add this instance to registry for the specified testcase. """
        registry = self.get_registry_for_testcase(testcase)
        registry[self.get_registry_key()] = self
        # Undo the registration when the test case finishes.
        testcase.addCleanup(
                functools.partial(self.unregister_for_testcase, testcase))

    def unregister_for_testcase(self, testcase):
        """ Remove this instance from registry for the specified testcase. """
        registry = self.get_registry_for_testcase(testcase)
        registry_key = self.get_registry_key()
        if registry_key in registry:
            registry.pop(registry_key)
def copy_fake_file(fake_file):
    """ Make a copy of the StringIO instance.

        :param fake_file: The fake file to copy, or ``None``.
        :return: A new instance of the same type as `fake_file`,
            made from its current `getvalue()` content. For ``None``,
            an empty `StringIO`.

        The `encoding` attribute of `fake_file`, if any, is carried
        over when the copy does not have one of its own.

        """
    if fake_file is None:
        (copy_type, content) = (StringIO, "")
    else:
        (copy_type, content) = (type(fake_file), fake_file.getvalue())
    assert issubclass(copy_type, object)

    duplicate = copy_type(content)
    if hasattr(fake_file, 'encoding') and not hasattr(duplicate, 'encoding'):
        duplicate.encoding = fake_file.encoding
    return duplicate
class FileDouble(TestDoubleWithRegistry):
    """ A testing double for a file.

        Registered per test case under the double's `path`. The
        scenario for each patched function starts at the default
        named in `function_scenario_params_by_class`.

        """

    registry_class = dict
    registries = {}

    function_scenario_params_by_class = {
            os_path_exists_scenario: dict(
                default_scenario_name='not_exist',
                set_scenario_method_name='set_os_path_exists_scenario'),
            os_access_scenario: dict(
                default_scenario_name='okay',
                set_scenario_method_name='set_os_access_scenario'),
            os_stat_scenario: dict(
                default_scenario_name='okay',
                set_scenario_method_name='set_os_stat_scenario'),
            os_lstat_scenario: dict(
                default_scenario_name='okay',
                set_scenario_method_name='set_os_lstat_scenario'),
            builtins_open_scenario: dict(
                default_scenario_name='okay',
                set_scenario_method_name='set_open_scenario'),
            os_unlink_scenario: dict(
                default_scenario_name='okay',
                set_scenario_method_name='set_os_unlink_scenario'),
            os_rmdir_scenario: dict(
                default_scenario_name='okay',
                set_scenario_method_name='set_os_rmdir_scenario'),
            shutil_rmtree_scenario: dict(
                default_scenario_name='okay',
                set_scenario_method_name='set_shutil_rmtree_scenario'),
            }

    def __init__(self, path=None, fake_file=None, *args, **kwargs):
        """ Set up the double for `path`.

            :param path: The filesystem path this double stands for.
            :param fake_file: File-like object to copy as the content;
                ``None`` gives an empty fake file.

            Remaining arguments are passed to the superclass.

            """
        self.path = path
        # Work on a private copy, so fake files can be shared by callers.
        content_file = copy_fake_file(fake_file)
        content_file.name = path
        self.fake_file = content_file

        self._set_stat_result()

        super(FileDouble, self).__init__(*args, **kwargs)

    def _set_stat_result(self):
        """ Set the `os.stat` result for this file. """
        content_size = len(self.fake_file.getvalue())
        self.stat_result = StatResult(
                st_mode=0,
                st_ino=None, st_dev=None, st_nlink=None,
                st_uid=0, st_gid=0,
                st_size=content_size,
                st_atime=None, st_mtime=None, st_ctime=None,
                )

    def __repr__(self):
        return "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
                path=self.path, fake_file=self.fake_file)

    def get_registry_key(self):
        """ Get the registry key for this double. """
        return self.path
class os_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.popen` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        receives the `argv`, `mode` and `buffering` of the call.

        """

    # Which stream of the subprocess each `os.popen` mode connects to.
    stream_name_by_mode = {
            'w': 'stdin',
            'r': 'stdout',
            }

    def _hook_success(self, argv, mode, buffering):
        """ Return the fake file of the double's stream for `mode`. """
        double_attr_name = self.stream_name_by_mode[mode] + '_double'
        return getattr(self.double, double_attr_name).fake_file

    def _hook_failure(self, argv, mode, buffering):
        """ Return a new, empty `StringIO`. """
        return StringIO()

    def _hook_not_found(self, argv, mode, buffering):
        """ Return a new, empty `StringIO`. """
        return StringIO()
class os_waitpid_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.waitpid` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        receives the `pid` and `options` of the call.

        """

    def _hook_success(self, pid, options):
        """ Return (`pid`, `EXIT_STATUS_SUCCESS`). """
        result = (pid, EXIT_STATUS_SUCCESS)
        return result

    def _hook_failure(self, pid, options):
        """ Return (`pid`, `EXIT_STATUS_FAILURE`). """
        result = (pid, EXIT_STATUS_FAILURE)
        return result

    def _hook_not_found(self, pid, options):
        """ Raise `OSError` with `errno` set to ``ECHILD``. """
        # `OSError` populates its `errno` attribute only when given
        # at least two arguments, so supply the message as well.
        error = OSError(errno.ECHILD, os.strerror(errno.ECHILD))
        raise error
class os_system_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.system` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        receives the `command` of the call and returns an exit status.

        """

    def _hook_success(self, command):
        """ Return `EXIT_STATUS_SUCCESS`. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        """ Return `EXIT_STATUS_FAILURE`. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        """ Return `EXIT_STATUS_COMMAND_NOT_FOUND`. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
class os_spawnv_scenario(TestDoubleFunctionScenario):
    """ Scenario for `os.spawnv` behaviour.

        Scenario names: 'success', 'failure', 'not_found'. Each hook
        receives the `mode`, `file` and `args` of the call and
        returns an exit status.

        """

    def _hook_success(self, mode, file, args):
        """ Return `EXIT_STATUS_SUCCESS`. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, mode, file, args):
        """ Return `EXIT_STATUS_FAILURE`. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, mode, file, args):
        """ Return `EXIT_STATUS_COMMAND_NOT_FOUND`. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
# Unique sentinel objects, distinguishable only by identity.
# NOTE(review): presumably wildcards for matching command-line
# arguments; their use is not visible in this part of the file.
ARG_ANY = object()
ARG_MORE = object()
class PopenDouble:
    """ A testing double for `subprocess.Popen`.

        The `stdin`, `stdout`, `stderr`, `pid` and `returncode`
        attributes all start as ``None``.

        """

    def __init__(self, args, *posargs, **kwargs):
        """ Record the command line of the faked subprocess.

            :param args: The command: a string when `shell` is true,
                otherwise a sequence of command-line arguments.

            Other arguments are accepted as for `subprocess.Popen`;
            only the `shell` keyword argument is examined here.

            """
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.pid = None
        self.returncode = None

        if kwargs.get('shell', False):
            self.argv = shlex.split(args)
        else:
            # The parameter is already a sequence of command-line arguments.
            self.argv = args

    def set_streams(self, subprocess_double, popen_kwargs):
        """ Set the streams on the `PopenDouble`.

            :param subprocess_double: The `SubprocessDouble` from
                which to get existing stream doubles.
            :param popen_kwargs: The keyword arguments to the
                `subprocess.Popen` call.
            :return: ``None``.

            Only streams named in `popen_kwargs` are set. A value of
            `subprocess.PIPE` selects the fake file of the matching
            stream double; `subprocess.STDOUT` selects the fake file
            of the stdout double; any other value is used as given.

            """
        for stream_name in (
                name for name in ['stdin', 'stdout', 'stderr']
                if name in popen_kwargs):
            stream_spec = popen_kwargs[stream_name]
            # The special values are plain integers: compare by value,
            # because identity of equal integers is not guaranteed.
            if stream_spec == subprocess.PIPE:
                stream_double = getattr(
                        subprocess_double,
                        "{name}_double".format(name=stream_name))
                stream_file = stream_double.fake_file
            elif stream_spec == subprocess.STDOUT:
                stream_file = subprocess_double.stdout_double.fake_file
            else:
                stream_file = stream_spec
            setattr(self, stream_name, stream_file)

    def wait(self):
        """ Wait for subprocess to terminate.

            :return: The current `returncode` attribute.

            """
        return self.returncode
class subprocess_popen_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.Popen` behaviour. """

    def _hook_success(self, testcase, args, *posargs, **kwargs):
        """ Give the `PopenDouble` of this scenario's subprocess double.

            The streams of the `PopenDouble` are set from `kwargs`
            before it is returned.

            """
        subprocess_double = self.double
        popen_double = subprocess_double.popen_double
        popen_double.set_streams(subprocess_double, kwargs)
        return popen_double
def patch_subprocess_popen(testcase):
    """ Patch `subprocess.Popen` constructor for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Calls to the patched constructor consult the registry of
        `SubprocessDouble` instances for this test case. A command
        found in the registry is handled by that double's
        `subprocess_popen_scenario`; any other command goes to the
        original `subprocess.Popen`.

        """
    original_popen = subprocess.Popen

    def fake_subprocess_popen(args, *posargs, **kwargs):
        # A shell command line is split into words to form the key.
        lookup_argv = (
            shlex.split(args) if kwargs.get('shell', False) else args)
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if lookup_argv not in registry:
            return original_popen(args, *posargs, **kwargs)
        scenario = registry[lookup_argv].subprocess_popen_scenario
        return scenario.call_hook(testcase, args, *posargs, **kwargs)

    patcher = mock.patch.object(
        subprocess, "Popen", autospec=True,
        side_effect=fake_subprocess_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.call` behaviour. """

    def _hook_success(self, command):
        """ Give the exit status of a successful `command`. """
        return EXIT_STATUS_SUCCESS

    def _hook_failure(self, command):
        """ Give the exit status of a failed `command`. """
        return EXIT_STATUS_FAILURE

    def _hook_not_found(self, command):
        """ Give the exit status of a `command` that was not found. """
        return EXIT_STATUS_COMMAND_NOT_FOUND
def patch_subprocess_call(testcase):
    """ Patch `subprocess.call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Calls to the patched function consult the registry of
        `SubprocessDouble` instances for this test case. A command
        found in the registry is handled by that double's
        `subprocess_call_scenario`; any other command goes to the
        original `subprocess.call`.

        """
    original_call = subprocess.call

    def fake_subprocess_call(command, *posargs, **kwargs):
        # A shell command line is split into words to form the key.
        lookup_argv = (
            shlex.split(command) if kwargs.get('shell', False)
            else command)
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if lookup_argv not in registry:
            return original_call(command, *posargs, **kwargs)
        scenario = registry[lookup_argv].subprocess_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        subprocess, "call", autospec=True,
        side_effect=fake_subprocess_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class subprocess_check_call_scenario(TestDoubleFunctionScenario):
    """ Scenario for `subprocess.check_call` behaviour. """

    def _hook_success(self, command):
        """ Complete without error for a successful `command`.

            :return: ``None``.

            """
        return None

    def _hook_failure(self, command):
        """ Raise `CalledProcessError` with the failure exit status. """
        raise subprocess.CalledProcessError(EXIT_STATUS_FAILURE, command)

    def _hook_not_found(self, command):
        """ Raise `CalledProcessError` with the not-found exit status. """
        raise subprocess.CalledProcessError(
            EXIT_STATUS_COMMAND_NOT_FOUND, command)
def patch_subprocess_check_call(testcase):
    """ Patch `subprocess.check_call` function for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Calls to the patched function consult the registry of
        `SubprocessDouble` instances for this test case. A command
        found in the registry is handled by that double's
        `subprocess_check_call_scenario`; any other command goes to
        the original `subprocess.check_call`.

        """
    original_check_call = subprocess.check_call

    def fake_subprocess_check_call(command, *posargs, **kwargs):
        # A shell command line is split into words to form the key.
        lookup_argv = (
            shlex.split(command) if kwargs.get('shell', False)
            else command)
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        if lookup_argv not in registry:
            return original_check_call(command, *posargs, **kwargs)
        scenario = registry[lookup_argv].subprocess_check_call_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        subprocess, "check_call", autospec=True,
        side_effect=fake_subprocess_check_call)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class SubprocessDoubleRegistry(collections_abc.MutableMapping):
    """ Registry of `SubprocessDouble` instances by `argv`.

        Lookup does not require an exact key. A registered key matches
        a requested `argv` when every word of the key matches the word
        at the same position in `argv`. In a registered key, `ARG_ANY`
        matches any one word, and `ARG_MORE` matches all remaining
        words (including none).

        """

    def __init__(self, *args, **kwargs):
        """ Initialise the registry.

            :param args: Optionally, one mapping or iterable of
                (key, value) pairs giving the initial content.
            :param kwargs: Accepted but not used.

            """
        items = []
        if args:
            if isinstance(args[0], collections_abc.Mapping):
                items = args[0].items()
            elif isinstance(args[0], collections_abc.Iterable):
                items = args[0]
        self._mapping = dict(items)

    def __repr__(self):
        text = "<{class_name} object: {mapping}>".format(
            class_name=self.__class__.__name__, mapping=self._mapping)
        return text

    @staticmethod
    def _candidate_matches(candidate, argv):
        """ Determine whether registered key `candidate` matches `argv`. """
        if candidate == argv:
            # An exact match.
            return True
        for (word_index, candidate_word) in enumerate(candidate):
            if candidate_word is ARG_MORE:
                # Candidate matches any remaining words. We have a match.
                return True
            if word_index >= len(argv):
                # Candidate is too long for the specified argv: there
                # is no word at this position to compare. This must be
                # tested before `argv` is indexed.
                return False
            if candidate_word is ARG_ANY:
                # Candidate matches any word at this position.
                continue
            if candidate_word == argv[word_index]:
                # Candidate matches the word at this position.
                continue
            # This candidate does not match.
            return False
        # Reached the end of the candidate without a mismatch.
        return True

    def _match_argv(self, argv):
        """ Match the specified `argv` with our registered keys.

            :param argv: The requested command-line arguments.
            :return: The first registered key that matches `argv`,
                or ``None`` if no key matches or `argv` is not a
                sequence.

            """
        if not isinstance(argv, collections_abc.Sequence):
            return None
        for candidate in self._mapping:
            if self._candidate_matches(candidate, argv):
                return candidate
        return None

    def __getitem__(self, key):
        match = self._match_argv(key)
        if match is None:
            raise KeyError(key)
        result = self._mapping[match]
        return result

    def __setitem__(self, key, value):
        # Replace any existing entry that matches this key, so that
        # lookups for the key find the new value.
        if key in self:
            del self[key]
        self._mapping[key] = value

    def __delitem__(self, key):
        # Deleting a key that matches no entry is silently ignored.
        match = self._match_argv(key)
        if match is not None:
            del self._mapping[match]

    def __iter__(self):
        return self._mapping.__iter__()

    def __len__(self):
        return self._mapping.__len__()
class SubprocessDouble(TestDoubleWithRegistry):
    """ A testing double for a subprocess.

        Each instance has a fake program `path`, an `argv`, a unique
        integer `pid`, a `PopenDouble`, and a `FileDouble` for each of
        the `stdin`, `stdout`, and `stderr` streams.

        """

    registry_class = SubprocessDoubleRegistry
    registries = {}

    # Instances by PID. Values are held weakly, so an entry disappears
    # when its double is no longer referenced elsewhere.
    double_by_pid = weakref.WeakValueDictionary()

    # Source of unique PIDs, shared by every instance.
    _pid_counter = itertools.count(1)

    function_scenario_params_by_class = {
        subprocess_popen_scenario: {
            'default_scenario_name': 'success',
            'set_scenario_method_name': 'set_subprocess_popen_scenario',
            },
        subprocess_call_scenario: {
            'default_scenario_name': 'success',
            'set_scenario_method_name': 'set_subprocess_call_scenario',
            },
        subprocess_check_call_scenario: {
            'default_scenario_name': 'success',
            'set_scenario_method_name':
                'set_subprocess_check_call_scenario',
            },
        os_popen_scenario: {
            'default_scenario_name': 'success',
            'set_scenario_method_name': 'set_os_popen_scenario',
            },
        os_waitpid_scenario: {
            'default_scenario_name': 'success',
            'set_scenario_method_name': 'set_os_waitpid_scenario',
            },
        os_system_scenario: {
            'default_scenario_name': 'success',
            'set_scenario_method_name': 'set_os_system_scenario',
            },
        os_spawnv_scenario: {
            'default_scenario_name': 'success',
            'set_scenario_method_name': 'set_os_spawnv_scenario',
            },
        }

    def __init__(self, path=None, argv=None, *args, **kwargs):
        """ Initialise the subprocess double.

            :param path: The filesystem path of the fake program. If
                not specified, a temporary path name is generated.
            :param argv: The command-line arguments of the fake
                program. If not specified, the base name of `path`
                is the only argument.
            :param args: Positional arguments for the superclass.
            :param kwargs: Keyword arguments for the superclass.

            """
        if path is None:
            # Only a path name is needed; no file is created here.
            path = tempfile.mktemp()
        self.path = path

        if argv is None:
            command_name = os.path.basename(path)
            argv = [command_name]
        self.argv = argv

        self.pid = self._make_pid()
        self._register_by_pid()

        self.set_popen_double()

        stream_class = SubprocessDouble.stream_class
        for stream_name in ['stdin', 'stdout', 'stderr']:
            fake_file = stream_class()
            file_double = FileDouble(fake_file=fake_file)
            stream_double_name = '{name}_double'.format(name=stream_name)
            setattr(self, stream_double_name, file_double)

        super(SubprocessDouble, self).__init__(*args, **kwargs)

    def set_popen_double(self):
        """ Set the `PopenDouble` for this instance. """
        double = PopenDouble(self.argv)
        double.pid = self.pid

        self.popen_double = double

    def __repr__(self):
        text = (
            "<SubprocessDouble instance: {id}"
            " path: {path!r},"
            " argv: {argv!r}"
            " stdin_double: {stdin_double!r}"
            " stdout_double: {stdout_double!r}"
            " stderr_double: {stderr_double!r}"
            ">").format(
                id=id(self),
                path=self.path, argv=self.argv,
                stdin_double=self.stdin_double,
                stdout_double=self.stdout_double,
                stderr_double=self.stderr_double)
        return text

    @classmethod
    def _make_pid(cls):
        """ Make a unique PID for a subprocess.

            :return: An integer not returned by any earlier call.

            """
        # Draw from the one shared counter. A generator function here
        # would return a new generator object on every call, not a PID.
        return next(cls._pid_counter)

    def _register_by_pid(self):
        """ Register this subprocess by its PID. """
        self.__class__.double_by_pid[self.pid] = self

    def get_registry_key(self):
        """ Get the registry key for this double. """
        result = tuple(self.argv)
        return result

    # The fake streams hold bytes; text content is encoded on the way in.
    stream_class = io.BytesIO
    stream_encoding = "utf-8"

    def _set_stream_content(self, stream_name, text, bytes_encoding):
        """ Replace the fake file of the named stream double.

            :param stream_name: One of 'stdin', 'stdout', 'stderr'.
            :param text: The text content for the stream.
            :param bytes_encoding: The encoding used to convert `text`
                to bytes.
            :return: ``None``.

            """
        content = text.encode(bytes_encoding)
        fake_file = self.stream_class(content)
        stream_double = getattr(
            self, '{name}_double'.format(name=stream_name))
        stream_double.fake_file = fake_file

    def set_stdin_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdin` stream for this double. """
        self._set_stream_content('stdin', text, bytes_encoding)

    def set_stdout_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stdout` stream for this double. """
        self._set_stream_content('stdout', text, bytes_encoding)

    def set_stderr_content(self, text, bytes_encoding=stream_encoding):
        """ Set the content of the `stderr` stream for this double. """
        self._set_stream_content('stderr', text, bytes_encoding)
def make_fake_subprocess_scenarios(path=None):
    """ Make a collection of scenarios for testing with fake subprocesses.

        :param path: The filesystem path of the fake program. If not
            specified, a temporary path name will be generated.
        :return: A collection of scenarios for tests involving
            subprocesses.

        The collection is a mapping from scenario name to a dictionary
        of scenario attributes. Each scenario includes its own
        `SubprocessDouble` under the key 'subprocess_double'.

        """
    file_path = tempfile.mktemp() if path is None else path

    default_scenario_params = {
        'return_value': EXIT_STATUS_SUCCESS,
        'program_path': file_path,
        'argv_after_command_name': [],
        }

    scenarios = {
        'default': {},
        'not-found': {
            'return_value': EXIT_STATUS_COMMAND_NOT_FOUND,
            },
        }

    for (name, scenario) in scenarios.items():
        # Values the scenario already specifies override the defaults.
        params = dict(default_scenario_params)
        params.update(scenario)
        scenario.update(params)

        program_path = params['program_path']
        argv = [os.path.basename(program_path)]
        argv.extend(params['argv_after_command_name'])

        scenario['subprocess_double'] = SubprocessDouble(
            path=program_path, argv=argv)
        scenario['fake_file_scenario_name'] = name

    return scenarios
def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios):
    """ Get the `SubprocessDouble` instances from fake subprocess scenarios.

        :param scenarios: Collection of fake subprocess scenarios.
        :return: Set of the 'subprocess_double' values of the
            scenarios, omitting any that is ``None``.

        """
    doubles = set()
    for scenario in scenarios:
        double = scenario['subprocess_double']
        if double is not None:
            doubles.add(double)
    return doubles
def setup_subprocess_double_behaviour(testcase, doubles=None):
    """ Set up subprocess double instances and behaviour.

        :param testcase: The `TestCase` instance to modify.
        :param doubles: Collection of `SubprocessDouble` instances.
        :return: None.

        Each double is registered for `testcase`. When `doubles` is
        ``None``, the doubles are taken from a new collection made by
        `make_fake_subprocess_scenarios`.

        """
    if doubles is None:
        default_scenarios = make_fake_subprocess_scenarios()
        doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
            default_scenarios.values())

    for subprocess_double in doubles:
        subprocess_double.register_for_testcase(testcase)
def setup_fake_subprocess_fixtures(testcase):
    """ Set up fixtures for fake subprocess doubles.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        The new scenarios are stored on `testcase` as the attribute
        `fake_subprocess_scenarios`, and their doubles are set up for
        `testcase`.

        """
    testcase.fake_subprocess_scenarios = make_fake_subprocess_scenarios()

    scenario_doubles = get_subprocess_doubles_from_fake_subprocess_scenarios(
        testcase.fake_subprocess_scenarios.values())
    setup_subprocess_double_behaviour(testcase, scenario_doubles)
def patch_os_popen(testcase):
    """ Patch `os.popen` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Calls to the patched function consult the registry of
        `SubprocessDouble` instances for this test case. A command
        found in the registry is handled by that double's
        `os_popen_scenario`; any other command goes to the original
        `os.popen`.

        """
    original_popen = os.popen

    def fake_os_popen(cmd, mode='r', buffering=-1):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # A command line string is split into words to form the key.
        lookup_argv = (
            shlex.split(cmd) if isinstance(cmd, basestring) else cmd)
        if lookup_argv not in registry:
            return original_popen(cmd, mode, buffering)
        scenario = registry[lookup_argv].os_popen_scenario
        return scenario.call_hook(lookup_argv, mode, buffering)

    patcher = mock.patch.object(
        os, "popen", autospec=True,
        side_effect=fake_os_popen)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_waitpid(testcase):
    """ Patch `os.waitpid` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Calls to the patched function look up the PID in
        `SubprocessDouble.double_by_pid`. A PID found there is handled
        by that double's `os_waitpid_scenario`; any other PID goes to
        the original `os.waitpid`.

        """
    original_waitpid = os.waitpid

    def fake_os_waitpid(pid, options):
        doubles_by_pid = SubprocessDouble.double_by_pid
        if pid not in doubles_by_pid:
            return original_waitpid(pid, options)
        scenario = doubles_by_pid[pid].os_waitpid_scenario
        return scenario.call_hook(pid, options)

    patcher = mock.patch.object(
        os, "waitpid", autospec=True,
        side_effect=fake_os_waitpid)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_system(testcase):
    """ Patch `os.system` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Calls to the patched function consult the registry of
        `SubprocessDouble` instances for this test case. A command
        found in the registry is handled by that double's
        `os_system_scenario`; any other command goes to the original
        `os.system`.

        """
    original_system = os.system

    def fake_os_system(command):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        # The command line is split into words to form the key.
        lookup_argv = shlex.split(command)
        if lookup_argv not in registry:
            return original_system(command)
        scenario = registry[lookup_argv].os_system_scenario
        return scenario.call_hook(command)

    patcher = mock.patch.object(
        os, "system", autospec=True,
        side_effect=fake_os_system)
    patcher.start()
    testcase.addCleanup(patcher.stop)
def patch_os_spawnv(testcase):
    """ Patch `os.spawnv` behaviour for this test case.

        :param testcase: The `TestCase` instance to modify.
        :return: None.

        Calls to the patched function consult the registry of
        `SubprocessDouble` instances for this test case, using the
        tuple of `args` as the key. Arguments found in the registry
        are handled by that double's `os_spawnv_scenario`; any others
        go to the original `os.spawnv`.

        """
    original_spawnv = os.spawnv

    def fake_os_spawnv(mode, file, args):
        registry = SubprocessDouble.get_registry_for_testcase(testcase)
        lookup_key = tuple(args)
        if lookup_key not in registry:
            return original_spawnv(mode, file, args)
        scenario = registry[lookup_key].os_spawnv_scenario
        return scenario.call_hook(mode, file, args)

    patcher = mock.patch.object(
        os, "spawnv", autospec=True,
        side_effect=fake_os_spawnv)
    patcher.start()
    testcase.addCleanup(patcher.stop)
1746 # Local variables:
1747 # coding: utf-8
1748 # mode: python
1749 # End:
1750 # vim: fileencoding=utf-8 filetype=python :