1 # -*- coding: utf-8; -*-
4 # Part of Gajja, a Python test double library.
6 # Copyright © 2015–2016 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Gajja: Fake objects for real tests.
15 The `gajja` library provides a system of Python test double classes
16 for specific system objects:
22 The Korean word 가짜 (*gajja*; IPA ˈkaːt͡ɕ̤a) means “fake thing”.
26 from __future__
import (absolute_import
, unicode_literals
)
30 if sys
.version_info
>= (3, 3):
33 import unittest
.mock
as mock
34 from io
import StringIO
as StringIO
36 import collections
.abc
as collections_abc
37 elif sys
.version_info
>= (3, 0):
38 raise RuntimeError("Python 3 earlier than 3.3 is not supported.")
39 elif sys
.version_info
>= (2, 7):
40 # Python 2 standard library.
41 import __builtin__
as builtins
42 # Third-party backport of Python 3 unittest improvements.
43 import unittest2
as unittest
44 # Third-party mock library.
46 # Python 2 standard library.
47 from StringIO
import StringIO
as BaseStringIO
48 import ConfigParser
as configparser
49 import collections
as collections_abc
51 raise RuntimeError("Python earlier than 2.7 is not supported.")
72 # The ‘pwd’ module is not available on platforms other than Unix.
75 __package__
= str("gajja")
76 __import__(__package__
)
85 # Alias for Python 3 types.
90 def make_unique_slug(testcase
):
91 """ Make a unique slug for the test case. """
92 text
= base64
.b64encode(
93 testcase
.getUniqueString().encode('utf-8')
102 # We don't yet have the StringIO we want. Create it.
104 class StringIO(BaseStringIO
, object):
105 """ StringIO with a context manager. """
110 def __exit__(self
, *args
):
124 def patch_stdout(testcase
):
125 """ Patch `sys.stdout` for the specified test case. """
126 patcher
= mock
.patch
.object(
127 sys
, "stdout", wraps
=StringIO())
129 testcase
.addCleanup(patcher
.stop
)
132 def patch_stderr(testcase
):
133 """ Patch `sys.stderr` for the specified test case. """
134 patcher
= mock
.patch
.object(
135 sys
, "stderr", wraps
=StringIO())
137 testcase
.addCleanup(patcher
.stop
)
140 def patch_signal_signal(testcase
):
141 """ Patch `signal.signal` for the specified test case. """
142 func_patcher
= mock
.patch
.object(signal
, "signal", autospec
=True)
144 testcase
.addCleanup(func_patcher
.stop
)
147 class FakeSystemExit(Exception):
148 """ Fake double for `SystemExit` exception. """
151 EXIT_STATUS_SUCCESS
= 0
152 EXIT_STATUS_FAILURE
= 1
153 EXIT_STATUS_COMMAND_NOT_FOUND
= 127
156 def patch_sys_exit(testcase
):
157 """ Patch `sys.exit` for the specified test case. """
158 func_patcher
= mock
.patch
.object(
159 sys
, "exit", autospec
=True,
160 side_effect
=FakeSystemExit())
162 testcase
.addCleanup(func_patcher
.stop
)
165 def patch_sys_argv(testcase
):
166 """ Patch the `sys.argv` sequence for the test case. """
167 if not hasattr(testcase
, 'progname'):
168 testcase
.progname
= make_unique_slug(testcase
)
169 if not hasattr(testcase
, 'sys_argv'):
170 testcase
.sys_argv
= [testcase
.progname
]
171 patcher
= mock
.patch
.object(
173 new
=list(testcase
.sys_argv
))
175 testcase
.addCleanup(patcher
.stop
)
178 def patch_system_interfaces(testcase
):
179 """ Patch system interfaces that are disruptive to the test runner. """
180 patch_stdout(testcase
)
181 patch_stderr(testcase
)
182 patch_sys_exit(testcase
)
183 patch_sys_argv(testcase
)
186 def patch_time_time(testcase
, values
=None):
187 """ Patch the `time.time` function for the specified test case.
189 :param testcase: The `TestCase` instance for binding to the patch.
190 :param values: An iterable to provide return values.
195 values
= itertools
.count()
197 def generator_fake_time():
201 func_patcher
= mock
.patch
.object(time
, "time", autospec
=True)
203 testcase
.addCleanup(func_patcher
.stop
)
205 time
.time
.side_effect
= generator_fake_time()
208 def patch_os_environ(testcase
):
209 """ Patch the `os.environ` mapping. """
210 if not hasattr(testcase
, 'os_environ'):
211 testcase
.os_environ
= {}
212 patcher
= mock
.patch
.object(os
, "environ", new
=testcase
.os_environ
)
214 testcase
.addCleanup(patcher
.stop
)
217 def patch_os_getpid(testcase
):
218 """ Patch `os.getpid` for the specified test case. """
219 func_patcher
= mock
.patch
.object(os
, "getpid", autospec
=True)
221 testcase
.addCleanup(func_patcher
.stop
)
224 def patch_os_getuid(testcase
):
225 """ Patch the `os.getuid` function. """
226 if not hasattr(testcase
, 'os_getuid_return_value'):
227 testcase
.os_getuid_return_value
= testcase
.getUniqueInteger()
228 func_patcher
= mock
.patch
.object(
229 os
, "getuid", autospec
=True,
230 return_value
=testcase
.os_getuid_return_value
)
232 testcase
.addCleanup(func_patcher
.stop
)
235 PasswdEntry
= collections
.namedtuple(
237 "pw_name pw_passwd pw_uid pw_gid pw_gecos pw_dir pw_shell")
240 def patch_pwd_getpwuid(testcase
):
241 """ Patch the `pwd.getpwuid` function. """
242 if not hasattr(testcase
, 'pwd_getpwuid_return_value'):
243 testcase
.pwd_getpwuid_return_value
= PasswdEntry(
244 pw_name
=make_unique_slug(testcase
),
245 pw_passwd
=make_unique_slug(testcase
),
246 pw_uid
=testcase
.getUniqueInteger(),
247 pw_gid
=testcase
.getUniqueInteger(),
248 pw_gecos
=testcase
.getUniqueString(),
249 pw_dir
=tempfile
.mktemp(),
250 pw_shell
=tempfile
.mktemp())
251 if not isinstance(testcase
.pwd_getpwuid_return_value
, pwd
.struct_passwd
):
252 pwent
= pwd
.struct_passwd(testcase
.pwd_getpwuid_return_value
)
254 pwent
= testcase
.pwd_getpwuid_return_value
255 func_patcher
= mock
.patch
.object(
256 pwd
, "getpwuid", autospec
=True,
259 testcase
.addCleanup(func_patcher
.stop
)
261 if pwd
is NotImplemented:
262 # The ‘pwd’ module is not available on some platforms.
263 del patch_pwd_getpwuid
266 def patch_os_path_exists(testcase
):
267 """ Patch `os.path.exists` behaviour for this test case.
269 When the patched function is called, the registry of
270 `FileDouble` instances for this test case will be used to get
271 the instance for the path specified.
274 orig_os_path_exists
= os
.path
.exists
276 def fake_os_path_exists(path
):
277 registry
= FileDouble
.get_registry_for_testcase(testcase
)
279 file_double
= registry
[path
]
280 result
= file_double
.os_path_exists_scenario
.call_hook()
282 result
= orig_os_path_exists(path
)
285 func_patcher
= mock
.patch
.object(
286 os
.path
, "exists", autospec
=True,
287 side_effect
=fake_os_path_exists
)
289 testcase
.addCleanup(func_patcher
.stop
)
292 def patch_os_access(testcase
):
293 """ Patch `os.access` behaviour for this test case.
295 When the patched function is called, the registry of
296 `FileDouble` instances for this test case will be used to get
297 the instance for the path specified.
300 orig_os_access
= os
.access
302 def fake_os_access(path
, mode
):
303 registry
= FileDouble
.get_registry_for_testcase(testcase
)
305 file_double
= registry
[path
]
306 result
= file_double
.os_access_scenario
.call_hook(mode
)
308 result
= orig_os_access(path
, mode
)
311 func_patcher
= mock
.patch
.object(
312 os
, "access", autospec
=True,
313 side_effect
=fake_os_access
)
315 testcase
.addCleanup(func_patcher
.stop
)
318 StatResult
= collections
.namedtuple(
321 'st_ino', 'st_dev', 'st_nlink',
324 'st_atime', 'st_mtime', 'st_ctime',
328 def patch_os_stat(testcase
):
329 """ Patch `os.stat` behaviour for this test case.
331 When the patched function is called, the registry of
332 `FileDouble` instances for this test case will be used to get
333 the instance for the path specified.
336 orig_os_stat
= os
.stat
338 def fake_os_stat(path
):
339 registry
= FileDouble
.get_registry_for_testcase(testcase
)
341 file_double
= registry
[path
]
342 result
= file_double
.os_stat_scenario
.call_hook()
344 result
= orig_os_stat(path
)
347 func_patcher
= mock
.patch
.object(
348 os
, "stat", autospec
=True,
349 side_effect
=fake_os_stat
)
351 testcase
.addCleanup(func_patcher
.stop
)
354 def patch_os_lstat(testcase
):
355 """ Patch `os.lstat` behaviour for this test case.
357 When the patched function is called, the registry of
358 `FileDouble` instances for this test case will be used to get
359 the instance for the path specified.
362 orig_os_lstat
= os
.lstat
364 def fake_os_lstat(path
):
365 registry
= FileDouble
.get_registry_for_testcase(testcase
)
367 file_double
= registry
[path
]
368 result
= file_double
.os_lstat_scenario
.call_hook()
370 result
= orig_os_lstat(path
)
373 func_patcher
= mock
.patch
.object(
374 os
, "lstat", autospec
=True,
375 side_effect
=fake_os_lstat
)
377 testcase
.addCleanup(func_patcher
.stop
)
380 def patch_os_unlink(testcase
):
381 """ Patch `os.unlink` behaviour for this test case.
383 When the patched function is called, the registry of
384 `FileDouble` instances for this test case will be used to get
385 the instance for the path specified.
388 orig_os_unlink
= os
.unlink
390 def fake_os_unlink(path
):
391 registry
= FileDouble
.get_registry_for_testcase(testcase
)
393 file_double
= registry
[path
]
394 result
= file_double
.os_unlink_scenario
.call_hook()
396 result
= orig_os_unlink(path
)
399 func_patcher
= mock
.patch
.object(
400 os
, "unlink", autospec
=True,
401 side_effect
=fake_os_unlink
)
403 testcase
.addCleanup(func_patcher
.stop
)
406 def patch_os_rmdir(testcase
):
407 """ Patch `os.rmdir` behaviour for this test case.
409 When the patched function is called, the registry of
410 `FileDouble` instances for this test case will be used to get
411 the instance for the path specified.
414 orig_os_rmdir
= os
.rmdir
416 def fake_os_rmdir(path
):
417 registry
= FileDouble
.get_registry_for_testcase(testcase
)
419 file_double
= registry
[path
]
420 result
= file_double
.os_rmdir_scenario
.call_hook()
422 result
= orig_os_rmdir(path
)
425 func_patcher
= mock
.patch
.object(
426 os
, "rmdir", autospec
=True,
427 side_effect
=fake_os_rmdir
)
429 testcase
.addCleanup(func_patcher
.stop
)
432 def patch_shutil_rmtree(testcase
):
433 """ Patch `shutil.rmtree` behaviour for this test case.
435 When the patched function is called, the registry of
436 `FileDouble` instances for this test case will be used to get
437 the instance for the path specified.
440 orig_shutil_rmtree
= os
.rmdir
442 def fake_shutil_rmtree(path
, ignore_errors
=False, onerror
=None):
443 registry
= FileDouble
.get_registry_for_testcase(testcase
)
445 file_double
= registry
[path
]
446 result
= file_double
.shutil_rmtree_scenario
.call_hook()
448 result
= orig_shutil_rmtree(path
)
451 func_patcher
= mock
.patch
.object(
452 shutil
, "rmtree", autospec
=True,
453 side_effect
=fake_shutil_rmtree
)
455 testcase
.addCleanup(func_patcher
.stop
)
458 def patch_tempfile_mkdtemp(testcase
):
459 """ Patch the `tempfile.mkdtemp` function for this test case. """
460 if not hasattr(testcase
, 'tempfile_mkdtemp_file_double'):
461 testcase
.tempfile_mkdtemp_file_double
= FileDouble(tempfile
.mktemp())
463 double
= testcase
.tempfile_mkdtemp_file_double
464 double
.set_os_unlink_scenario('okay')
465 double
.set_os_rmdir_scenario('okay')
466 double
.register_for_testcase(testcase
)
468 func_patcher
= mock
.patch
.object(tempfile
, "mkdtemp", autospec
=True)
470 testcase
.addCleanup(func_patcher
.stop
)
472 tempfile
.mkdtemp
.return_value
= testcase
.tempfile_mkdtemp_file_double
.path
480 # Python 2 uses IOError.
481 def _ensure_ioerror_args(init_args
, init_kwargs
, errno_value
):
482 result_kwargs
= init_kwargs
483 result_errno
= errno_value
484 result_strerror
= os
.strerror(errno_value
)
485 result_filename
= None
486 if len(init_args
) >= 3:
487 result_errno
= init_args
[0]
488 result_filename
= init_args
[2]
489 if 'errno' in init_kwargs
:
490 result_errno
= init_kwargs
['errno']
491 del result_kwargs
['errno']
492 if 'filename' in init_kwargs
:
493 result_filename
= init_kwargs
['filename']
494 del result_kwargs
['filename']
495 if len(init_args
) >= 2:
496 result_strerror
= init_args
[1]
497 if 'strerror' in init_kwargs
:
498 result_strerror
= init_kwargs
['strerror']
499 del result_kwargs
['strerror']
500 result_args
= (result_errno
, result_strerror
, result_filename
)
501 return (result_args
, result_kwargs
)
503 class FileNotFoundError(IOError):
504 def __init__(self
, *args
, **kwargs
):
505 (args
, kwargs
) = _ensure_ioerror_args(
506 args
, kwargs
, errno_value
=errno
.ENOENT
)
507 super(FileNotFoundError
, self
).__init
__(*args
, **kwargs
)
509 class FileExistsError(IOError):
510 def __init__(self
, *args
, **kwargs
):
511 (args
, kwargs
) = _ensure_ioerror_args(
512 args
, kwargs
, errno_value
=errno
.EEXIST
)
513 super(FileExistsError
, self
).__init
__(*args
, **kwargs
)
515 class PermissionError(IOError):
516 def __init__(self
, *args
, **kwargs
):
517 (args
, kwargs
) = _ensure_ioerror_args(
518 args
, kwargs
, errno_value
=errno
.EPERM
)
519 super(PermissionError
, self
).__init
__(*args
, **kwargs
)
522 def make_fake_file_scenarios(path
=None):
523 """ Make a collection of scenarios for testing with fake files.
525 :path: The filesystem path of the fake file. If not specified,
526 a valid random path will be generated.
527 :return: A collection of scenarios for tests involving input files.
529 The collection is a mapping from scenario name to a dictionary of
535 file_path
= tempfile
.mktemp()
539 fake_file_empty
= StringIO()
540 fake_file_minimal
= StringIO("Lorem ipsum.")
541 fake_file_large
= StringIO("\n".join(
543 for __
in range(1000)))
545 default_scenario_params
= {
546 'open_scenario_name': 'okay',
547 'file_double_params': dict(
548 path
=file_path
, fake_file
=fake_file_minimal
),
554 'open_scenario_name': 'nonexist',
557 'open_scenario_name': 'exist_error',
559 'error-read-denied': {
560 'open_scenario_name': 'read_denied',
563 'file_double_params': dict(
564 path
=file_path
, fake_file
=fake_file_empty
),
567 'file_double_params': dict(
568 path
=file_path
, fake_file
=fake_file_empty
),
571 'file_double_params': dict(
572 path
=file_path
, fake_file
=fake_file_minimal
),
575 'file_double_params': dict(
576 path
=file_path
, fake_file
=fake_file_large
),
580 for (name
, scenario
) in scenarios
.items():
581 params
= default_scenario_params
.copy()
582 params
.update(scenario
)
583 scenario
.update(params
)
584 scenario
['file_double'] = FileDouble(**scenario
['file_double_params'])
585 scenario
['file_double'].set_open_scenario(params
['open_scenario_name'])
586 scenario
['fake_file_scenario_name'] = name
591 def get_file_doubles_from_fake_file_scenarios(scenarios
):
592 """ Get the `FileDouble` instances from fake file scenarios.
594 :param scenarios: Collection of fake file scenarios.
595 :return: Collection of `FileDouble` instances.
599 scenario
['file_double']
600 for scenario
in scenarios
601 if scenario
['file_double'] is not None)
606 def setup_file_double_behaviour(testcase
, doubles
=None):
607 """ Set up file double instances and behaviour.
609 :param testcase: The `TestCase` instance to modify.
610 :param doubles: Collection of `FileDouble` instances.
613 If `doubles` is ``None``, a default collection will be made
614 from the result of `make_fake_file_scenarios` result.
618 scenarios
= make_fake_file_scenarios()
619 doubles
= get_file_doubles_from_fake_file_scenarios(
622 for file_double
in doubles
:
623 file_double
.register_for_testcase(testcase
)
625 orig_open
= builtins
.open
627 def fake_open(path
, mode
='rt', buffering
=-1):
628 registry
= FileDouble
.get_registry_for_testcase(testcase
)
630 file_double
= registry
[path
]
631 result
= file_double
.builtins_open_scenario
.call_hook(
634 result
= orig_open(path
, mode
, buffering
)
637 mock_open
= mock
.mock_open()
638 mock_open
.side_effect
= fake_open
640 func_patcher
= mock
.patch
.object(
641 builtins
, "open", new
=mock_open
)
643 testcase
.addCleanup(func_patcher
.stop
)
646 def setup_fake_file_fixtures(testcase
):
647 """ Set up fixtures for fake file doubles.
649 :param testcase: The `TestCase` instance to modify.
653 scenarios
= make_fake_file_scenarios()
654 testcase
.fake_file_scenarios
= scenarios
656 file_doubles
= get_file_doubles_from_fake_file_scenarios(
658 setup_file_double_behaviour(testcase
, file_doubles
)
661 def set_fake_file_scenario(testcase
, name
):
662 """ Set the named fake file scenario for the test case. """
663 scenario
= testcase
.fake_file_scenarios
[name
]
664 testcase
.fake_file_scenario
= scenario
665 testcase
.file_double
= scenario
['file_double']
666 testcase
.file_double
.register_for_testcase(testcase
)
669 class TestDoubleFunctionScenario
:
670 """ Scenario for fake behaviour of a specific function. """
672 def __init__(self
, scenario_name
, double
):
673 self
.scenario_name
= scenario_name
676 self
.call_hook
= getattr(
677 self
, "_hook_{name}".format(name
=self
.scenario_name
))
681 "<{class_name} instance: {id}"
683 " call_hook name: {hook_name!r}"
684 " double: {double!r}"
686 class_name
=self
.__class
__.__name
__, id=id(self
),
687 name
=self
.scenario_name
, double
=self
.double
,
688 hook_name
=self
.call_hook
.__name
__)
691 def __eq__(self
, other
):
693 if not self
.scenario_name
== other
.scenario_name
:
695 if not self
.double
== other
.double
:
697 if not self
.call_hook
.__name
__ == other
.call_hook
.__name
__:
701 def __ne__(self
, other
):
702 result
= not self
.__eq
__(other
)
706 class os_path_exists_scenario(TestDoubleFunctionScenario
):
707 """ Scenario for `os.path.exists` behaviour. """
709 def _hook_exist(self
):
712 def _hook_not_exist(self
):
716 class os_access_scenario(TestDoubleFunctionScenario
):
717 """ Scenario for `os.access` behaviour. """
719 def _hook_okay(self
, mode
):
722 def _hook_not_exist(self
, mode
):
725 def _hook_read_only(self
, mode
):
726 if mode
& (os
.W_OK | os
.X_OK
):
732 def _hook_denied(self
, mode
):
733 if mode
& (os
.R_OK | os
.W_OK | os
.X_OK
):
740 class os_stat_scenario(TestDoubleFunctionScenario
):
741 """ Scenario for `os.stat` behaviour. """
743 def _hook_okay(self
):
744 return self
.double
.stat_result
746 def _hook_notfound_error(self
):
747 raise FileNotFoundError(
749 "No such file or directory: {path!r}".format(
750 path
=self
.double
.path
))
752 def _hook_denied_error(self
):
753 raise PermissionError(
758 class os_lstat_scenario(os_stat_scenario
):
759 """ Scenario for `os.lstat` behaviour. """
762 class os_unlink_scenario(TestDoubleFunctionScenario
):
763 """ Scenario for `os.unlink` behaviour. """
765 def _hook_okay(self
):
768 def _hook_nonexist(self
):
769 error
= FileNotFoundError(
771 "No such file or directory: {path!r}".format(
772 path
=self
.double
.path
))
775 def _hook_denied(self
):
776 error
= PermissionError(
782 class os_rmdir_scenario(TestDoubleFunctionScenario
):
783 """ Scenario for `os.rmdir` behaviour. """
785 def _hook_okay(self
):
788 def _hook_nonexist(self
):
789 error
= FileNotFoundError(
791 "No such file or directory: {path!r}".format(
792 path
=self
.double
.path
))
795 def _hook_denied(self
):
796 error
= PermissionError(
802 class shutil_rmtree_scenario(TestDoubleFunctionScenario
):
803 """ Scenario for `shutil.rmtree` behaviour. """
805 def _hook_okay(self
):
808 def _hook_nonexist(self
):
809 error
= FileNotFoundError(
811 "No such file or directory: {path!r}".format(
812 path
=self
.double
.path
))
815 def _hook_denied(self
):
816 error
= PermissionError(
822 class builtins_open_scenario(TestDoubleFunctionScenario
):
823 """ Scenario for `builtins.open` behaviour. """
825 def _hook_okay(self
, mode
, buffering
):
826 result
= self
.double
.fake_file
829 def _hook_nonexist(self
, mode
, buffering
):
830 if mode
.startswith('r'):
831 error
= FileNotFoundError(
833 "No such file or directory: {path!r}".format(
834 path
=self
.double
.path
))
836 result
= self
.double
.fake_file
839 def _hook_exist_error(self
, mode
, buffering
):
840 if mode
.startswith('w') or mode
.startswith('a'):
841 error
= FileExistsError(
843 "File already exists: {path!r}".format(
844 path
=self
.double
.path
))
846 result
= self
.double
.fake_file
849 def _hook_read_denied(self
, mode
, buffering
):
850 if mode
.startswith('r'):
851 error
= PermissionError(
853 "Read denied on {path!r}".format(
854 path
=self
.double
.path
))
856 result
= self
.double
.fake_file
859 def _hook_write_denied(self
, mode
, buffering
):
860 if mode
.startswith('w') or mode
.startswith('a'):
861 error
= PermissionError(
863 "Write denied on {path!r}".format(
864 path
=self
.double
.path
))
866 result
= self
.double
.fake_file
870 class TestDoubleWithRegistry
:
871 """ Abstract base class for a test double with a test case registry. """
873 registry_class
= NotImplemented
874 registries
= NotImplemented
876 function_scenario_params_by_class
= NotImplemented
878 def __new__(cls
, *args
, **kwargs
):
879 superclass
= super(TestDoubleWithRegistry
, cls
)
880 if superclass
.__new
__ is object.__new
__:
881 # The ‘object’ implementation complains about extra arguments.
882 instance
= superclass
.__new
__(cls
)
884 instance
= superclass
.__new
__(cls
, *args
, **kwargs
)
885 instance
.make_set_scenario_methods()
889 def __init__(self
, *args
, **kwargs
):
890 super(TestDoubleWithRegistry
, self
).__init
__(*args
, **kwargs
)
891 self
._set
_method
_per
_scenario
()
893 def _make_set_scenario_method(self
, scenario_class
, params
):
894 def method(self
, name
):
895 scenario
= scenario_class(name
, double
=self
)
896 setattr(self
, scenario_class
.__name
__, scenario
)
898 """ Set the scenario for `{name}` behaviour. """
899 ).format(name
=scenario_class
.__name
__)
900 method
.__name
__ = str(params
['set_scenario_method_name'])
903 def make_set_scenario_methods(self
):
904 """ Make `set_<scenario_class_name>` methods on this class. """
905 for (function_scenario_class
, function_scenario_params
) in (
906 self
.function_scenario_params_by_class
.items()):
907 method
= self
._make
_set
_scenario
_method
(
908 function_scenario_class
, function_scenario_params
)
909 setattr(self
.__class
__, method
.__name
__, method
)
910 function_scenario_params
['set_scenario_method'] = method
912 def _set_method_per_scenario(self
):
913 """ Set the method to be called for each scenario. """
914 for function_scenario_params
in (
915 self
.function_scenario_params_by_class
.values()):
916 function_scenario_params
['set_scenario_method'](
917 self
, function_scenario_params
['default_scenario_name'])
920 def get_registry_for_testcase(cls
, testcase
):
921 """ Get the FileDouble registry for the specified test case. """
922 # Key in a dict must be hashable.
923 key
= (testcase
.__class
__, id(testcase
))
924 registry
= cls
.registries
.setdefault(key
, cls
.registry_class())
927 def get_registry_key(self
):
928 """ Get the registry key for this double. """
929 raise NotImplementedError
931 def register_for_testcase(self
, testcase
):
932 """ Add this instance to registry for the specified testcase. """
933 registry
= self
.get_registry_for_testcase(testcase
)
934 key
= self
.get_registry_key()
936 unregister_func
= functools
.partial(
937 self
.unregister_for_testcase
, testcase
)
938 testcase
.addCleanup(unregister_func
)
940 def unregister_for_testcase(self
, testcase
):
941 """ Remove this instance from registry for the specified testcase. """
942 registry
= self
.get_registry_for_testcase(testcase
)
943 key
= self
.get_registry_key()
948 def copy_fake_file(fake_file
):
949 """ Make a copy of the StringIO instance. """
950 fake_file_type
= StringIO
952 if fake_file
is not None:
953 fake_file_type
= type(fake_file
)
954 content
= fake_file
.getvalue()
955 assert issubclass(fake_file_type
, object)
956 result
= fake_file_type(content
)
957 if hasattr(fake_file
, 'encoding'):
958 if not hasattr(result
, 'encoding'):
959 result
.encoding
= fake_file
.encoding
963 class FileDouble(TestDoubleWithRegistry
):
964 """ A testing double for a file. """
966 registry_class
= dict
969 function_scenario_params_by_class
= {
970 os_path_exists_scenario
: {
971 'default_scenario_name': 'not_exist',
972 'set_scenario_method_name': 'set_os_path_exists_scenario',
974 os_access_scenario
: {
975 'default_scenario_name': 'okay',
976 'set_scenario_method_name': 'set_os_access_scenario',
979 'default_scenario_name': 'okay',
980 'set_scenario_method_name': 'set_os_stat_scenario',
983 'default_scenario_name': 'okay',
984 'set_scenario_method_name': 'set_os_lstat_scenario',
986 builtins_open_scenario
: {
987 'default_scenario_name': 'okay',
988 'set_scenario_method_name': 'set_open_scenario',
990 os_unlink_scenario
: {
991 'default_scenario_name': 'okay',
992 'set_scenario_method_name': 'set_os_unlink_scenario',
995 'default_scenario_name': 'okay',
996 'set_scenario_method_name': 'set_os_rmdir_scenario',
998 shutil_rmtree_scenario
: {
999 'default_scenario_name': 'okay',
1000 'set_scenario_method_name': 'set_shutil_rmtree_scenario',
1004 def __init__(self
, path
=None, fake_file
=None, *args
, **kwargs
):
1006 self
.fake_file
= copy_fake_file(fake_file
)
1007 self
.fake_file
.name
= path
1009 self
._set
_stat
_result
()
1011 super(FileDouble
, self
).__init
__(*args
, **kwargs
)
1013 def _set_stat_result(self
):
1014 """ Set the `os.stat` result for this file. """
1015 size
= len(self
.fake_file
.getvalue())
1016 self
.stat_result
= StatResult(
1018 st_ino
=None, st_dev
=None, st_nlink
=None,
1021 st_atime
=None, st_mtime
=None, st_ctime
=None,
1025 text
= "FileDouble(path={path!r}, fake_file={fake_file!r})".format(
1026 path
=self
.path
, fake_file
=self
.fake_file
)
1029 def get_registry_key(self
):
1030 """ Get the registry key for this double. """
1035 class os_popen_scenario(TestDoubleFunctionScenario
):
1036 """ Scenario for `os.popen` behaviour. """
1038 stream_name_by_mode
= {
1043 def _hook_success(self
, argv
, mode
, buffering
):
1044 stream_name
= self
.stream_name_by_mode
[mode
]
1045 stream_double
= getattr(
1046 self
.double
, stream_name
+ '_double')
1047 result
= stream_double
.fake_file
1050 def _hook_failure(self
, argv
, mode
, buffering
):
1054 def _hook_not_found(self
, argv
, mode
, buffering
):
1059 class os_waitpid_scenario(TestDoubleFunctionScenario
):
1060 """ Scenario for `os.waitpid` behaviour. """
1062 def _hook_success(self
, pid
, options
):
1063 result
= (pid
, EXIT_STATUS_SUCCESS
)
1066 def _hook_failure(self
, pid
, options
):
1067 result
= (pid
, EXIT_STATUS_FAILURE
)
1070 def _hook_not_found(self
, pid
, options
):
1071 error
= OSError(errno
.ECHILD
)
1075 class os_system_scenario(TestDoubleFunctionScenario
):
1076 """ Scenario for `os.system` behaviour. """
1078 def _hook_success(self
, command
):
1079 result
= EXIT_STATUS_SUCCESS
1082 def _hook_failure(self
, command
):
1083 result
= EXIT_STATUS_FAILURE
1086 def _hook_not_found(self
, command
):
1087 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1091 class os_spawnv_scenario(TestDoubleFunctionScenario
):
1092 """ Scenario for `os.spawnv` behaviour. """
1094 def _hook_success(self
, mode
, file, args
):
1095 result
= EXIT_STATUS_SUCCESS
1098 def _hook_failure(self
, mode
, file, args
):
1099 result
= EXIT_STATUS_FAILURE
1102 def _hook_not_found(self
, mode
, file, args
):
1103 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1112 """ A testing double for `subprocess.Popen`. """
1114 def __init__(self
, args
, *posargs
, **kwargs
):
1119 self
.returncode
= None
1121 if kwargs
.get('shell', False):
1122 self
.argv
= shlex
.split(args
)
1124 # The paramter is already a sequence of command-line arguments.
1127 def set_streams(self
, subprocess_double
, popen_kwargs
):
1128 """ Set the streams on the `PopenDouble`.
1130 :param subprocess_double: The `SubprocessDouble` from
1131 which to get existing stream doubles.
1132 :param popen_kwargs: The keyword arguments to the
1133 `subprocess.Popen` call.
1137 for stream_name
in (
1138 name
for name
in ['stdin', 'stdout', 'stderr']
1139 if name
in popen_kwargs
):
1140 stream_spec
= popen_kwargs
[stream_name
]
1141 if stream_spec
is subprocess
.PIPE
:
1142 stream_double
= getattr(
1144 "{name}_double".format(name
=stream_name
))
1145 stream_file
= stream_double
.fake_file
1146 elif stream_spec
is subprocess
.STDOUT
:
1147 stream_file
= subprocess_double
.stdout_double
.fake_file
1149 stream_file
= stream_spec
1150 setattr(self
, stream_name
, stream_file
)
1153 """ Wait for subprocess to terminate. """
1154 return self
.returncode
1157 class subprocess_popen_scenario(TestDoubleFunctionScenario
):
1158 """ Scenario for `subprocess.Popen` behaviour. """
1160 def _hook_success(self
, testcase
, args
, *posargs
, **kwargs
):
1161 double
= self
.double
.popen_double
1162 double
.set_streams(self
.double
, kwargs
)
1166 def patch_subprocess_popen(testcase
):
1167 """ Patch `subprocess.Popen` constructor for this test case.
1169 :param testcase: The `TestCase` instance to modify.
1172 When the patched function is called, the registry of
1173 `SubprocessDouble` instances for this test case will be used
1174 to get the instance for the program path specified.
1177 orig_subprocess_popen
= subprocess
.Popen
1179 def fake_subprocess_popen(args
, *posargs
, **kwargs
):
1180 if kwargs
.get('shell', False):
1181 argv
= shlex
.split(args
)
1184 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1185 if argv
in registry
:
1186 subprocess_double
= registry
[argv
]
1187 result
= subprocess_double
.subprocess_popen_scenario
.call_hook(
1188 testcase
, args
, *posargs
, **kwargs
)
1190 result
= orig_subprocess_popen(args
, *posargs
, **kwargs
)
1193 func_patcher
= mock
.patch
.object(
1194 subprocess
, "Popen", autospec
=True,
1195 side_effect
=fake_subprocess_popen
)
1196 func_patcher
.start()
1197 testcase
.addCleanup(func_patcher
.stop
)
1200 class subprocess_call_scenario(TestDoubleFunctionScenario
):
1201 """ Scenario for `subprocess.call` behaviour. """
1203 def _hook_success(self
, command
):
1204 result
= EXIT_STATUS_SUCCESS
1207 def _hook_failure(self
, command
):
1208 result
= EXIT_STATUS_FAILURE
1211 def _hook_not_found(self
, command
):
1212 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1216 def patch_subprocess_call(testcase
):
1217 """ Patch `subprocess.call` function for this test case.
1219 :param testcase: The `TestCase` instance to modify.
1222 When the patched function is called, the registry of
1223 `SubprocessDouble` instances for this test case will be used
1224 to get the instance for the program path specified.
1227 orig_subprocess_call
= subprocess
.call
1229 def fake_subprocess_call(command
, *posargs
, **kwargs
):
1230 if kwargs
.get('shell', False):
1231 command_argv
= shlex
.split(command
)
1233 command_argv
= command
1234 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1235 if command_argv
in registry
:
1236 subprocess_double
= registry
[command_argv
]
1237 result
= subprocess_double
.subprocess_call_scenario
.call_hook(
1240 result
= orig_subprocess_call(command
, *posargs
, **kwargs
)
1243 func_patcher
= mock
.patch
.object(
1244 subprocess
, "call", autospec
=True,
1245 side_effect
=fake_subprocess_call
)
1246 func_patcher
.start()
1247 testcase
.addCleanup(func_patcher
.stop
)
1250 class subprocess_check_call_scenario(TestDoubleFunctionScenario
):
1251 """ Scenario for `subprocess.check_call` behaviour. """
1253 def _hook_success(self
, command
):
1256 def _hook_failure(self
, command
):
1257 result
= EXIT_STATUS_FAILURE
1258 error
= subprocess
.CalledProcessError(result
, command
)
1261 def _hook_not_found(self
, command
):
1262 result
= EXIT_STATUS_COMMAND_NOT_FOUND
1263 error
= subprocess
.CalledProcessError(result
, command
)
1267 def patch_subprocess_check_call(testcase
):
1268 """ Patch `subprocess.check_call` function for this test case.
1270 :param testcase: The `TestCase` instance to modify.
1273 When the patched function is called, the registry of
1274 `SubprocessDouble` instances for this test case will be used
1275 to get the instance for the program path specified.
1278 orig_subprocess_check_call
= subprocess
.check_call
1280 def fake_subprocess_check_call(command
, *posargs
, **kwargs
):
1281 if kwargs
.get('shell', False):
1282 command_argv
= shlex
.split(command
)
1284 command_argv
= command
1285 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1286 if command_argv
in registry
:
1287 subprocess_double
= registry
[command_argv
]
1288 scenario
= subprocess_double
.subprocess_check_call_scenario
1289 result
= scenario
.call_hook(command
)
1291 result
= orig_subprocess_check_call(command
, *posargs
, **kwargs
)
1294 func_patcher
= mock
.patch
.object(
1295 subprocess
, "check_call", autospec
=True,
1296 side_effect
=fake_subprocess_check_call
)
1297 func_patcher
.start()
1298 testcase
.addCleanup(func_patcher
.stop
)
1301 class SubprocessDoubleRegistry(collections_abc
.MutableMapping
):
1302 """ Registry of `SubprocessDouble` instances by `argv`. """
1304 def __init__(self
, *args
, **kwargs
):
1307 if isinstance(args
[0], collections_abc
.Mapping
):
1308 items
= args
[0].items()
1309 if isinstance(args
[0], collections_abc
.Iterable
):
1311 self
._mapping
= dict(items
)
1314 text
= "<{class_name} object: {mapping}>".format(
1315 class_name
=self
.__class
__.__name
__, mapping
=self
._mapping
)
1318 def _match_argv(self
, argv
):
1319 """ Match the specified `argv` with our registered keys. """
1321 if not isinstance(argv
, collections_abc
.Sequence
):
1323 candidates
= iter(self
._mapping
)
1324 while match
is None:
1326 candidate
= next(candidates
)
1327 except StopIteration:
1330 if candidate
== argv
:
1333 word_iter
= enumerate(candidate
)
1334 while found
is None:
1336 (word_index
, candidate_word
) = next(word_iter
)
1337 except StopIteration:
1339 if candidate_word
is ARG_MORE
:
1340 # Candiate matches any remaining words. We have a match.
1342 elif word_index
> len(argv
):
1343 # Candidate is too long for the specified argv.
1345 elif candidate_word
is ARG_ANY
:
1346 # Candidate matches any word at this position.
1348 elif candidate_word
== argv
[word_index
]:
1349 # Candidate matches the word at this position.
1352 # This candidate does not match.
1355 # Reached the end of the candidate without a mismatch.
1361 def __getitem__(self
, key
):
1362 match
= self
._match
_argv
(key
)
1365 result
= self
._mapping
[match
]
1368 def __setitem__(self
, key
, value
):
1371 self
._mapping
[key
] = value
1373 def __delitem__(self
, key
):
1374 match
= self
._match
_argv
(key
)
1375 if match
is not None:
1376 del self
._mapping
[match
]
1379 return self
._mapping
.__iter
__()
1382 return self
._mapping
.__len
__()
1385 class SubprocessDouble(TestDoubleWithRegistry
):
1386 """ A testing double for a subprocess. """
1388 registry_class
= SubprocessDoubleRegistry
1391 double_by_pid
= weakref
.WeakValueDictionary()
1393 function_scenario_params_by_class
= {
1394 subprocess_popen_scenario
: {
1395 'default_scenario_name': 'success',
1396 'set_scenario_method_name': 'set_subprocess_popen_scenario',
1398 subprocess_call_scenario
: {
1399 'default_scenario_name': 'success',
1400 'set_scenario_method_name': 'set_subprocess_call_scenario',
1402 subprocess_check_call_scenario
: {
1403 'default_scenario_name': 'success',
1404 'set_scenario_method_name':
1405 'set_subprocess_check_call_scenario',
1407 os_popen_scenario
: {
1408 'default_scenario_name': 'success',
1409 'set_scenario_method_name': 'set_os_popen_scenario',
1411 os_waitpid_scenario
: {
1412 'default_scenario_name': 'success',
1413 'set_scenario_method_name': 'set_os_waitpid_scenario',
1415 os_system_scenario
: {
1416 'default_scenario_name': 'success',
1417 'set_scenario_method_name': 'set_os_system_scenario',
1419 os_spawnv_scenario
: {
1420 'default_scenario_name': 'success',
1421 'set_scenario_method_name': 'set_os_spawnv_scenario',
1425 def __init__(self
, path
=None, argv
=None, *args
, **kwargs
):
1427 path
= tempfile
.mktemp()
1431 command_name
= os
.path
.basename(path
)
1432 argv
= [command_name
]
1435 self
.pid
= self
._make
_pid
()
1436 self
._register
_by
_pid
()
1438 self
.set_popen_double()
1440 stream_class
= SubprocessDouble
.stream_class
1441 for stream_name
in ['stdin', 'stdout', 'stderr']:
1442 fake_file
= stream_class()
1443 file_double
= FileDouble(fake_file
=fake_file
)
1444 stream_double_name
= '{name}_double'.format(name
=stream_name
)
1445 setattr(self
, stream_double_name
, file_double
)
1447 super(SubprocessDouble
, self
).__init
__(*args
, **kwargs
)
1449 def set_popen_double(self
):
1450 """ Set the `PopenDouble` for this instance. """
1451 double
= PopenDouble(self
.argv
)
1452 double
.pid
= self
.pid
1454 self
.popen_double
= double
1458 "<SubprocessDouble instance: {id}"
1461 " stdin_double: {stdin_double!r}"
1462 " stdout_double: {stdout_double!r}"
1463 " stderr_double: {stderr_double!r}"
1466 path
=self
.path
, argv
=self
.argv
,
1467 stdin_double
=self
.stdin_double
,
1468 stdout_double
=self
.stdout_double
,
1469 stderr_double
=self
.stderr_double
)
1474 """ Make a unique PID for a subprocess. """
1475 for pid
in itertools
.count(1):
1478 def _register_by_pid(self
):
1479 """ Register this subprocess by its PID. """
1480 self
.__class
__.double_by_pid
[self
.pid
] = self
1482 def get_registry_key(self
):
1483 """ Get the registry key for this double. """
1484 result
= tuple(self
.argv
)
1487 stream_class
= io
.BytesIO
1488 stream_encoding
= "utf-8"
1490 def set_stdin_content(self
, text
, bytes_encoding
=stream_encoding
):
1491 """ Set the content of the `stdin` stream for this double. """
1492 content
= text
.encode(bytes_encoding
)
1493 fake_file
= self
.stream_class(content
)
1494 self
.stdin_double
.fake_file
= fake_file
1496 def set_stdout_content(self
, text
, bytes_encoding
=stream_encoding
):
1497 """ Set the content of the `stdout` stream for this double. """
1498 content
= text
.encode(bytes_encoding
)
1499 fake_file
= self
.stream_class(content
)
1500 self
.stdout_double
.fake_file
= fake_file
1502 def set_stderr_content(self
, text
, bytes_encoding
=stream_encoding
):
1503 """ Set the content of the `stderr` stream for this double. """
1504 content
= text
.encode(bytes_encoding
)
1505 fake_file
= self
.stream_class(content
)
1506 self
.stderr_double
.fake_file
= fake_file
1509 def make_fake_subprocess_scenarios(path
=None):
1510 """ Make a collection of scenarios for testing with fake files.
1512 :path: The filesystem path of the fake program. If not specified,
1513 a valid random path will be generated.
1514 :return: A collection of scenarios for tests involving subprocesses.
1516 The collection is a mapping from scenario name to a dictionary of
1517 scenario attributes.
1521 file_path
= tempfile
.mktemp()
1525 default_scenario_params
= {
1526 'return_value': EXIT_STATUS_SUCCESS
,
1527 'program_path': file_path
,
1528 'argv_after_command_name': [],
1534 'return_value': EXIT_STATUS_COMMAND_NOT_FOUND
,
1538 for (name
, scenario
) in scenarios
.items():
1539 params
= default_scenario_params
.copy()
1540 params
.update(scenario
)
1541 scenario
.update(params
)
1542 program_path
= params
['program_path']
1543 program_name
= os
.path
.basename(params
['program_path'])
1544 argv
= [program_name
]
1545 argv
.extend(params
['argv_after_command_name'])
1546 subprocess_double_params
= dict(
1550 subprocess_double
= SubprocessDouble(**subprocess_double_params
)
1551 scenario
['subprocess_double'] = subprocess_double
1552 scenario
['fake_file_scenario_name'] = name
1557 def get_subprocess_doubles_from_fake_subprocess_scenarios(scenarios
):
1558 """ Get the `SubprocessDouble` instances from fake subprocess scenarios.
1560 :param scenarios: Collection of fake subprocess scenarios.
1561 :return: Collection of `SubprocessDouble` instances.
1565 scenario
['subprocess_double']
1566 for scenario
in scenarios
1567 if scenario
['subprocess_double'] is not None)
1572 def setup_subprocess_double_behaviour(testcase
, doubles
=None):
1573 """ Set up subprocess double instances and behaviour.
1575 :param testcase: The `TestCase` instance to modify.
1576 :param doubles: Collection of `SubprocessDouble` instances.
1579 If `doubles` is ``None``, a default collection will be made
1580 from the return value of `make_fake_subprocess_scenarios`.
1584 scenarios
= make_fake_subprocess_scenarios()
1585 doubles
= get_subprocess_doubles_from_fake_subprocess_scenarios(
1588 for double
in doubles
:
1589 double
.register_for_testcase(testcase
)
1592 def setup_fake_subprocess_fixtures(testcase
):
1593 """ Set up fixtures for fake subprocess doubles.
1595 :param testcase: The `TestCase` instance to modify.
1599 scenarios
= make_fake_subprocess_scenarios()
1600 testcase
.fake_subprocess_scenarios
= scenarios
1602 doubles
= get_subprocess_doubles_from_fake_subprocess_scenarios(
1604 setup_subprocess_double_behaviour(testcase
, doubles
)
1607 def patch_os_popen(testcase
):
1608 """ Patch `os.popen` behaviour for this test case.
1610 :param testcase: The `TestCase` instance to modify.
1613 When the patched function is called, the registry of
1614 `SubprocessDouble` instances for this test case will be used
1615 to get the instance for the program path specified.
1618 orig_os_popen
= os
.popen
1620 def fake_os_popen(cmd
, mode
='r', buffering
=-1):
1621 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1622 if isinstance(cmd
, basestring
):
1623 command_argv
= shlex
.split(cmd
)
1626 if command_argv
in registry
:
1627 subprocess_double
= registry
[command_argv
]
1628 result
= subprocess_double
.os_popen_scenario
.call_hook(
1629 command_argv
, mode
, buffering
)
1631 result
= orig_os_popen(cmd
, mode
, buffering
)
1634 func_patcher
= mock
.patch
.object(
1635 os
, "popen", autospec
=True,
1636 side_effect
=fake_os_popen
)
1637 func_patcher
.start()
1638 testcase
.addCleanup(func_patcher
.stop
)
1641 def patch_os_waitpid(testcase
):
1642 """ Patch `os.waitpid` behaviour for this test case.
1644 :param testcase: The `TestCase` instance to modify.
1647 When the patched function is called, the registry of
1648 `SubprocessDouble` instances for this test case will be used
1649 to get the instance for the program path specified.
1652 orig_os_waitpid
= os
.waitpid
1654 def fake_os_waitpid(pid
, options
):
1655 registry
= SubprocessDouble
.double_by_pid
1657 subprocess_double
= registry
[pid
]
1658 result
= subprocess_double
.os_waitpid_scenario
.call_hook(
1661 result
= orig_os_waitpid(pid
, options
)
1664 func_patcher
= mock
.patch
.object(
1665 os
, "waitpid", autospec
=True,
1666 side_effect
=fake_os_waitpid
)
1667 func_patcher
.start()
1668 testcase
.addCleanup(func_patcher
.stop
)
1671 def patch_os_system(testcase
):
1672 """ Patch `os.system` behaviour for this test case.
1674 :param testcase: The `TestCase` instance to modify.
1677 When the patched function is called, the registry of
1678 `SubprocessDouble` instances for this test case will be used
1679 to get the instance for the program path specified.
1682 orig_os_system
= os
.system
1684 def fake_os_system(command
):
1685 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1686 command_argv
= shlex
.split(command
)
1687 if command_argv
in registry
:
1688 subprocess_double
= registry
[command_argv
]
1689 result
= subprocess_double
.os_system_scenario
.call_hook(
1692 result
= orig_os_system(command
)
1695 func_patcher
= mock
.patch
.object(
1696 os
, "system", autospec
=True,
1697 side_effect
=fake_os_system
)
1698 func_patcher
.start()
1699 testcase
.addCleanup(func_patcher
.stop
)
1702 def patch_os_spawnv(testcase
):
1703 """ Patch `os.spawnv` behaviour for this test case.
1705 :param testcase: The `TestCase` instance to modify.
1708 When the patched function is called, the registry of
1709 `SubprocessDouble` instances for this test case will be used
1710 to get the instance for the program path specified.
1713 orig_os_spawnv
= os
.spawnv
1715 def fake_os_spawnv(mode
, file, args
):
1716 registry
= SubprocessDouble
.get_registry_for_testcase(testcase
)
1717 registry_key
= tuple(args
)
1718 if registry_key
in registry
:
1719 subprocess_double
= registry
[registry_key
]
1720 result
= subprocess_double
.os_spawnv_scenario
.call_hook(
1723 result
= orig_os_spawnv(mode
, file, args
)
1726 func_patcher
= mock
.patch
.object(
1727 os
, "spawnv", autospec
=True,
1728 side_effect
=fake_os_spawnv
)
1729 func_patcher
.start()
1730 testcase
.addCleanup(func_patcher
.stop
)
1737 # vim: fileencoding=utf-8 filetype=python :