1 # -*- coding: utf-8; -*-
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Unit tests for ‘dcut’ module. """
15 from __future__
import (absolute_import
, unicode_literals
)
27 __package__
= str("test")
28 __import__(__package__
)
29 sys
.path
.insert(1, os
.path
.dirname(os
.path
.dirname(__file__
)))
31 from dput
.helper
import dputhelper
38 EXIT_STATUS_SUCCESS
, EXIT_STATUS_FAILURE
,
39 patch_system_interfaces
,
48 patch_tempfile_mkdtemp
,
50 setup_file_double_behaviour
,
53 patch_subprocess_popen
,
57 setup_subprocess_double_behaviour
,
59 from .test_configfile
import (
62 from .test_dputhelper
import (
65 from .test_changesfile
import (
66 make_changes_file_scenarios
,
67 set_changes_file_scenario
,
69 from . import test_dput_main
72 dummy_pwent
= PasswdEntry(
77 pw_gecos
="Lorem Ipsum,spam,eggs,beans",
78 pw_dir
=tempfile
.mktemp(),
79 pw_shell
=tempfile
.mktemp())
82 def patch_progname(testcase
):
83 """ Patch the module attribute `progname` for the test case. """
84 if hasattr(testcase
, 'dcut_progname'):
85 text
= testcase
.dcut_progname
87 text
= make_unique_slug(testcase
)
88 patcher
= mock
.patch
.object(dput
.dcut
, "progname", text
)
90 testcase
.addCleanup(patcher
.stop
)
93 def patch_version(testcase
):
94 """ Patch the module attribute `version` for the test case. """
95 if hasattr(testcase
, 'dcut_version'):
96 text
= testcase
.dcut_version
98 text
= make_unique_slug(testcase
)
99 patcher
= mock
.patch
.object(dput
.dcut
, "version", text
)
101 testcase
.addCleanup(patcher
.stop
)
104 def patch_getoptions(testcase
):
105 """ Patch the `getoptions` function for this test case. """
113 'filetoupload': None,
114 'filetocreate': None,
117 if not hasattr(testcase
, 'getoptions_opts'):
118 testcase
.getoptions_opts
= {}
119 if not hasattr(testcase
, 'getoptions_args'):
120 testcase
.getoptions_args
= []
122 def fake_getoptions():
123 options
= dict(default_options
)
124 options
.update(testcase
.getoptions_opts
)
125 arguments
= list(testcase
.getoptions_args
)
126 result
= (options
, arguments
)
129 func_patcher
= mock
.patch
.object(
130 dput
.dcut
, "getoptions", side_effect
=fake_getoptions
)
132 testcase
.addCleanup(func_patcher
.stop
)
135 def get_upload_method_func(testcase
):
136 """ Get the specified upload method. """
137 host
= testcase
.test_host
138 method_name
= testcase
.runtime_config_parser
.get(host
, 'method')
139 method_func
= dput
.dput
.upload_methods
[method_name
]
143 class getoptions_TestCase(testtools
.TestCase
):
144 """ Base for test cases for `getoptions` function. """
146 default_options
= NotImplemented
148 scenarios
= NotImplemented
151 """ Set up test fixtures. """
152 super(getoptions_TestCase
, self
).setUp()
153 patch_system_interfaces(self
)
155 patch_os_environ(self
)
156 patch_os_getuid(self
)
157 patch_pwd_getpwuid(self
)
158 self
.patch_sys_argv()
160 self
.patch_etc_mailname()
161 setup_file_double_behaviour(
162 self
, [self
.mailname_file_double
])
164 self
.set_hostname_subprocess_double()
168 if hasattr(self
, 'expected_options'):
169 self
.set_expected_result()
173 self
.patch_usage_message()
175 def patch_sys_argv(self
):
176 """ Patch the `sys.argv` object. """
177 if not hasattr(self
, 'dcut_progname'):
178 self
.dcut_progname
= make_unique_slug(self
)
179 if not hasattr(self
, 'sys_argv'):
180 self
.sys_argv
= [self
.dcut_progname
]
181 patcher
= mock
.patch
.object(sys
, "argv", new
=list(self
.sys_argv
))
183 self
.addCleanup(patcher
.stop
)
185 def patch_etc_mailname(self
):
186 """ Patch the ‘/etc/mailname’ file. """
187 path
= "/etc/mailname"
188 if hasattr(self
, 'mailname_fake_file'):
189 double
= FileDouble(path
, self
.mailname_fake_file
)
191 double
= FileDouble(path
, StringIO())
192 if hasattr(self
, 'mailname_file_open_scenario_name'):
193 double
.set_open_scenario(self
.mailname_file_open_scenario_name
)
194 self
.mailname_file_double
= double
196 def set_hostname_subprocess_double(self
):
197 """ Set the test double for the ‘hostname’ subprocess. """
198 path
= "/bin/hostname"
199 argv
= (path
, "--fqdn")
200 double
= SubprocessDouble(path
, argv
=argv
)
201 double
.register_for_testcase(self
)
203 double
.set_os_popen_scenario('success')
204 double
.set_stdout_content(getattr(self
, 'hostname_stdout_content', ""))
206 self
.hostname_subprocess_double
= double
208 def patch_getopt(self
):
209 """ Patch the `dputhelper.getopt` function. """
210 if not hasattr(self
, 'getopt_opts'):
211 self
.getopt_opts
= []
213 self
.getopt_opts
= list(self
.getopt_opts
)
214 if not hasattr(self
, 'getopt_args'):
215 self
.getopt_args
= []
217 self
.getopt_args
= list(self
.getopt_args
)
def set_expected_result(self):
    """ Compute the `expected_result` pair for this test case.

        The result is the tuple `(options, arguments)`, where `options`
        is a copy of `self.default_options` overlaid with
        `self.expected_options`, and `arguments` is
        `self.expected_arguments` (created as an empty list when the
        test case does not already define it).

        """
    arguments_are_specified = hasattr(self, 'expected_arguments')
    if not arguments_are_specified:
        self.expected_arguments = []

    # Work on a copy, so the shared defaults are never modified.
    merged_options = self.default_options.copy()
    merged_options.update(self.expected_options)

    self.expected_result = (merged_options, self.expected_arguments)
229 def patch_usage_message(self
):
230 """ Patch the module attribute `USAGE`. """
231 if hasattr(self
, 'dcut_usage_message'):
232 text
= self
.dcut_usage_message
234 text
= self
.getUniqueString()
235 patcher
= mock
.patch
.object(dput
.dcut
, "USAGE", text
)
237 self
.addCleanup(patcher
.stop
)
240 class getoptions_UploaderTestCase(
241 testscenarios
.WithScenarios
,
242 getoptions_TestCase
):
243 """ Test cases for `getoptions` function, determining uploader. """
245 environ_scenarios
= [
249 ('environ-email-not-delimited', {
250 'os_environ': {'EMAIL': "quux@example.org"},
251 'expected_environ_uploader': "<quux@example.org>",
253 ('environ-email-delimited', {
254 'os_environ': {'EMAIL': "<quux@example.org>"},
255 'expected_environ_uploader': "<quux@example.org>",
257 ('environ-debemail-not-delimited', {
258 'os_environ': {'DEBEMAIL': "flup@example.org"},
259 'expected_environ_uploader': "<flup@example.org>",
261 ('environ-debemail-delimited', {
262 'os_environ': {'DEBEMAIL': "<flup@example.org>"},
263 'expected_environ_uploader': "<flup@example.org>",
265 ('environ-both-email-and-debfullname', {
267 'EMAIL': "quux@example.org",
268 'DEBFULLNAME': "Lorem Ipsum",
270 'expected_environ_uploader': "Lorem Ipsum <quux@example.org>",
272 ('environ-both-debemail-and-debfullname', {
274 'DEBEMAIL': "flup@example.org",
275 'DEBFULLNAME': "Lorem Ipsum",
277 'expected_environ_uploader': "Lorem Ipsum <flup@example.org>",
279 ('environ-both-email-and-debemail', {
281 'EMAIL': "quux@example.org",
282 'DEBEMAIL': "flup@example.org",
284 'expected_environ_uploader': "<flup@example.org>",
286 ('environ-both-email-and-debemail-and-debfullname', {
288 'EMAIL': "quux@example.org",
289 'DEBEMAIL': "flup@example.org",
290 'DEBFULLNAME': "Lorem Ipsum",
292 'expected_environ_uploader': "Lorem Ipsum <flup@example.org>",
297 ('domain-from-mailname-file', {
298 'mailname_fake_file': StringIO("consecteur.example.org"),
299 'pwd_getpwuid_return_value': dummy_pwent
._replace
(
301 pw_gecos
="Dolor Sit Amet,spam,beans,eggs"),
302 'expected_debug_chatter': textwrap
.dedent("""\
305 'expected_system_uploader':
306 "Dolor Sit Amet <grue@consecteur.example.org>",
308 ('domain-from-hostname-command', {
309 'mailname_file_open_scenario_name': "read_denied",
310 'hostname_stdout_content': "consecteur.example.org\n",
311 'pwd_getpwuid_return_value': dummy_pwent
._replace
(
313 pw_gecos
="Dolor Sit Amet,spam,beans,eggs"),
314 'expected_debug_chatter': textwrap
.dedent("""\
316 D: Guessing uploader: /etc/mailname was a failure
318 'expected_system_uploader':
319 "Dolor Sit Amet <grue@consecteur.example.org>",
322 'mailname_file_open_scenario_name': "read_denied",
323 'hostname_stdout_content': "",
324 'pwd_getpwuid_return_value': dummy_pwent
._replace
(
326 pw_gecos
="Dolor Sit Amet,spam,beans,eggs"),
327 'expected_debug_chatter': textwrap
.dedent("""\
329 D: Guessing uploader: /etc/mailname was a failure
330 D: Couldn't guess uploader
335 scenarios
= testscenarios
.multiply_scenarios(
336 environ_scenarios
, system_scenarios
)
def setUp(self, *args, **kwargs):
    """ Prepare the inherited fixtures, then the expected uploader. """
    parent = super(getoptions_UploaderTestCase, self)
    parent.setUp(*args, **kwargs)

    # Must come after the inherited set-up has run.
    self.set_expected_uploader()
344 def set_expected_uploader(self
):
345 """ Set the expected uploader value for this test case. """
347 'expected_command_line_uploader',
348 'expected_environ_uploader',
349 'expected_system_uploader']:
350 if hasattr(self
, attrib_name
):
351 self
.expected_uploader
= getattr(self
, attrib_name
)
def test_calls_getopt_with_expected_args(self):
    """ Should hand the command-line arguments to `getopt`. """
    dput.dcut.getoptions()

    # Everything after the first `sys_argv` item is expected as the
    # first argument; the other two arguments are not constrained.
    expected_argv = self.sys_argv[1:]
    dputhelper.getopt.assert_called_with(expected_argv, mock.ANY, mock.ANY)
360 def test_emits_debug_message_for_program_version(self
):
361 """ Should emit debug message for program version. """
362 sys
.argv
.insert(1, "--debug")
363 dput
.dcut
.getoptions()
364 expected_output
= textwrap
.dedent("""\
365 D: {progname} {version}
367 progname
=dput
.dcut
.progname
,
368 version
=dput
.dcut
.version
)
369 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
371 def test_emits_debug_message_for_uploader_discovery(self
):
372 """ Should emit debug message for uploader discovery. """
373 sys
.argv
.insert(1, "--debug")
374 dput
.dcut
.getoptions()
375 expected_output_lines
= [
376 "D: trying to get maintainer email from environment"]
377 if hasattr(self
, 'expected_environ_uploader'):
378 guess_line_template
= "D: Uploader from env: {uploader}"
380 expected_output_lines
.extend(
381 self
.expected_debug_chatter
.split("\n")[:-1])
382 if hasattr(self
, 'expected_system_uploader'):
383 guess_line_template
= "D: Guessed uploader: {uploader}"
384 if hasattr(self
, 'expected_uploader'):
385 expected_output_lines
.append(guess_line_template
.format(
386 uploader
=self
.expected_uploader
))
387 expected_output
= "\n".join(expected_output_lines
)
388 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
391 class getoptions_ParseCommandLineTestCase(
392 testscenarios
.WithScenarios
,
393 getoptions_TestCase
):
394 """ Test cases for `getoptions` function, parsing command line. """
396 dcut_usage_message
= "Lorem ipsum, dolor sit amet."
397 dcut_progname
= "lorem"
398 dcut_version
= "ipsum"
400 config_file_path
= tempfile
.mktemp()
401 changes_file_path
= tempfile
.mktemp()
402 output_file_path
= tempfile
.mktemp()
403 upload_file_path
= tempfile
.mktemp()
405 default_options
= dict()
406 default_options
.update(
407 (key
, None) for key
in [
408 'config', 'host', 'uploader', 'keyid',
409 'filetocreate', 'filetoupload', 'changes'])
410 default_options
.update(
411 (key
, False) for key
in ['debug', 'simulate', 'passive'])
418 'getopt_opts': [("--b0gUs", "BOGUS")],
419 'expected_stderr_output': (
420 "{progname} internal error:"
421 " Option --b0gUs, argument BOGUS unknown").format(
422 progname
=dcut_progname
),
423 'expected_exit_status': EXIT_STATUS_FAILURE
,
426 'getopt_opts': [("--help", None)],
427 'expected_stdout_output': dcut_usage_message
,
428 'expected_exit_status': EXIT_STATUS_SUCCESS
,
431 'getopt_opts': [("--version", None)],
432 'expected_stdout_output': " ".join(
433 [dcut_progname
, dcut_version
]),
434 'expected_exit_status': EXIT_STATUS_SUCCESS
,
436 ('option-filetoupload-and-environ-uploader', {
438 'DEBEMAIL': "flup@example.org",
439 'DEBFULLNAME': "Lorem Ipsum",
442 ("--upload", upload_file_path
),
444 'expected_options': {
445 'uploader': "Lorem Ipsum <flup@example.org>",
446 'filetoupload': upload_file_path
,
448 'expected_arguments': [],
450 ('option-changes-and-environ-uploader', {
452 'DEBEMAIL': "flup@example.org",
453 'DEBFULLNAME': "Lorem Ipsum",
456 ("--input", changes_file_path
),
458 'expected_options': {
459 'uploader': "Lorem Ipsum <flup@example.org>",
460 'changes': changes_file_path
,
462 'expected_arguments': [],
464 ('option-filetoupload-and-option-maintaineraddress', {
466 ("--upload", upload_file_path
),
467 ("--maintaineraddress", "Lorem Ipsum <flup@example.org>"),
469 'expected_options': {
470 'uploader': "Lorem Ipsum <flup@example.org>",
471 'filetoupload': upload_file_path
,
473 'expected_arguments': [],
475 ('option-changes-and-option-maintaineraddress', {
477 ("--input", changes_file_path
),
478 ("--maintaineraddress", "Lorem Ipsum <flup@example.org>"),
480 'expected_options': {
481 'uploader': "Lorem Ipsum <flup@example.org>",
482 'changes': changes_file_path
,
484 'expected_arguments': [],
486 ('option-filetoupload-with-no-uploader', {
487 'getopt_opts': [("--upload", upload_file_path
)],
488 'expected_stderr_output': "command file cannot be created",
489 'expected_exit_status': EXIT_STATUS_FAILURE
,
491 ('option-changes-with-no-uploader', {
492 'getopt_opts': [("--input", changes_file_path
)],
493 'expected_stderr_output': "command file cannot be created",
494 'expected_exit_status': EXIT_STATUS_FAILURE
,
499 ("--simulate", None),
500 ("--config", config_file_path
),
501 ("--maintaineraddress", "Lorem Ipsum <flup@example.org>"),
502 ("--keyid", "DEADBEEF"),
504 ("--output", output_file_path
),
505 ("--host", "quux.example.com"),
507 'expected_options': {
510 'config': config_file_path
,
511 'uploader': "Lorem Ipsum <flup@example.org>",
514 'filetocreate': output_file_path
,
515 'host': "quux.example.com",
517 'expected_arguments': [],
521 scenarios
= option_scenarios
def test_emits_debug_message_for_each_option(self):
    """ Should emit one debug message per option processed. """
    # NOTE(review): as written, this method does not itself invoke
    # `dput.dcut.getoptions`, and it reads (option, argument) pairs
    # from `getopt_args` — confirm that both are intended.
    sys.argv.insert(1, "--debug")

    message_template = "D: processing arg \"{opt}\", option \"{arg}\""
    expected_messages = []
    for (option, option_argument) in self.getopt_args:
        expected_messages.append(
                message_template.format(opt=option, arg=option_argument))

    expected_output = "\n".join(expected_messages)
    emitted_output = sys.stdout.getvalue()
    self.assertIn(expected_output, emitted_output)
533 def test_emits_expected_message(self
):
534 """ Should emit message with expected content. """
536 dput
.dcut
.getoptions()
537 except FakeSystemExit
:
539 if hasattr(self
, 'expected_stdout_output'):
540 self
.assertIn(self
.expected_stdout_output
, sys
.stdout
.getvalue())
541 if hasattr(self
, 'expected_stderr_output'):
542 self
.assertIn(self
.expected_stderr_output
, sys
.stderr
.getvalue())
544 def test_calls_sys_exit_with_expected_exit_status(self
):
545 """ Should call `sys.exit` with expected exit status. """
546 if not hasattr(self
, 'expected_exit_status'):
547 dput
.dcut
.getoptions()
549 with testtools
.ExpectedException(FakeSystemExit
):
550 dput
.dcut
.getoptions()
551 sys
.exit
.assert_called_with(self
.expected_exit_status
)
def test_returns_expected_values(self):
    """ Should return the values expected for this scenario. """
    scenario_expects_result = hasattr(self, 'expected_result')
    if not scenario_expects_result:
        # Nothing to compare against for this scenario.
        self.skipTest("No return result expected")

    returned = dput.dcut.getoptions()
    self.assertEqual(self.expected_result, returned)
561 class getoptions_DetermineHostTestCase(
562 testscenarios
.WithScenarios
,
563 getoptions_TestCase
):
564 """ Test cases for `getoptions` function, determine host name. """
567 ('domain-from-mailname-file', {
568 'mailname_fake_file': StringIO("consecteur.example.org"),
572 default_options
= getattr(
573 getoptions_ParseCommandLineTestCase
, 'default_options')
575 command_scenarios
= [
576 ('no-opts no-args', {
579 'expected_options': {
583 ('no-opts command-first-arg', {
585 'getopt_args': ["cancel"],
586 'expected_options': {
589 'expected_arguments': ["cancel"],
591 ('no-opts host-first-arg', {
593 'getopt_args': ["quux.example.com", "cancel"],
594 'expected_options': {
595 'host': "quux.example.com",
597 'expected_arguments': ["cancel"],
598 'expected_debug_output': textwrap
.dedent("""\
599 D: first argument "quux.example.com" treated as host
602 ('option-host host-first-arg', {
603 'getopt_opts': [("--host", "quux.example.com")],
604 'getopt_args': ["decoy.example.net", "cancel"],
605 'expected_options': {
606 'host': "quux.example.com",
608 'expected_arguments': ["decoy.example.net", "cancel"],
612 scenarios
= testscenarios
.multiply_scenarios(
613 system_scenarios
, command_scenarios
)
def test_emits_expected_debug_message(self):
    """ Should write the expected debug output to standard output. """
    if not hasattr(self, 'expected_debug_output'):
        # No specific message expected; the empty string always matches.
        self.expected_debug_output = ""

    # Append the debug option to a fresh copy of the parsed options.
    debug_option = ("--debug", None)
    self.getopt_opts = list(self.getopt_opts + [debug_option])

    dput.dcut.getoptions()

    emitted_output = sys.stdout.getvalue()
    self.assertIn(self.expected_debug_output, emitted_output)
def test_returns_expected_values(self):
    """ Should return the expected host option and arguments. """
    scenario_expects_result = hasattr(self, 'expected_result')
    if not scenario_expects_result:
        self.skipTest("No return result expected")

    returned = dput.dcut.getoptions()
    (returned_options, returned_arguments) = returned

    # Only the ‘host’ option is compared, not the whole mapping.
    expected_host = self.expected_options['host']
    self.assertEqual(expected_host, returned_options['host'])
    self.assertEqual(self.expected_arguments, returned_arguments)
633 class parse_queuecommands_TestCase(testtools
.TestCase
):
634 """ Base for test cases for `parse_queuecommands` function. """
636 scenarios
= NotImplemented
639 """ Set up test fixtures. """
640 super(parse_queuecommands_TestCase
, self
).setUp()
641 patch_system_interfaces(self
)
645 def set_test_args(self
):
646 """ Set the arguments for the test call to the function. """
650 self
.test_args
= dict(
651 arguments
=getattr(self
, 'arguments', []),
652 options
=getattr(self
, 'options', default_options
),
657 class parse_queuecommands_SuccessTestCase(
658 testscenarios
.WithScenarios
,
659 parse_queuecommands_TestCase
):
660 """ Success test cases for `parse_queuecommands` function. """
664 'arguments': ["rm", "lorem.deb"],
665 'expected_commands': [
666 "rm --searchdirs lorem.deb",
669 ('one-command-rm nosearchdirs', {
670 'arguments': ["rm", "--nosearchdirs", "lorem.deb"],
671 'expected_commands': [
675 ('one-command-cancel', {
676 'arguments': ["cancel", "lorem.deb"],
677 'expected_commands': [
681 ('one-command-cancel nosearchdirs', {
682 'arguments': ["cancel", "--nosearchdirs", "lorem.deb"],
683 'expected_commands': [
684 "cancel --nosearchdirs lorem.deb",
687 ('one-command-reschedule', {
688 'arguments': ["reschedule", "lorem.deb"],
689 'expected_commands': [
690 "reschedule lorem.deb",
693 ('one-command-reschedule nosearchdirs', {
694 'arguments': ["reschedule", "--nosearchdirs", "lorem.deb"],
695 'expected_commands': [
696 "reschedule --nosearchdirs lorem.deb",
699 ('three-commands comma-separated', {
702 "cancel", "bar", ",",
703 "reschedule", "baz"],
704 'expected_commands': [
705 "rm --searchdirs foo ",
710 ('three-commands semicolon-separated', {
713 "cancel", "bar", ";",
714 "reschedule", "baz"],
715 'expected_commands': [
716 "rm --searchdirs foo ",
723 def test_emits_debug_message_for_each_command(self
):
724 """ Should emit a debug message for each command. """
725 self
.test_args
['options'] = dict(self
.test_args
['options'])
726 self
.test_args
['options']['debug'] = True
727 dput
.dcut
.parse_queuecommands(**self
.test_args
)
728 expected_output
= "\n".join(
729 "D: Successfully parsed command \"{command}\"".format(
731 for command
in self
.expected_commands
)
732 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
def test_returns_expected_commands(self):
    """ Should return the commands expected for this scenario. """
    call_kwargs = self.test_args
    returned_commands = dput.dcut.parse_queuecommands(**call_kwargs)
    self.assertEqual(self.expected_commands, returned_commands)
740 class parse_queuecommands_ErrorTestCase(
741 testscenarios
.WithScenarios
,
742 parse_queuecommands_TestCase
):
743 """ Error test cases for `parse_queuecommands` function. """
748 'expected_debug_output': textwrap
.dedent("""\
749 Error: no arguments given, see dcut -h
751 'expected_exit_status': EXIT_STATUS_FAILURE
,
753 ('first-command-bogus', {
754 'arguments': ["b0gUs", "spam", "eggs"],
755 'expected_debug_output': textwrap
.dedent("""\
756 Error: Could not parse commands at "b0gUs"
758 'expected_exit_status': EXIT_STATUS_FAILURE
,
760 ('third-command-bogus', {
761 'arguments': ["rm", "foo", ",", "cancel", "bar", ",", "b0gUs"],
762 'expected_debug_output': textwrap
.dedent("""\
763 Error: Could not parse commands at "b0gUs"
765 'expected_exit_status': EXIT_STATUS_FAILURE
,
769 def test_emits_expected_error_message(self
):
770 """ Should emit expected error message. """
772 dput
.dcut
.parse_queuecommands(**self
.test_args
)
773 except FakeSystemExit
:
775 self
.assertIn(self
.expected_debug_output
, sys
.stderr
.getvalue())
def test_calls_sys_exit_with_exit_status(self):
    """ Should call `sys.exit` with the scenario's exit status. """
    call_kwargs = self.test_args

    # The call is expected to end by raising `FakeSystemExit`.
    exit_expectation = testtools.ExpectedException(FakeSystemExit)
    with exit_expectation:
        dput.dcut.parse_queuecommands(**call_kwargs)

    sys.exit.assert_called_with(self.expected_exit_status)
784 class create_commands_TestCase(
786 """ Test cases for `create_commands` function. """
789 """ Set up test fixtures. """
790 super(create_commands_TestCase
, self
).setUp()
791 patch_system_interfaces(self
)
793 self
.changes_file_scenarios
= make_changes_file_scenarios()
794 set_changes_file_scenario(self
, 'no-format')
795 setup_file_double_behaviour(self
)
797 self
.set_expected_commands()
801 test_dput_main
.patch_parse_changes(self
)
802 dput
.dput
.parse_changes
.return_value
= self
.changes_file_scenario
[
807 def set_options(self
):
808 """ Set the options mapping to pass to the function. """
811 'changes': self
.changes_file_double
.path
,
814 def set_test_args(self
):
815 """ Set the arguments for the test call to the function. """
816 self
.test_args
= dict(
817 options
=dict(self
.options
),
819 parse_changes
=dput
.dput
.parse_changes
,
def set_expected_commands(self):
    """ Derive `expected_commands` from the changes file scenario.

        One ‘rm --searchdirs’ command is expected for the changes file
        itself (by its base name), followed by one for each line of
        the scenario's expected ‘files’ value.

        """
    changes_file_name = os.path.basename(self.changes_file_double.path)
    expected_changes = self.changes_file_scenario['expected_result']

    # The name is the fifth space-separated field of each line.
    listed_file_names = [
            entry.split(" ")[4]
            for entry in expected_changes['files'].split("\n")]

    command_template = "rm --searchdirs {path}"
    commands = []
    for file_name in [changes_file_name] + listed_file_names:
        commands.append(command_template.format(path=file_name))
    self.expected_commands = commands
833 def test_emits_debug_message_for_changes_file(self
):
834 """ Should emit debug message for changes file. """
835 self
.options
['debug'] = True
837 dput
.dcut
.create_commands(**self
.test_args
)
838 expected_output
= textwrap
.dedent("""\
839 D: Parsing changes file ({path}) for files to remove
840 """).format(path
=self
.changes_file_double
.path
)
841 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
843 def test_emits_error_message_when_changes_file_open_error(self
):
844 """ Should emit error message when changes file raises error. """
845 self
.changes_file_double
.set_open_scenario('read_denied')
847 dput
.dcut
.create_commands(**self
.test_args
)
848 except FakeSystemExit
:
850 expected_output
= textwrap
.dedent("""\
851 Can't open changes file: {path}
852 """).format(path
=self
.changes_file_double
.path
)
853 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
def test_calls_sys_exit_when_changes_file_open_error(self):
    """ Should call `sys.exit` when the changes file cannot be read. """
    changes_file = self.changes_file_double
    changes_file.set_open_scenario('read_denied')

    # The call is expected to end by raising `FakeSystemExit`.
    exit_expectation = testtools.ExpectedException(FakeSystemExit)
    with exit_expectation:
        dput.dcut.create_commands(**self.test_args)

    sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
def test_returns_expected_result(self):
    """ Should return the commands expected for this test case. """
    call_kwargs = self.test_args
    returned_commands = dput.dcut.create_commands(**call_kwargs)
    self.assertEqual(self.expected_commands, returned_commands)
868 class write_commands_TestCase(
869 testscenarios
.WithScenarios
,
871 """ Test cases for `write_commands` function. """
874 'filetocreate': None,
878 ('default-path', {}),
880 'option_filetocreate': str("ipsum.commands"),
881 'expected_result': "ipsum.commands",
888 commands_scenarios
= [
896 'commands': ["foo", "bar", "baz"],
903 'option_keyid': "DEADBEEF",
907 scenarios
= testscenarios
.multiply_scenarios(
908 path_scenarios
, commands_scenarios
, keyid_scenarios
)
910 for (scenario_name
, scenario
) in scenarios
:
911 default_options
= getattr(
912 getoptions_ParseCommandLineTestCase
, 'default_options')
913 options
= dict(default_options
)
915 'uploader': "Lorem Ipsum <flup@example.org>",
917 scenario
['uploader_filename_part'] = "Lorem_Ipsum__flup_example_org_"
918 if 'option_filetocreate' in scenario
:
919 options
['filetocreate'] = scenario
['option_filetocreate']
920 if 'option_keyid' in scenario
:
921 options
['keyid'] = scenario
['option_keyid']
922 scenario
['options'] = options
923 if 'tempdir' not in scenario
:
924 scenario
['tempdir'] = tempfile
.mktemp()
925 del scenario_name
, scenario
926 del default_options
, options
929 """ Set up test fixtures. """
930 super(write_commands_TestCase
, self
).setUp()
931 patch_system_interfaces(self
)
933 patch_os_getpid(self
)
934 os
.getpid
.return_value
= self
.getUniqueInteger()
936 self
.time_return_value
= self
.getUniqueInteger()
937 patch_time_time(self
, itertools
.repeat(self
.time_return_value
))
940 self
.set_commands_file_double()
941 setup_file_double_behaviour(self
)
942 self
.set_expected_result()
948 patch_subprocess_popen(self
)
949 patch_os_waitpid(self
)
950 self
.set_debsign_subprocess_double()
951 setup_subprocess_double_behaviour(self
)
953 def set_options(self
):
954 """ Set the options mapping to pass to the function. """
956 def set_test_args(self
):
957 """ Set the arguments for the test call to the function. """
958 self
.test_args
= dict(
959 commands
=list(self
.commands
),
960 options
=dict(self
.options
),
962 tempdir
=self
.tempdir
,
965 def make_commands_filename(self
):
966 """ Make the filename for the commands output file. """
967 filename
= "{progname}.{uploadpart}.{time:d}.{pid:d}.commands".format(
968 progname
=dput
.dcut
.progname
,
969 uploadpart
=self
.uploader_filename_part
,
970 time
=self
.time_return_value
,
971 pid
=os
.getpid
.return_value
)
974 def set_commands_file_double(self
):
975 """ Set the commands file double for this test case. """
976 if self
.options
['filetocreate']:
977 path
= self
.options
['filetocreate']
979 output_filename
= self
.make_commands_filename()
981 path
= os
.path
.join(self
.tempdir
, output_filename
)
983 path
= output_filename
984 double
= FileDouble(path
)
985 double
.register_for_testcase(self
)
986 self
.commands_file_double
= double
def set_expected_result(self):
    """ Record the commands file double's path as `expected_result`. """
    commands_file = self.commands_file_double
    self.expected_result = commands_file.path
992 def set_commands(self
):
993 """ Set the commands to use for this test case. """
994 if not hasattr(self
, 'commands'):
997 def make_expected_content(self
):
998 """ Make the expected content for the output file. """
999 uploader_value
= self
.options
['uploader']
1001 commands_value
= "\n".join(
1002 " {command}".format(command
=command
)
1003 for command
in self
.commands
)
1005 commands_value
= " "
1006 commands_value
+= "\n"
1007 text
= textwrap
.dedent("""\
1008 Uploader: {uploader}
1011 """).format(uploader
=uploader_value
, commands
=commands_value
)
def set_debsign_subprocess_double(self):
    """ Create and register the double for the ‘debsign’ subprocess. """
    program_path = "/usr/bin/debsign"
    program_name = os.path.basename(program_path)

    # Command line: the program name, then the `ARG_MORE` placeholder.
    debsign_double = SubprocessDouble(program_path, [program_name, ARG_MORE])
    debsign_double.register_for_testcase(self)

    self.debsign_subprocess_double = debsign_double
1022 def make_expected_debsign_argv(self
):
1023 """ Make the expected command-line arguments for ‘debsign’. """
1026 "-m{uploader}".format(uploader
=self
.options
['uploader']),
1028 if self
.options
['keyid']:
1030 "-k{keyid}".format(keyid
=self
.options
['keyid']))
1031 argv
.append(self
.commands_file_double
.path
)
def test_returns_expected_file_path(self):
    """ Should return the path expected for the commands file. """
    call_kwargs = self.test_args
    returned_path = dput.dcut.write_commands(**call_kwargs)
    self.assertEqual(self.expected_result, returned_path)
1040 def test_output_file_has_expected_content(self
):
1041 """ Should have expected content in output file. """
1042 with mock
.patch
.object(self
.commands_file_double
.fake_file
, 'close'):
1043 dput
.dcut
.write_commands(**self
.test_args
)
1044 expected_value
= self
.make_expected_content()
1046 expected_value
, self
.commands_file_double
.fake_file
.getvalue())
def test_emits_debug_message_for_debsign(self):
    """ Should emit a debug message showing the ‘debsign’ command. """
    # Turn on debugging, and pass that same mapping to the function.
    self.options['debug'] = True
    self.test_args['options'] = self.options

    dput.dcut.write_commands(**self.test_args)

    message_template = textwrap.dedent("""\
            D: calling debsign: {argv}
            """)
    expected_argv = self.make_expected_debsign_argv()
    expected_output = message_template.format(argv=expected_argv)
    emitted_output = sys.stdout.getvalue()
    self.assertIn(expected_output, emitted_output)
def test_invokes_debsign_with_expected_args(self):
    """ Should start the ‘debsign’ command with the expected args. """
    # Build the expectation before the function under test runs.
    expected_argv = self.make_expected_debsign_argv()

    dput.dcut.write_commands(**self.test_args)

    # The command line is expected as the sole positional argument.
    subprocess.Popen.assert_called_with(expected_argv)
def test_calls_os_waitpid_with_expected_args(self):
    """ Should call `os.waitpid` for the ‘debsign’ process. """
    # Read the process ID before the function under test runs.
    expected_pid = self.debsign_subprocess_double.pid
    expected_options = 0

    dput.dcut.write_commands(**self.test_args)

    os.waitpid.assert_called_with(expected_pid, expected_options)
1072 def test_emits_error_message_when_debsign_failure(self
):
1073 """ Should emit error message when ‘debsign’ command failure. """
1074 self
.debsign_subprocess_double
.set_os_waitpid_scenario('failure')
1076 dput
.dcut
.write_commands(**self
.test_args
)
1077 except FakeSystemExit
:
1079 expected_output
= textwrap
.dedent("""\
1080 Error: debsign failed.
1082 self
.assertIn(expected_output
, sys
.stderr
.getvalue())
def test_calls_sys_exit_when_debsign_failure(self):
    """ Should call `sys.exit` when the ‘debsign’ command fails. """
    debsign_double = self.debsign_subprocess_double
    debsign_double.set_os_waitpid_scenario('failure')

    # The call is expected to end by raising `FakeSystemExit`.
    exit_expectation = testtools.ExpectedException(FakeSystemExit)
    with exit_expectation:
        dput.dcut.write_commands(**self.test_args)

    sys.exit.assert_called_with(EXIT_STATUS_FAILURE)
1092 class upload_TestCase(test_dput_main
.main_TestCase
):
1093 """ Base for test cases for `upload_stolen_from_dput_main` function. """
1095 function_to_test
= staticmethod(dput
.dcut
.upload_stolen_from_dput_main
)
1098 """ Set up test fixtures. """
1099 super(upload_TestCase
, self
).setUp()
1101 self
.set_cat_subprocess_double()
1102 patch_os_system(self
)
1103 patch_tempfile_mkdtemp(self
)
1104 patch_os_unlink(self
)
1105 patch_os_rmdir(self
)
1107 patch_getoptions(self
)
1109 def patch_globals(self
):
1110 # The `upload_stolen_from_dput_main` function doesn't need these. """
1113 def set_cat_subprocess_double(self
):
1114 """ Set the ‘cat’ subprocess double for this test case. """
1116 argv
= [os
.path
.basename(path
), ARG_ANY
]
1117 double
= SubprocessDouble(path
, argv
)
1118 double
.register_for_testcase(self
)
1119 double
.set_os_system_scenario('success')
1120 self
.cat_subprocess_double
= double
1122 def set_test_args(self
):
1123 """ Set the arguments for the test call to the function. """
1124 self
.test_args
= dict(
1125 host
=self
.test_host
,
1126 upload_methods
=self
.upload_methods
,
1127 config
=self
.runtime_config_parser
,
1130 files_to_upload
=self
.files_to_upload
,
1131 ftp_passive_mode
=False,
1134 if hasattr(self
, 'test_args_extra'):
1135 self
.test_args
.update(self
.test_args_extra
)
1137 def get_upload_method_func(self
):
1138 """ Get the specified upload method. """
1139 method_name
= self
.runtime_config_parser
.get(self
.test_host
, 'method')
1140 method_func
= dput
.dput
.upload_methods
[method_name
]
1144 class upload_DebugMessageTestCase(upload_TestCase
):
1145 """ Test cases for `upload_stolen_from_dput_main` debug messages. """
1147 def test_emits_debug_message_for_discovered_methods(self
):
1148 """ Should emit debug message for discovered upload methods. """
1149 self
.test_args
['debug'] = True
1150 self
.function_to_test(**self
.test_args
)
1151 expected_output
= textwrap
.dedent("""\
1152 D: Default Method: {default_method}
1153 D: Host Method: {host_method}
1155 default_method
=self
.runtime_config_parser
.get(
1156 'DEFAULT', 'method'),
1157 host_method
=self
.runtime_config_parser
.get(
1158 self
.test_host
, 'method'))
1159 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1162 class upload_UnknownUploadMethodTestCase(
1163 testscenarios
.WithScenarios
,
1165 """ Test cases for `upload_stolen_from_dput_main`, unknown method. """
1168 ('bogus-default-method', {
1174 'expected_output': "Unknown upload method: b0gUs",
1175 'expected_exit_status': EXIT_STATUS_FAILURE
,
1177 ('bogus-host-method', {
1183 'expected_output': "Unknown upload method: b0gUs",
1184 'expected_exit_status': EXIT_STATUS_FAILURE
,
1188 def test_emits_error_message_when_unknown_method(self
):
1189 """ Should emit error message when unknown upload method. """
1191 self
.function_to_test(**self
.test_args
)
1192 except FakeSystemExit
:
1194 self
.assertIn(self
.expected_output
, sys
.stderr
.getvalue())
def test_calls_sys_exit_when_unknown_method(self):
    """ Should call `sys.exit` with expected status when method unknown. """
    # The call is expected to end by raising `FakeSystemExit`.
    expect_exit = testtools.ExpectedException(FakeSystemExit)
    with expect_exit:
        self.function_to_test(**self.test_args)
    expected_status = self.expected_exit_status
    sys.exit.assert_called_with(expected_status)
1203 class upload_DiscoverLoginTestCase(
1204 testscenarios
.WithScenarios
,
1206 """ Test cases for `upload_stolen_from_dput_main` discovery of login. """
1208 fallback_login_scenarios
= [
1209 ('login-from-environ', {
1211 'USER': "login-from-environ",
1213 'expected_fallback_login': "login-from-environ",
1214 'expected_system_uid_debug_message': "",
1216 ('login-from-pwd', {
1217 'os_getuid_return_value': 42,
1218 'pwd_getpwuid_return_value': PasswdEntry(
1219 *(["login-from-pwd"] + [object()] * 6)),
1220 'expected_fallback_login': "login-from-pwd",
1221 'expected_system_uid_debug_message': "D: User-ID: 42",
1225 config_login_scenarios
= [
1226 ('config-default-login', {
1229 'login': "login-from-config-default",
1232 'expected_login': "login-from-config-default",
1233 'expected_output_template':
1234 "D: Login to use: {login}",
1236 ('config-host-login', {
1239 'login': "login-from-config-host",
1242 'expected_login': "login-from-config-host",
1243 'expected_output_template':
1244 "D: Login to use: {login}",
1246 ('config-default-login sentinel', {
1249 'login': "username",
1252 'expected_output_template': (
1253 "D: Neither host {host} nor default login used."
1256 ('config-host-login sentinel', {
1259 'login': "username",
1262 'expected_output_template': (
1263 "D: Neither host {host} nor default login used."
# Combine every fallback-login scenario with every config-login scenario.
scenarios = testscenarios.multiply_scenarios(
        fallback_login_scenarios,
        config_login_scenarios)
# A scenario that names no explicit login expects the fallback login.
for (scenario_name, scenario) in scenarios:
    if 'expected_login' in scenario:
        continue
    scenario['expected_login'] = scenario['expected_fallback_login']
# Discard the loop variables once the loop is done.
del scenario_name, scenario
1275 def test_emits_debug_message_for_system_uid(self
):
1276 """ Should emit a debug message for the system UID. """
1277 if self
.expected_login
!= self
.expected_fallback_login
:
1278 self
.skipTest("No fallback in this scenario")
1279 self
.test_args
['debug'] = True
1280 self
.function_to_test(**self
.test_args
)
1281 expected_output
= self
.expected_system_uid_debug_message
.format(
1282 uid
=self
.os_getuid_return_value
)
1283 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1285 def test_emits_debug_message_for_discovered_login(self
):
1286 """ Should emit a debug message for the discovered login. """
1287 self
.test_args
['debug'] = True
1288 self
.function_to_test(**self
.test_args
)
1289 expected_output
= self
.expected_output_template
.format(
1290 login
=self
.expected_login
, host
=self
.test_host
)
1291 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
def test_calls_upload_method_with_expected_login(self):
    """ Should pass the expected login to the upload method function. """
    upload_method_func = get_upload_method_func(self)
    self.function_to_test(**self.test_args)
    # Only the second positional argument is checked; the rest may be
    # anything.
    expected_args = [mock.ANY, self.expected_login] + [mock.ANY] * 4
    upload_method_func.assert_called_with(*expected_args)
1302 class upload_SimulateTestCase(
1303 testscenarios
.WithScenarios
,
1305 """ Test cases for `upload_stolen_from_dput_main`, ‘simulate’ option. """
1309 'config_default_login': "login-from-config-default",
1310 'test_args_extra': {
1314 ('simulate three-files', {
1315 'config_default_login': "login-from-config-default",
1316 'test_args_extra': {
1319 'files_to_upload': [tempfile
.mktemp() for __
in range(3)],
def test_omits_upload_method(self):
    """ Should not call the upload method function. """
    upload_method_func = get_upload_method_func(self)
    self.function_to_test(**self.test_args)
    was_called = upload_method_func.called
    self.assertFalse(was_called)
1329 def test_emits_message_for_each_file_to_upload(self
):
1330 """ Should emit a message for each file to upload. """
1331 self
.function_to_test(**self
.test_args
)
1332 method
= self
.runtime_config_parser
.get(self
.test_host
, 'method')
1333 fqdn
= self
.runtime_config_parser
.get(self
.test_host
, 'fqdn')
1334 incoming
= self
.runtime_config_parser
.get(self
.test_host
, 'incoming')
1335 expected_output
= "\n".join(
1336 "Uploading with {method}: {path} to {fqdn}:{incoming}".format(
1337 method
=method
, path
=path
,
1338 fqdn
=fqdn
, incoming
=incoming
)
1339 for path
in self
.files_to_upload
)
1340 self
.assertIn(expected_output
, sys
.stderr
.getvalue())
1342 def test_calls_cat_for_each_file_to_upload(self
):
1343 """ Should call ‘cat’ for each file to upload. """
1344 self
.function_to_test(**self
.test_args
)
1346 mock
.call("cat {path}".format(path
=path
))
1347 for path
in self
.files_to_upload
]
1348 os
.system
.assert_has_calls(expected_calls
, any_order
=True)
1351 class upload_UploadMethodTestCase(
1352 testscenarios
.WithScenarios
,
1354 """ Test cases for `upload_stolen_from_dput_main`, invoking method. """
1356 method_scenarios
= [
1358 'config_method': "local",
1359 'config_progress_indicator': 23,
1361 "localhost", mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1365 'config_method': "ftp",
1366 'config_fqdn': "foo.example.com",
1367 'config_passive_ftp': False,
1368 'config_progress_indicator': 23,
1370 "foo.example.com", mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1372 'expected_stdout_output': "",
1374 ('method-ftp port-custom', {
1375 'config_method': "ftp",
1376 'config_fqdn': "foo.example.com:42",
1377 'config_passive_ftp': False,
1378 'config_progress_indicator': 23,
1380 "foo.example.com:42",
1381 mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1383 'expected_stdout_output': "",
1385 ('method-ftp config-passive-mode', {
1386 'config_method': "ftp",
1387 'config_fqdn': "foo.example.com",
1388 'config_passive_ftp': True,
1389 'config_progress_indicator': 23,
1391 "foo.example.com", mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1393 'expected_stdout_output': "",
1395 ('method-ftp config-passive-mode arg-ftp-active-mode', {
1396 'config_method': "ftp",
1397 'config_fqdn': "foo.example.com",
1398 'config_passive_ftp': True,
1399 'config_progress_indicator': 23,
1400 'test_args_extra': {
1401 'ftp_passive_mode': False,
1404 "foo.example.com", mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1406 'expected_stdout_output': "D: Using active ftp",
1408 ('method-ftp arg-ftp-passive-mode', {
1409 'config_method': "ftp",
1410 'config_fqdn': "foo.example.com",
1411 'config_progress_indicator': 23,
1412 'test_args_extra': {
1413 'ftp_passive_mode': True,
1416 "foo.example.com", mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1418 'expected_stdout_output': "D: Using passive ftp",
1420 ('method-ftp config-passive-mode arg-ftp-passive-mode', {
1421 'config_method': "ftp",
1422 'config_fqdn': "foo.example.com",
1423 'config_passive_ftp': True,
1424 'config_progress_indicator': 23,
1425 'test_args_extra': {
1426 'ftp_passive_mode': True,
1429 "foo.example.com", mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1431 'expected_stdout_output': "D: Using passive ftp",
1434 'config_method': "scp",
1435 'config_fqdn': "foo.example.com",
1437 "foo.example.com", mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1439 'expected_stdout_output': "",
1441 ('method-scp scp-compress', {
1442 'config_method': "scp",
1443 'config_fqdn': "foo.example.com",
1446 'scp_compress': "True",
1447 'ssh_config_options': "spam eggs beans",
1451 "foo.example.com", mock
.ANY
, mock
.ANY
, mock
.ANY
, mock
.ANY
,
1452 True, ["spam eggs beans"]),
1453 'expected_stdout_output': "D: Setting compression for scp",
1459 'config_default_login': "login-from-config-default",
1463 commands_scenarios
= [
1464 ('commands-from-changes', {
1465 'getoptions_args': ["foo", "bar", "baz"],
1466 'getoptions_opts': {
1467 'filetocreate': None,
1468 'filetoupload': tempfile
.mktemp() + "commands",
1471 ('commands-from-changes', {
1472 'getoptions_args': ["foo", "bar", "baz"],
1473 'getoptions_opts': {
1474 'filetocreate': None,
1475 'filetoupload': None,
1476 'changes': tempfile
.mktemp(),
1479 ('commands-from-arguments', {
1480 'getoptions_args': ["foo", "bar", "baz"],
1481 'getoptions_opts': {
1482 'filetocreate': None,
1483 'filetoupload': None,
1491 'files_to_remove': [],
1494 'files_to_remove': [
1495 tempfile
.mktemp() for __
in range(3)],
# Build the test scenarios by combining one entry from each of the
# method, login, commands, and files scenario lists.
scenarios = testscenarios.multiply_scenarios(
        method_scenarios, login_scenarios,
        commands_scenarios, files_scenarios)
1503 def test_emits_expected_debug_message(self
):
1504 """ Should emit expected debug message. """
1505 self
.test_args
['debug'] = True
1506 self
.function_to_test(**self
.test_args
)
1507 if hasattr(self
, 'expected_stdout_output'):
1508 self
.assertIn(self
.expected_stdout_output
, sys
.stdout
.getvalue())
def test_calls_upload_method_with_expected_args(self):
    """ Should call the upload method function with the expected args. """
    upload_method_func = get_upload_method_func(self)
    self.function_to_test(**self.test_args)
    expected_args = self.expected_args
    upload_method_func.assert_called_with(*expected_args)
1517 class dcut_TestCase(testtools
.TestCase
):
1518 """ Base for test cases for `dcut` function. """
1521 """ Set up test fixtures. """
1522 super(dcut_TestCase
, self
).setUp()
1523 patch_system_interfaces(self
)
1525 patch_tempfile_mkdtemp(self
)
1526 patch_os_unlink(self
)
1527 patch_os_rmdir(self
)
1529 self
.patch_globals()
1533 getattr(self
, 'config_scenario_name', 'exist-simple'))
1534 test_dput_main
.patch_runtime_config_options(self
)
1536 self
.set_test_args()
1538 patch_getoptions(self
)
1539 test_dput_main
.patch_parse_changes(self
)
1540 test_dput_main
.patch_read_configs(self
)
1541 test_dput_main
.patch_upload_methods(self
)
1542 test_dput_main
.patch_import_upload_functions(self
)
1544 self
.patch_parse_queuecommands()
1545 self
.patch_create_commands()
1546 self
.patch_write_commands()
1547 self
.patch_upload_stolen_from_dput_main()
def set_test_args(self):
    """ Set the (empty) arguments for the test call to the function. """
    self.test_args = {}
1553 def patch_globals(self
):
1554 """ Patch global names used by `dcut`. """
1555 for (name
, value
) in {
1556 'files_to_remove': [],
1558 patcher
= mock
.patch
.object(dput
.dcut
, name
, new
=value
)
1560 self
.addCleanup(patcher
.stop
)
def patch_parse_queuecommands(self):
    """ Replace `dput.dcut.parse_queuecommands` for this test case. """
    patcher = mock.patch.object(dput.dcut, "parse_queuecommands")
    patcher.start()
    # Register the un-patch as a cleanup of this test case.
    self.addCleanup(patcher.stop)
def patch_create_commands(self):
    """ Replace `dput.dcut.create_commands` for this test case. """
    patcher = mock.patch.object(dput.dcut, "create_commands")
    patcher.start()
    # Register the un-patch as a cleanup of this test case.
    self.addCleanup(patcher.stop)
def patch_write_commands(self):
    """ Replace `dput.dcut.write_commands` for this test case. """
    patcher = mock.patch.object(dput.dcut, "write_commands")
    patcher.start()
    # Register the un-patch as a cleanup of this test case.
    self.addCleanup(patcher.stop)
def patch_upload_stolen_from_dput_main(self):
    """ Replace `dput.dcut.upload_stolen_from_dput_main` for this test. """
    patcher = mock.patch.object(dput.dcut, "upload_stolen_from_dput_main")
    patcher.start()
    # Register the un-patch as a cleanup of this test case.
    self.addCleanup(patcher.stop)
1588 class dcut_DebugMessageTestCase(dcut_TestCase
):
1589 """ Test cases for `dcut` debug messages. """
1591 def test_emits_debug_message_for_read_configs(self
):
1592 """ Should emit debug message for `read_configs` call. """
1593 self
.getoptions_opts
['debug'] = True
1594 dput
.dcut
.dcut(**self
.test_args
)
1595 expected_output
= textwrap
.dedent("""\
1596 D: calling dput.read_configs
1598 self
.assertIn(expected_output
, sys
.stdout
.getvalue())
1601 class dcut_ConfigFileTestCase(
1602 testscenarios
.WithScenarios
,
1604 """ Test cases for `dcut` specification of configuration file. """
1608 'expected_args': (None, mock
.ANY
),
1610 ('config-from-command-line', {
1611 'getoptions_opts': {
1612 'config': "lorem.conf",
1614 'expected_args': ("lorem.conf", mock
.ANY
),
def test_calls_read_configs_with_expected_args(self):
    """ Should call `read_configs` with the scenario's expected args. """
    dput.dcut.dcut(**self.test_args)
    read_configs_func = dput.dput.read_configs
    read_configs_func.assert_called_with(*self.expected_args)
1624 class dcut_OptionsErrorTestCase(
1625 testscenarios
.WithScenarios
,
1627 """ Test cases for `dcut` function, startup options cause error. """
1630 ('no-host-discovered', {
1631 'config_default_default_host_main': None,
1632 'getoptions_opts': {
1635 'expected_output': (
1636 "Error: No host specified"
1637 " and no default found in config"),
1638 'expected_exit_status': EXIT_STATUS_FAILURE
,
1640 ('host-not-in-config', {
1641 'config_scenario_name': "exist-minimal",
1642 'expected_output': "No host foo found in config",
1643 'expected_exit_status': EXIT_STATUS_FAILURE
,
1646 'config_allow_dcut': False,
1647 'expected_output': (
1648 "Error: dcut is not supported for this upload queue."),
1649 'expected_exit_status': EXIT_STATUS_FAILURE
,
1651 ('filetoupload arguments', {
1652 'getoptions_opts': {
1653 'filetoupload': tempfile
.mktemp() + ".commands",
1655 'getoptions_args': ["lorem", "ipsum", "dolor", "sit", "amet"],
1656 'expected_output': (
1657 "Error: cannot take commands"
1658 " when uploading existing file"),
1659 'expected_exit_status': EXIT_STATUS_FAILURE
,
1663 def test_emits_expected_error_message(self
):
1664 """ Should emit expected error message. """
1666 dput
.dcut
.dcut(**self
.test_args
)
1667 except FakeSystemExit
:
1669 self
.assertIn(self
.expected_output
, sys
.stdout
.getvalue())
def test_calls_sys_exit_with_failure_exit_status(self):
    """ Should call `sys.exit` with the expected (failure) exit status. """
    # The call is expected to end by raising `FakeSystemExit`.
    expect_exit = testtools.ExpectedException(FakeSystemExit)
    with expect_exit:
        dput.dcut.dcut(**self.test_args)
    expected_status = self.expected_exit_status
    sys.exit.assert_called_with(expected_status)
1678 class dcut_NamedHostTestCase(
1679 testscenarios
.WithScenarios
,
1681 """ Test cases for `dcut` function, named host processing. """
1684 ('host-from-command-line', {
1685 'config_scenario_name': "exist-simple-host-three",
1686 'config_default_default_host_main': "quux",
1687 'getoptions_opts': {
1690 'expected_host': "bar",
1691 'expected_debug_output': "",
1693 ('host-from-config-default', {
1694 'config_scenario_name': "exist-simple-host-three",
1695 'config_default_default_host_main': "bar",
1696 'getoptions_opts': {
1699 'expected_host': "bar",
1700 'expected_debug_output': textwrap
.dedent("""\
1701 D: Using host "bar" (default_host_main)
1704 ('host-from-hardcoded-default', {
1705 'config_scenario_name': "exist-default-distribution-only",
1706 'config_default_default_host_main': "",
1707 'getoptions_opts': {
1710 'expected_host': "ftp-master",
1711 'expected_debug_output': textwrap
.dedent("""\
1712 D: Using host "" (default_host_main)
1713 D: Using host "ftp-master" (hardcoded)
def test_emits_debug_message_for_discovered_host(self):
    """ Should emit the expected debug output about the host used. """
    self.getoptions_opts['debug'] = True
    dput.dcut.dcut(**self.test_args)
    emitted_text = sys.stdout.getvalue()
    self.assertIn(self.expected_debug_output, emitted_text)
def test_calls_write_commands_with_expected_host_option(self):
    """ Should pass the expected `host` option to `write_commands`. """
    dput.dcut.dcut(**self.test_args)
    recorded_calls = dput.dcut.write_commands.mock_calls
    self.assertEqual(1, len(recorded_calls))
    # Each recorded call unpacks to (name, positional args, keyword args).
    (__, positional_args, __) = recorded_calls[0]
    # The options are the second of the four positional arguments.
    (__, options, __, __) = positional_args
    self.assertEqual(self.expected_host, options['host'])
1733 class dcut_FileToUploadBadNameTestCase(
1734 testscenarios
.WithScenarios
,
1736 """ Test cases for `dcut` function, file to upload with bad name. """
1740 'getoptions_args': [],
1741 'getoptions_opts': {
1742 'filetoupload': tempfile
.mktemp(),
1744 'expected_output': (
1745 "Error: I'm insisting on the .commands extension"),
def test_emits_error_message_for_bad_filename(self):
    """ Should emit the expected error message for a bad filename. """
    dput.dcut.dcut(**self.test_args)
    emitted_text = sys.stdout.getvalue()
    self.assertIn(self.expected_output, emitted_text)
1755 class dcut_ParseChangesTestCase(
1756 testscenarios
.WithScenarios
,
1758 """ Test cases for `dcut` function, parse upload control file. """
1761 ('changes-file no-filetoupload', {
1762 'getoptions_opts': {
1763 'filetoupload': None,
1764 'changes': tempfile
.mktemp(),
1767 ('changes-file no-filetoupload no-filetocreate', {
1768 'getoptions_opts': {
1769 'filetoupload': None,
1770 'filetocreate': None,
1771 'changes': tempfile
.mktemp(),
def test_calls_create_commands_with_expected_args(self):
    """ Should call `create_commands` with options, config, parser func. """
    dput.dcut.dcut(**self.test_args)
    # Only the options part of the `getoptions` result is needed here.
    (expected_options, __) = dput.dcut.getoptions()
    expected_args = (
            expected_options,
            self.runtime_config_parser,
            dput.dput.parse_changes,
            )
    dput.dcut.create_commands.assert_called_with(*expected_args)
1785 def test_calls_write_commands_with_expected_args(self
):
1786 """ Should call `write_commands` with expected args. """
1787 expected_commands
= object()
1788 dput
.dcut
.create_commands
.return_value
= expected_commands
1789 dput
.dcut
.dcut(**self
.test_args
)
1790 (expected_options
, __
) = dput
.dcut
.getoptions()
1791 expected_config
= self
.runtime_config_parser
1792 expected_tempdir
= self
.tempfile_mkdtemp_file_double
.path
1793 dput
.dcut
.write_commands
.assert_called_with(
1794 expected_commands
, expected_options
, expected_config
,
1798 class dcut_ParseQueueCommandsTestCase(
1799 testscenarios
.WithScenarios
,
1801 """ Test cases for `dcut` function, parse commands from arguments. """
1804 ('no-changes-file no-filetoupload', {
1805 'getoptions_opts': {
1806 'filetoupload': None,
def test_calls_parse_queuecommands_with_expected_args(self):
    """ Should call `parse_queuecommands` with args, options, config. """
    dput.dcut.dcut(**self.test_args)
    (expected_options, expected_arguments) = dput.dcut.getoptions()
    # Note the arguments come before the options in the expected call.
    expected_args = (
            expected_arguments,
            expected_options,
            self.runtime_config_parser,
            )
    dput.dcut.parse_queuecommands.assert_called_with(*expected_args)
1820 def test_calls_write_commands_with_expected_args(self
):
1821 """ Should call `write_commands` with expected args. """
1822 expected_commands
= object()
1823 dput
.dcut
.parse_queuecommands
.return_value
= expected_commands
1824 dput
.dcut
.dcut(**self
.test_args
)
1825 (expected_options
, __
) = dput
.dcut
.getoptions()
1826 expected_config
= self
.runtime_config_parser
1827 expected_tempdir
= self
.tempfile_mkdtemp_file_double
.path
1828 dput
.dcut
.write_commands
.assert_called_with(
1829 expected_commands
, expected_options
, expected_config
,
1833 class dcut_CleanupTestCase(
1834 testscenarios
.WithScenarios
,
1836 """ Test cases for `dcut` function, cleanup from exception. """
1838 commands_scenarios
= [
1839 ('commands-from-arguments', {
1840 'getoptions_args': ["foo", "bar", "baz"],
1841 'getoptions_opts': {
1842 'filetocreate': None,
1843 'filetoupload': None,
1849 files_scenarios
= upload_UploadMethodTestCase
.files_scenarios
1851 scenarios
= testscenarios
.multiply_scenarios(
1852 commands_scenarios
, files_scenarios
)
1855 """ Set up test fixtures. """
1856 super(dcut_CleanupTestCase
, self
).setUp()
1858 dput
.dcut
.files_to_remove
= self
.files_to_remove
1859 for path
in self
.files_to_remove
:
1860 double
= FileDouble(path
)
1861 double
.set_os_unlink_scenario('okay')
1862 double
.register_for_testcase(self
)
1864 upload_method_func
= get_upload_method_func(self
)
1865 self
.upload_error
= RuntimeError("Bad stuff happened")
1866 upload_method_func
.side_effect
= self
.upload_error
1868 def test_calls_os_rmdir_for_tempdir_when_upload_raises_exception(self
):
1869 """ Should call `os.rmdir` for `tempdir` when exception raised. """
1871 dput
.dcut
.dcut(**self
.test_args
)
1872 except self
.upload_error
.__class
__:
1874 os
.rmdir
.assert_called_with(self
.tempfile_mkdtemp_file_double
.path
)
1876 def test_calls_os_unlink_for_each_file_when_upload_raises_exception(self
):
1877 """ Should call `os.unlink` for each file when exception raised. """
1879 dput
.dcut
.dcut(**self
.test_args
)
1880 except self
.upload_error
.__class
__:
1883 mock
.call(path
) for path
in self
.files_to_remove
]
1884 os
.unlink
.assert_has_calls(expected_calls
, any_order
=True)
1891 # vim: fileencoding=utf-8 filetype=python :