1 # -*- coding: utf-8; -*-
3 # test/test_dputhelper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Unit tests for ‘dput.helper.dputhelper’ module. """
15 from __future__
import (absolute_import
, unicode_literals
)
26 import testtools
.matchers
30 __package__
= str("test")
31 __import__(__package__
)
32 sys
.path
.insert(1, os
.path
.dirname(os
.path
.dirname(__file__
)))
33 from dput
.helper
import dputhelper
39 patch_system_interfaces
,
46 class spawnv_TestCase(
47 testscenarios
.WithScenarios
,
49 """ Test cases for `spawnv` function. """
51 default_args
= collections
.OrderedDict([
53 ('file', tempfile
.mktemp()),
54 ('args', ["arg-{}".format(n
) for n
in range(5)]),
59 'test_args': default_args
.copy(),
60 'os_spawnv_scenario_name': 'success',
63 'test_args': default_args
.copy(),
64 'os_spawnv_scenario_name': 'failure',
65 'expected_output': textwrap
.dedent("""\
66 Warning: The execution of '...' as
68 returned a nonzero exit code.
72 'test_args': default_args
.copy(),
73 'os_spawnv_scenario_name': 'not_found',
74 'expected_output': textwrap
.dedent("""\
75 Error: Failed to execute '...'.
76 The file may not exist or not be executable.
82 """ Set up test fixtures. """
83 super(spawnv_TestCase
, self
).setUp()
84 patch_system_interfaces(self
)
88 self
.set_subprocess_double()
def set_subprocess_double(self):
    """ Create, register, and configure the subprocess test double. """
    # The double is built from the scenario's `file` and `args` values.
    command_file = self.test_args['file']
    command_args = self.test_args['args']
    subprocess_double = SubprocessDouble(command_file, command_args)

    subprocess_double.register_for_testcase(self)
    subprocess_double.set_os_spawnv_scenario(self.os_spawnv_scenario_name)

    # Keep a reference so test methods can inspect the double.
    self.subprocess_double = subprocess_double
def test_calls_os_spawnv_with_specified_args(self):
    """ The `os.spawnv` call should receive the specified arguments. """
    # The same positional values are passed to the function under test
    # and then expected on the `os.spawnv` double.
    spawn_args = list(self.test_args.values())
    dputhelper.spawnv(*spawn_args)
    os.spawnv.assert_called_with(*spawn_args)
104 def test_emits_expected_output(self
):
105 """ Should emit the expected output messages. """
106 if not hasattr(self
, 'expected_output'):
107 self
.expected_output
= ""
108 dputhelper
.spawnv(*self
.test_args
.values())
110 sys
.stdout
.getvalue(),
111 testtools
.matchers
.DocTestMatches(
112 self
.expected_output
, flags
=doctest
.ELLIPSIS
))
115 class TimestampFile_TestCase(testtools
.TestCase
):
116 """ Base for test cases for the `TimestampFile` class. """
118 scenarios
= NotImplemented
121 """ Set up test fixtures. """
122 super(TimestampFile_TestCase
, self
).setUp()
124 patch_time_time(self
, itertools
.count(1))
126 self
.test_file
= StringIO()
127 self
.instance
= dputhelper
.TimestampFile(self
.test_file
)
130 class TimestampFile_InstanceTestCase(
131 testscenarios
.WithScenarios
,
132 TimestampFile_TestCase
):
133 """ Test cases for `TimestampFile` instance creation. """
def test_has_specified_file(self):
    """ The `f` attribute should be the specified file object. """
    expected_file = self.test_file
    actual_file = self.instance.f
    # Identity, not equality: the very same object must be stored.
    self.assertIs(expected_file, actual_file)
143 def test_has_attributes_from_component_file(self
):
144 """ Should have attributes directly from component file. """
147 'mode', 'name', 'encoding',
148 'readable', 'seekable', 'writable',
149 'read', 'seek', 'tell',
151 for attr_name
in attr_names
:
152 expected_attr_value
= getattr(self
.test_file
, attr_name
, None)
154 getattr(self
.instance
, attr_name
, None),
155 testtools
.matchers
.Equals(expected_attr_value
))
158 class TimestampFile_write_TestCase(
159 testscenarios
.WithScenarios
,
160 TimestampFile_TestCase
):
161 """ Test cases for `TimestampFile.write` method. """
166 'expected_lines': [],
169 'test_output': textwrap
.dedent("""\
170 Lorem ipsum, dolor sit amet.
173 "1: Lorem ipsum, dolor sit amet.",
178 'test_output': textwrap
.dedent("""\
179 Lorem ipsum, dolor sit amet,
180 consectetur adipiscing elit.
181 Integer non pulvinar risus, sed malesuada diam.
184 "1: Lorem ipsum, dolor sit amet,",
185 "2: consectetur adipiscing elit.",
186 "3: Integer non pulvinar risus, sed malesuada diam.",
190 ('lines-two-with-trail', {
191 'test_output': textwrap
.dedent("""\
192 Lorem ipsum, dolor sit amet,
193 consectetur adipiscing elit.
194 Integer non pulvinar risus"""),
196 "1: Lorem ipsum, dolor sit amet,",
197 "2: consectetur adipiscing elit.",
198 "3: Integer non pulvinar risus",
203 def test_has_expected_content_for_output(self
):
204 """ Should have expected content for specified `write` output. """
205 self
.instance
.write(self
.test_output
)
206 expected_lines
= self
.expected_lines
207 if self
.expected_lines
:
208 if self
.expected_lines
[-1]:
209 # Expecting an unterminated final line.
210 expected_lines
= self
.expected_lines
[:-1]
211 expected_lines
.append("")
213 # Expecting no output following newline.
214 expected_lines
= self
.expected_lines
215 expected_content
= "\n".join(expected_lines
)
216 self
.assertEqual(expected_content
, self
.instance
.f
.getvalue())
class TimestampFile_close_TestCase(
        testscenarios.WithScenarios,
        TimestampFile_TestCase):
    """ Test cases for `TimestampFile.close` method. """

    # Re-use the scenarios defined for the `write` test cases; each one
    # supplies `test_output` and `expected_lines`.
    scenarios = TimestampFile_write_TestCase.scenarios

    @testtools.skip("TimestampFile.close method is broken")
    def test_has_expected_final_line(self):
        """ Should have expected final line. """
        self.instance.write(self.test_output)
        # Rewind the underlying file before closing the instance.
        self.instance.f.seek(0)
        self.instance.close()
        # A scenario may specify no expected lines at all; indexing an
        # empty list would raise IndexError instead of making an
        # assertion. Presumably no content is expected in that case.
        if self.expected_lines:
            expected_content = self.expected_lines[-1]
        else:
            expected_content = ""
        self.assertEqual(expected_content, self.instance.f.getvalue())
236 class FileWithProgress_TestCase(
237 testscenarios
.WithScenarios
,
239 """ Base for test cases for the `FileWithProgress` class. """
243 'progressf': sys
.__stdout
__,
249 """ Set up test fixtures. """
250 super(FileWithProgress_TestCase
, self
).setUp()
251 patch_system_interfaces(self
)
253 self
.test_file
= StringIO(
254 getattr(self
, 'content', ""))
259 def set_test_args(self
):
260 """ Set the arguments for the test instance constructor. """
261 self
.test_args
= dict(
264 if hasattr(self
, 'test_ptype'):
265 self
.test_args
['ptype'] = self
.test_ptype
266 if hasattr(self
, 'test_progressf'):
267 self
.test_args
['progressf'] = self
.test_progressf
268 if hasattr(self
, 'test_size'):
269 self
.test_args
['size'] = self
.test_size
270 if hasattr(self
, 'test_step'):
271 self
.test_args
['step'] = self
.test_step
def make_instance(self):
    """ Construct the `FileWithProgress` instance under test. """
    # Constructor arguments come from the `test_args` mapping.
    instance_class = dputhelper.FileWithProgress
    self.instance = instance_class(**self.test_args)
278 class FileWithProgress_ArgsTestCase(FileWithProgress_TestCase
):
279 """ Test cases for constructor arguments for `FileWithProgress` class. """
285 'test_progressf': StringIO(),
def test_has_specified_file(self):
    """ The `f` attribute should be the specified file object. """
    expected_file = self.test_file
    actual_file = self.instance.f
    # Identity, not equality: the very same object must be stored.
    self.assertIs(expected_file, actual_file)
def test_has_specified_ptype(self):
    """ The `ptype` attribute should be the specified progress type. """
    # Fall back to the class default when the scenario sets no value.
    default_ptype = self.default_args['ptype']
    wanted_ptype = getattr(self, 'test_ptype', default_ptype)
    self.assertEqual(wanted_ptype, self.instance.ptype)
def test_has_specified_progressf(self):
    """ The `progressf` attribute should be the specified progress file. """
    # Fall back to the class default when the scenario sets no value.
    default_progressf = self.default_args['progressf']
    wanted_progressf = getattr(self, 'test_progressf', default_progressf)
    self.assertEqual(wanted_progressf, self.instance.progressf)
def test_has_specified_size(self):
    """ The `size` attribute should be the specified size value. """
    # Fall back to the class default when the scenario sets no value.
    default_size = self.default_args['size']
    wanted_size = getattr(self, 'test_size', default_size)
    self.assertEqual(wanted_size, self.instance.size)
def test_has_specified_step(self):
    """ The `step` attribute should be the specified step value. """
    # Fall back to the class default when the scenario sets no value.
    default_step = self.default_args['step']
    wanted_step = getattr(self, 'test_step', default_step)
    self.assertEqual(wanted_step, self.instance.step)
319 def test_has_attributes_from_component_file(self
):
320 """ Should have attributes directly from component file. """
323 'mode', 'name', 'encoding',
324 'readable', 'seekable', 'writable',
325 'seek', 'tell', 'write',
327 for attr_name
in attr_names
:
328 expected_attr_value
= getattr(self
.test_file
, attr_name
, None)
330 getattr(self
.instance
, attr_name
, None),
331 testtools
.matchers
.Equals(expected_attr_value
))
334 class FileWithProgress_OutputTestCase(FileWithProgress_TestCase
):
335 """ Test cases for progress output for `FileWithProgress` class. """
337 content_scenarios
= [
342 'content': "0123456789\n" * 1000,
344 ('10 000 000 chars', {
345 'content': "0123456789\n" * 1000000,
351 ('ptype 0', {'test_ptype': 0}),
352 ('ptype 1', {'test_ptype': 1}),
353 ('ptype 2', {'test_ptype': 2}),
358 ('step 5', {'test_step': 5}),
359 ('step 500', {'test_step': 500}),
360 ('step 50 000', {'test_step': 50000}),
363 scenarios
= testscenarios
.multiply_scenarios(
364 content_scenarios
, ptype_scenarios
, step_scenarios
)
367 """ Set up test fixtures. """
368 super(FileWithProgress_OutputTestCase
, self
).setUp()
370 self
.test_file
= StringIO(self
.content
)
371 self
.test_size
= len(self
.content
)
372 self
.test_progressf
= StringIO()
376 self
.set_expected_output()
378 def set_expected_output(self
):
379 """ Set the expected output for this test case. """
380 ptype
= getattr(self
, 'test_ptype', self
.default_args
['ptype'])
382 self
.expected_output
= "/"
384 step
= getattr(self
, 'test_step', 1024)
385 total_bytes
= len(self
.content
)
386 total_hunks
= int(total_bytes
/ step
)
387 total_hunks_text
= "{size}k".format(size
=total_hunks
)
389 (total_bytes
+ step
- 1) / step
)
390 total_steps_text
= "{size}k".format(size
=total_steps
)
391 progress_text
= "{hunks}/{steps}".format(
392 hunks
=total_hunks_text
, steps
=total_steps_text
)
393 self
.expected_output
= progress_text
395 # `ptype == 0` specifies no progress output.
396 self
.expected_output
= ""
399 # No progress output for an empty file.
400 self
.expected_output
= ""
402 def test_emits_expected_output_for_content(self
):
403 """ Should emit expected output for file content. """
405 output_stream_content
= self
.test_progressf
.getvalue()
407 self
.expected_output
, output_stream_content
)
409 def test_clears_output_on_close(self
):
410 """ Should clear progress output when closed. """
412 self
.instance
.close()
415 + len(self
.expected_output
) * "\b"
416 + len(self
.expected_output
) * " "
417 + len(self
.expected_output
) * "\b"
419 output_stream_content
= self
.test_progressf
.getvalue()
420 self
.assertEqual(expected_output
, output_stream_content
)
423 def patch_filewithprogress(testcase
):
424 """ Patch the `FileWithProgress` class for the test case. """
425 if not hasattr(testcase
, 'fake_filewithprogress'):
426 testcase
.fake_filewithprogress
= mock
.MagicMock(
427 spec
=dputhelper
.FileWithProgress
, name
="FileWithProgress")
429 def fake_filewithprogress_factory(
430 f
, ptype
=0, progressf
=sys
.stdout
, size
=-1, step
=1024):
431 result
= testcase
.fake_filewithprogress
434 result
.progressf
= progressf
439 func_patcher
= mock
.patch
.object(
440 dputhelper
, "FileWithProgress",
441 side_effect
=fake_filewithprogress_factory
)
443 testcase
.addCleanup(func_patcher
.stop
)
# Two-field record used by the `getopt` scenarios to spell out the
# expected result: the option list first, then the remaining arguments.
GetoptResult = collections.namedtuple(
        'GetoptResult',
        ('optlist', 'args'))
449 class getopt_SuccessTestCase(
450 testscenarios
.WithScenarios
,
452 """ Success test cases for `getopt` function. """
456 'test_argv': [object()],
457 'expected_result': GetoptResult(
458 optlist
=[], args
=[]),
461 'test_argv': [object(), "foo", "bar", "baz"],
462 'expected_result': GetoptResult(
463 optlist
=[], args
=["foo", "bar", "baz"]),
465 ('only short opts', {
466 'test_argv': [object(), "-a", "-b", "-c"],
467 'test_shortopts': "axbycz",
468 'expected_result': GetoptResult(
477 'test_argv': [object(), "--alpha", "--beta", "--gamma"],
479 "wibble", "alpha", "wobble",
480 "beta", "wubble", "gamma",
482 'expected_result': GetoptResult(
490 ('long opt prefix', {
491 'test_argv': [object(), "--al", "--be", "--ga"],
493 "wibble", "alpha", "wobble",
494 "beta", "wubble", "gamma",
496 'expected_result': GetoptResult(
504 ('short opt cluster', {
505 'test_argv': [object(), "-abc"],
506 'test_shortopts': "abc",
507 'expected_result': GetoptResult(
515 ('short with args', {
516 'test_argv': [object(), "-a", "-b", "eggs", "-cbeans"],
517 'test_shortopts': "ab:c:",
518 'expected_result': GetoptResult(
533 "wibble", "alpha", "wobble",
534 "beta=", "wubble", "gamma=",
536 'expected_result': GetoptResult(
540 ('--gamma', "beans"),
544 ('long with optional args', {
551 "wibble", "alpha", "wobble",
552 "beta==", "wubble", "gamma==",
554 'expected_result': GetoptResult(
562 ('single hyphen arg', {
563 'test_argv': [object(), "-a", "-b", "-c", "-"],
564 'test_shortopts': "axbycz",
565 'expected_result': GetoptResult(
573 ('explicit end of opts', {
581 "wibble", "alpha", "wobble",
582 "beta", "wubble", "gamma",
584 'expected_result': GetoptResult(
def test_returns_expected_result_for_argv(self):
    """ The `getopt` result should match the scenario's expectation. """
    # Scenarios that define no option specification get an empty one.
    short_spec = getattr(self, 'test_shortopts', "")
    long_spec = getattr(self, 'test_longopts', "")
    # The first `test_argv` item is left out of the parsed arguments.
    command_args = self.test_argv[1:]
    actual_result = dputhelper.getopt(command_args, short_spec, long_spec)
    self.assertEqual(self.expected_result, actual_result)
602 class getopt_ErrorTestCase(
603 testscenarios
.WithScenarios
,
605 """ Error test cases for `getopt` function. """
608 ('short opt unknown', {
609 'test_argv': [object(), "-a", "-b", "-z", "-c"],
610 'test_shortopts': "abc",
611 'expected_error': dputhelper
.DputException
,
613 ('short missing arg', {
614 'test_argv': [object(), "-a", "-b", "-c"],
615 'test_shortopts': "abc:",
616 'expected_error': dputhelper
.DputException
,
618 ('long opt unknown', {
620 object(), "--alpha", "--beta", "--zeta", "--gamma"],
622 "alpha", "beta", "gamma"],
623 'expected_error': dputhelper
.DputException
,
625 ('long ambiguous prefix', {
627 object(), "--alpha", "--be", "--gamma"],
629 "alpha", "beta", "bettong", "bertha", "gamma"],
630 'expected_error': dputhelper
.DputException
,
632 ('long missing arg', {
633 'test_argv': [object(), "--alpha", "--beta", "--gamma"],
635 "alpha", "beta", "gamma="],
636 'expected_error': dputhelper
.DputException
,
638 ('long unexpected arg', {
640 object(), "--alpha", "--beta=beans", "--gamma"],
642 "alpha", "beta", "gamma"],
643 'expected_error': dputhelper
.DputException
,
647 def test_raises_expected_error_for_argv(self
):
648 """ Should raise expected error for specified argv. """
649 shortopts
= getattr(self
, 'test_shortopts', "")
650 longopts
= getattr(self
, 'test_longopts', "")
651 with testtools
.ExpectedException(self
.expected_error
):
653 self
.test_argv
[1:], shortopts
, longopts
)
656 def patch_getopt(testcase
):
657 """ Patch the `getopt` function for the specified test case. """
658 def fake_getopt(args
, shortopts
, longopts
):
659 result
= (testcase
.getopt_opts
, testcase
.getopt_args
)
662 func_patcher
= mock
.patch
.object(
663 dputhelper
, "getopt", side_effect
=fake_getopt
)
665 testcase
.addCleanup(func_patcher
.stop
)
668 class get_progname_TestCase(
669 testscenarios
.WithScenarios
,
671 """ Test cases for `get_progname` function. """
673 command_name_scenarios
= [
676 'expected_progname': "amet",
678 ('command-relative', {
679 'argv_zero': "lorem/ipsum/dolor/sit/amet",
680 'expected_progname': "amet",
682 ('command-absolute', {
683 'argv_zero': "/lorem/ipsum/dolor/sit/amet",
684 'expected_progname': "amet",
688 subsequent_args_scenarios
= [
693 'argv_remain': ["spam"],
695 ('args-three-words', {
696 'argv_remain': ["spam", "beans", "eggs"],
698 ('args-one-option', {
699 'argv_remain': ["--spam"],
703 scenarios
= testscenarios
.multiply_scenarios(
704 command_name_scenarios
, subsequent_args_scenarios
)
707 """ Set up test fixtures. """
708 super(get_progname_TestCase
, self
).setUp()
710 self
.test_argv
= [self
.argv_zero
] + self
.argv_remain
def test_returns_expected_progname(self):
    """ The program name should be derived from the given command line. """
    # Pass the argument list explicitly.
    progname = dputhelper.get_progname(self.test_argv)
    self.assertEqual(self.expected_progname, progname)
717 def test_queries_sys_argv_if_argv_unspecified(self
):
718 """ Should query `sys.argv` if no `argv` specified. """
719 self
.sys_argv
= self
.test_argv
721 result
= dputhelper
.get_progname()
722 self
.assertEqual(self
.expected_progname
, result
)
725 def patch_pkg_resources_get_distribution(testcase
):
726 """ Patch `pkg_resources.get_distribution` for the test case. """
727 if not hasattr(testcase
, 'fake_distribution'):
728 testcase
.fake_distribution
= mock
.MagicMock(pkg_resources
.Distribution
)
729 func_patcher
= mock
.patch
.object(
730 pkg_resources
, 'get_distribution',
731 return_value
=testcase
.fake_distribution
)
733 testcase
.addCleanup(func_patcher
.stop
)
736 class get_distribution_version_TestCase(
737 testscenarios
.WithScenarios
,
739 """ Test cases for `get_distribution_version` function. """
743 'fake_distribution': mock
.MagicMock(
744 project_name
="lorem", version
="42.23"),
749 """ Set up test fixtures. """
750 super(get_distribution_version_TestCase
, self
).setUp()
752 patch_pkg_resources_get_distribution(self
)
def test_returns_expected_result(self):
    """ The returned version should be that of the fake distribution. """
    # The expectation comes from the scenario's fake distribution.
    wanted_version = self.fake_distribution.version
    actual_version = dputhelper.get_distribution_version()
    self.assertEqual(wanted_version, actual_version)
765 # vim: fileencoding=utf-8 filetype=python :