Add a function to get the distribution version.
[dput.git] / test / test_dputhelper.py
blob96d9b1d2c3913a938a9e2d6dc3693b9e5e95d305
1 # -*- coding: utf-8; -*-
3 # test/test_dputhelper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
8 # This is free software: you may copy, modify, and/or distribute this work
9 # under the terms of the GNU General Public License as published by the
10 # Free Software Foundation; version 3 of that license or any later version.
11 # No warranty expressed or implied. See the file ‘LICENSE.GPL-3’ for details.
13 """ Unit tests for ‘dput.helper.dputhelper’ module. """
15 from __future__ import (absolute_import, unicode_literals)
17 import sys
18 import os
19 import collections
20 import itertools
21 import tempfile
22 import doctest
23 import textwrap
25 import testtools
26 import testtools.matchers
27 import testscenarios
28 import pkg_resources
30 __package__ = str("test")
31 __import__(__package__)
32 sys.path.insert(1, os.path.dirname(os.path.dirname(__file__)))
33 from dput.helper import dputhelper
35 from .helper import (
36 mock,
37 StringIO,
38 patch_sys_argv,
39 patch_system_interfaces,
40 patch_time_time,
41 patch_os_spawnv,
42 SubprocessDouble,
class spawnv_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Scenario-driven test cases for the `dputhelper.spawnv` function. """

    # Arguments for `spawnv`, stored in positional call order so the
    # mapping's values can be unpacked directly into the call.
    # `tempfile.mktemp` is used only to obtain a path name; this class
    # never opens that path.
    default_args = collections.OrderedDict([
        ('mode', object()),
        ('file', tempfile.mktemp()),
        ('args', ["arg-{}".format(n) for n in range(5)]),
    ])

    # Each scenario names the `os.spawnv` behaviour to simulate and,
    # optionally, a doctest-style pattern ('...' is a wildcard) for the
    # messages expected on standard output.
    # NOTE(review): the indentation of the continuation lines inside the
    # expected messages must match what `dputhelper.spawnv` emits, since
    # only ELLIPSIS matching is enabled — confirm against that function.
    scenarios = [
        ('success', {
            'test_args': default_args.copy(),
            'os_spawnv_scenario_name': 'success',
        }),
        ('failure', {
            'test_args': default_args.copy(),
            'os_spawnv_scenario_name': 'failure',
            'expected_output': textwrap.dedent("""\
                Warning: The execution of '...' as
                '...'
                returned a nonzero exit code.
                """)
        }),
        ('not-found', {
            'test_args': default_args.copy(),
            'os_spawnv_scenario_name': 'not_found',
            'expected_output': textwrap.dedent("""\
                Error: Failed to execute '...'.
                The file may not exist or not be executable.
                """)
        }),
    ]

    def setUp(self):
        """ Install the patches and the subprocess double for each test. """
        super(spawnv_TestCase, self).setUp()

        patch_system_interfaces(self)
        patch_os_spawnv(self)
        self.set_subprocess_double()

    def set_subprocess_double(self):
        """ Create and register the double standing in for the subprocess. """
        subprocess_double = SubprocessDouble(
            self.test_args['file'], self.test_args['args'])
        subprocess_double.register_for_testcase(self)
        subprocess_double.set_os_spawnv_scenario(
            self.os_spawnv_scenario_name)
        self.subprocess_double = subprocess_double

    def test_calls_os_spawnv_with_specified_args(self):
        """ Should pass the given arguments through to `os.spawnv`. """
        call_args = list(self.test_args.values())
        dputhelper.spawnv(*call_args)
        os.spawnv.assert_called_with(*call_args)

    def test_emits_expected_output(self):
        """ Should write the scenario's expected messages to stdout. """
        # Scenarios that do not declare any output expect none at all.
        self.expected_output = getattr(self, 'expected_output', "")
        dputhelper.spawnv(*self.test_args.values())
        output_matcher = testtools.matchers.DocTestMatches(
            self.expected_output, flags=doctest.ELLIPSIS)
        self.assertThat(sys.stdout.getvalue(), output_matcher)
class TimestampFile_TestCase(testtools.TestCase):
    """ Shared fixture base for `TimestampFile` test cases. """

    # Concrete subclasses are expected to supply their own scenarios.
    scenarios = NotImplemented

    def setUp(self):
        """ Wrap a fresh in-memory stream in a `TimestampFile`. """
        super(TimestampFile_TestCase, self).setUp()

        # Feed the time patch the sequence 1, 2, 3, …
        # NOTE(review): presumably each `time.time()` call then returns
        # the next value — confirm in the `patch_time_time` helper.
        fake_times = itertools.count(1)
        patch_time_time(self, fake_times)

        stream = StringIO()
        self.test_file = stream
        self.instance = dputhelper.TimestampFile(stream)
class TimestampFile_InstanceTestCase(
        testscenarios.WithScenarios,
        TimestampFile_TestCase):
    """ Test cases for a newly created `TimestampFile` instance. """

    scenarios = [
        ('default', {}),
    ]

    def test_has_specified_file(self):
        """ Should expose the wrapped file object as attribute `f`. """
        self.assertIs(self.test_file, self.instance.f)

    def test_has_attributes_from_component_file(self):
        """ Should report the same attribute values as the wrapped file. """
        # 'b0gUs' is looked up with a `None` fallback on both objects, like
        # every other name here, so attributes the wrapped file lacks are
        # compared as `None` on both sides.
        attr_names = [
            'b0gUs',
            'mode', 'name', 'encoding',
            'readable', 'seekable', 'writable',
            'read', 'seek', 'tell',
        ]
        for attr_name in attr_names:
            value_on_file = getattr(self.test_file, attr_name, None)
            value_on_instance = getattr(self.instance, attr_name, None)
            # `expectThat` records a mismatch without aborting the loop.
            self.expectThat(
                value_on_instance,
                testtools.matchers.Equals(value_on_file))
class TimestampFile_write_TestCase(
        testscenarios.WithScenarios,
        TimestampFile_TestCase):
    """ Test cases for the `TimestampFile.write` method. """

    # `expected_lines` is joined with newlines to form the expected stream
    # content. A trailing empty entry stands for output ending in a
    # newline; a non-empty final entry stands for text written without a
    # terminating newline.
    scenarios = [
        ('empty', {
            'test_output': "",
            'expected_lines': [],
        }),
        ('lines-one', {
            'test_output': textwrap.dedent("""\
                Lorem ipsum, dolor sit amet.
                """),
            'expected_lines': [
                "1: Lorem ipsum, dolor sit amet.",
                "",
            ],
        }),
        ('lines-three', {
            'test_output': textwrap.dedent("""\
                Lorem ipsum, dolor sit amet,
                consectetur adipiscing elit.
                Integer non pulvinar risus, sed malesuada diam.
                """),
            'expected_lines': [
                "1: Lorem ipsum, dolor sit amet,",
                "2: consectetur adipiscing elit.",
                "3: Integer non pulvinar risus, sed malesuada diam.",
                "",
            ],
        }),
        ('lines-two-with-trail', {
            'test_output': textwrap.dedent("""\
                Lorem ipsum, dolor sit amet,
                consectetur adipiscing elit.
                Integer non pulvinar risus"""),
            'expected_lines': [
                "1: Lorem ipsum, dolor sit amet,",
                "2: consectetur adipiscing elit.",
                "3: Integer non pulvinar risus",
            ],
        }),
    ]

    def test_has_expected_content_for_output(self):
        """ Should produce the expected stream content after `write`. """
        self.instance.write(self.test_output)

        lines = self.expected_lines
        if lines and lines[-1]:
            # The final entry is an unterminated line, which is not
            # expected in the stream after `write` alone: swap it for an
            # empty entry so the expected content stops at the newline
            # that ends the last complete line.
            lines = lines[:-1] + [""]

        self.assertEqual("\n".join(lines), self.instance.f.getvalue())
class TimestampFile_close_TestCase(
        testscenarios.WithScenarios,
        TimestampFile_TestCase):
    """ Test cases for `TimestampFile.close` method. """

    # The same inputs as for `write` are reused to exercise `close`.
    scenarios = TimestampFile_write_TestCase.scenarios

    @testtools.skip("TimestampFile.close method is broken")
    def test_has_expected_final_line(self):
        """ Should have expected final line. """
        self.instance.write(self.test_output)
        # Rewind the underlying stream before closing the wrapper.
        self.instance.f.seek(0)
        self.instance.close()
        # The 'empty' scenario has no expected lines, so indexing `[-1]`
        # unconditionally would raise IndexError rather than yield a test
        # result.
        # NOTE(review): assumes empty input should leave the stream empty —
        # confirm once `TimestampFile.close` is fixed and the skip removed.
        if self.expected_lines:
            expected_content = self.expected_lines[-1]
        else:
            expected_content = ""
        self.assertEqual(expected_content, self.instance.f.getvalue())
class FileWithProgress_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Shared fixture base for `FileWithProgress` test cases. """

    # Fallback values, keyed by constructor argument name, that subclasses
    # compare against when a scenario leaves an argument unset.
    default_args = {
        'ptype': 0,
        'progressf': sys.__stdout__,
        'size': -1,
        'step': 1024,
    }

    def setUp(self):
        """ Build the wrapped file, the argument set, and the instance. """
        super(FileWithProgress_TestCase, self).setUp()
        patch_system_interfaces(self)

        # Scenarios may supply `content`; otherwise the file is empty.
        initial_content = getattr(self, 'content', "")
        self.test_file = StringIO(initial_content)

        self.set_test_args()
        self.make_instance()

    def set_test_args(self):
        """ Collect constructor arguments from the `test_*` attributes. """
        self.test_args = dict(
            f=self.test_file,
        )
        # Only arguments the scenario (or a subclass) actually set are
        # passed, so the constructor's own defaults apply to the rest.
        for arg_name in ('ptype', 'progressf', 'size', 'step'):
            attr_name = 'test_' + arg_name
            if hasattr(self, attr_name):
                self.test_args[arg_name] = getattr(self, attr_name)

    def make_instance(self):
        """ Construct the `FileWithProgress` instance under test. """
        self.instance = dputhelper.FileWithProgress(**self.test_args)
class FileWithProgress_ArgsTestCase(FileWithProgress_TestCase):
    """ Test cases for `FileWithProgress` constructor arguments. """

    scenarios = [
        ('simple', {}),
        ('all args', {
            'test_ptype': 1,
            'test_progressf': StringIO(),
            'test_size': 10,
            'test_step': 2,
        }),
    ]

    def _expected_arg(self, arg_name):
        """ Return the scenario's value for `arg_name`, else its default. """
        return getattr(
            self, 'test_' + arg_name, self.default_args[arg_name])

    def test_has_specified_file(self):
        """ Should expose the wrapped file object as attribute `f`. """
        self.assertIs(self.test_file, self.instance.f)

    def test_has_specified_ptype(self):
        """ Should store the progress type as attribute `ptype`. """
        self.assertEqual(self._expected_arg('ptype'), self.instance.ptype)

    def test_has_specified_progressf(self):
        """ Should store the progress file as attribute `progressf`. """
        self.assertEqual(
            self._expected_arg('progressf'), self.instance.progressf)

    def test_has_specified_size(self):
        """ Should store the size as attribute `size`. """
        self.assertEqual(self._expected_arg('size'), self.instance.size)

    def test_has_specified_step(self):
        """ Should store the step as attribute `step`. """
        self.assertEqual(self._expected_arg('step'), self.instance.step)

    def test_has_attributes_from_component_file(self):
        """ Should report the same attribute values as the wrapped file. """
        # Every name, including 'b0gUs', is looked up with a `None`
        # fallback on both objects before the values are compared.
        attr_names = [
            'b0gUs',
            'mode', 'name', 'encoding',
            'readable', 'seekable', 'writable',
            'seek', 'tell', 'write',
        ]
        for attr_name in attr_names:
            value_on_file = getattr(self.test_file, attr_name, None)
            value_on_instance = getattr(self.instance, attr_name, None)
            # `expectThat` records a mismatch without aborting the loop.
            self.expectThat(
                value_on_instance,
                testtools.matchers.Equals(value_on_file))
class FileWithProgress_OutputTestCase(FileWithProgress_TestCase):
    """ Test cases for the progress output of `FileWithProgress`. """

    content_scenarios = [
        ('empty', {
            'content': "",
        }),
        ('10 000 chars', {
            'content': "0123456789\n" * 1000,
        }),
        ('10 000 000 chars', {
            'content': "0123456789\n" * 1000000,
        }),
    ]

    ptype_scenarios = [
        ('default', {}),
        ('ptype 0', {'test_ptype': 0}),
        ('ptype 1', {'test_ptype': 1}),
        ('ptype 2', {'test_ptype': 2}),
    ]

    step_scenarios = [
        ('default', {}),
        ('step 5', {'test_step': 5}),
        ('step 500', {'test_step': 500}),
        ('step 50 000', {'test_step': 50000}),
    ]

    # Every combination of content, progress type and step is exercised.
    scenarios = testscenarios.multiply_scenarios(
        content_scenarios, ptype_scenarios, step_scenarios)

    def setUp(self):
        """ Rebuild the instance with a known size and a capture stream. """
        super(FileWithProgress_OutputTestCase, self).setUp()

        # The base fixture has already built an instance; build it again
        # now that the size and the progress stream are set.
        self.test_file = StringIO(self.content)
        self.test_size = len(self.content)
        self.test_progressf = StringIO()
        self.set_test_args()
        self.make_instance()

        self.set_expected_output()

    def set_expected_output(self):
        """ Compute the progress text expected once the file is read. """
        ptype = getattr(self, 'test_ptype', self.default_args['ptype'])

        if not self.content or ptype not in (1, 2):
            # An empty file produces no progress output, and neither does
            # any progress type other than 1 or 2 (`ptype == 0` means no
            # progress output).
            self.expected_output = ""
            return

        if ptype == 1:
            self.expected_output = "/"
            return

        # `ptype == 2`: "<whole steps>k/<steps rounded up>k".
        step = getattr(self, 'test_step', 1024)
        total_bytes = len(self.content)
        whole_steps = total_bytes // step
        steps_rounded_up = (total_bytes + step - 1) // step
        self.expected_output = "{hunks}/{steps}".format(
            hunks="{size}k".format(size=whole_steps),
            steps="{size}k".format(size=steps_rounded_up))

    def test_emits_expected_output_for_content(self):
        """ Should write the expected progress text while reading. """
        self.instance.read()
        self.assertEqual(
            self.expected_output, self.test_progressf.getvalue())

    def test_clears_output_on_close(self):
        """ Should erase the progress text when the file is closed. """
        self.instance.read()
        self.instance.close()

        # Erasing is expected as: back up over the text, overwrite it
        # with spaces, then back up again.
        width = len(self.expected_output)
        erase_sequence = width * "\b" + width * " " + width * "\b"
        expected_output = self.expected_output + erase_sequence

        self.assertEqual(expected_output, self.test_progressf.getvalue())
def patch_filewithprogress(testcase):
    """ Replace `dputhelper.FileWithProgress` with a fake for `testcase`.

        The fake is taken from `testcase.fake_filewithprogress`; one is
        created there if the test case has not supplied its own. The
        patch is undone through the test case's cleanup.

        """
    if not hasattr(testcase, 'fake_filewithprogress'):
        testcase.fake_filewithprogress = mock.MagicMock(
            spec=dputhelper.FileWithProgress, name="FileWithProgress")

    def fake_filewithprogress_factory(
            f, ptype=0, progressf=sys.stdout, size=-1, step=1024):
        # Record the constructor arguments on the single shared fake, then
        # hand that same object back for every construction.
        fake = testcase.fake_filewithprogress
        fake.f, fake.ptype, fake.progressf = f, ptype, progressf
        fake.size, fake.step = size, step
        return fake

    patcher = mock.patch.object(
        dputhelper, "FileWithProgress",
        side_effect=fake_filewithprogress_factory)
    patcher.start()
    testcase.addCleanup(patcher.stop)
# Pair of (parsed options, remaining arguments) that the `getopt` tests
# compare against the function's return value.
GetoptResult = collections.namedtuple('GetoptResult', 'optlist args')
class getopt_SuccessTestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for `getopt` calls that parse successfully. """

    # The first `test_argv` item is a placeholder for the program name;
    # the test passes only the items after it to `getopt`.
    scenarios = [
        ('empty', {
            'test_argv': [object()],
            'expected_result': GetoptResult(
                optlist=[], args=[]),
        }),
        ('no opts', {
            'test_argv': [object(), "foo", "bar", "baz"],
            'expected_result': GetoptResult(
                optlist=[], args=["foo", "bar", "baz"]),
        }),
        ('only short opts', {
            'test_argv': [object(), "-a", "-b", "-c"],
            'test_shortopts': "axbycz",
            'expected_result': GetoptResult(
                optlist=[
                    ('-a', ""),
                    ('-b', ""),
                    ('-c', ""),
                ],
                args=[]),
        }),
        ('only long opts', {
            'test_argv': [object(), "--alpha", "--beta", "--gamma"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta", "wubble", "gamma",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', ""),
                    ('--gamma', ""),
                ],
                args=[]),
        }),
        ('long opt prefix', {
            'test_argv': [object(), "--al", "--be", "--ga"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta", "wubble", "gamma",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', ""),
                    ('--gamma', ""),
                ],
                args=[]),
        }),
        ('short opt cluster', {
            'test_argv': [object(), "-abc"],
            'test_shortopts': "abc",
            'expected_result': GetoptResult(
                optlist=[
                    ('-a', ""),
                    ('-b', ""),
                    ('-c', ""),
                ],
                args=[]),
        }),
        ('short with args', {
            'test_argv': [object(), "-a", "-b", "eggs", "-cbeans"],
            'test_shortopts': "ab:c:",
            'expected_result': GetoptResult(
                optlist=[
                    ('-a', ""),
                    ('-b', "eggs"),
                    ('-c', "beans"),
                ],
                args=[]),
        }),
        ('long with args', {
            'test_argv': [
                object(),
                "--alpha",
                "--beta=eggs",
                "--gamma", "beans"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta=", "wubble", "gamma=",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', "eggs"),
                    ('--gamma', "beans"),
                ],
                args=[]),
        }),
        ('long with optional args', {
            'test_argv': [
                object(),
                "--alpha",
                "--beta=eggs",
                "--gamma"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta==", "wubble", "gamma==",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', "eggs"),
                    ('--gamma', ""),
                ],
                args=[]),
        }),
        ('single hyphen arg', {
            'test_argv': [object(), "-a", "-b", "-c", "-"],
            'test_shortopts': "axbycz",
            'expected_result': GetoptResult(
                optlist=[
                    ('-a', ""),
                    ('-b', ""),
                    ('-c', ""),
                ],
                args=["-"]),
        }),
        ('explicit end of opts', {
            'test_argv': [
                object(),
                "--alpha",
                "--beta",
                "--",
                "--spam"],
            'test_longopts': [
                "wibble", "alpha", "wobble",
                "beta", "wubble", "gamma",
            ],
            'expected_result': GetoptResult(
                optlist=[
                    ('--alpha', ""),
                    ('--beta', ""),
                ],
                args=["--spam"]),
        }),
    ]

    def test_returns_expected_result_for_argv(self):
        """ Should return the expected options and arguments. """
        # Option specifications a scenario does not set fall back to "".
        shortopts = getattr(self, 'test_shortopts', "")
        longopts = getattr(self, 'test_longopts', "")
        command_args = self.test_argv[1:]

        result = dputhelper.getopt(command_args, shortopts, longopts)

        self.assertEqual(self.expected_result, result)
class getopt_ErrorTestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for `getopt` calls that must be rejected. """

    # The first `test_argv` item is a placeholder for the program name;
    # the test passes only the items after it to `getopt`.
    scenarios = [
        ('short opt unknown', {
            'test_argv': [object(), "-a", "-b", "-z", "-c"],
            'test_shortopts': "abc",
            'expected_error': dputhelper.DputException,
        }),
        ('short missing arg', {
            'test_argv': [object(), "-a", "-b", "-c"],
            'test_shortopts': "abc:",
            'expected_error': dputhelper.DputException,
        }),
        ('long opt unknown', {
            'test_argv': [
                object(), "--alpha", "--beta", "--zeta", "--gamma"],
            'test_longopts': [
                "alpha", "beta", "gamma"],
            'expected_error': dputhelper.DputException,
        }),
        ('long ambiguous prefix', {
            'test_argv': [
                object(), "--alpha", "--be", "--gamma"],
            'test_longopts': [
                "alpha", "beta", "bettong", "bertha", "gamma"],
            'expected_error': dputhelper.DputException,
        }),
        ('long missing arg', {
            'test_argv': [object(), "--alpha", "--beta", "--gamma"],
            'test_longopts': [
                "alpha", "beta", "gamma="],
            'expected_error': dputhelper.DputException,
        }),
        ('long unexpected arg', {
            'test_argv': [
                object(), "--alpha", "--beta=beans", "--gamma"],
            'test_longopts': [
                "alpha", "beta", "gamma"],
            'expected_error': dputhelper.DputException,
        }),
    ]

    def test_raises_expected_error_for_argv(self):
        """ Should raise the scenario's expected error type. """
        # Option specifications a scenario does not set fall back to "".
        shortopts = getattr(self, 'test_shortopts', "")
        longopts = getattr(self, 'test_longopts', "")
        command_args = self.test_argv[1:]

        with testtools.ExpectedException(self.expected_error):
            dputhelper.getopt(command_args, shortopts, longopts)
def patch_getopt(testcase):
    """ Replace `dputhelper.getopt` with a canned fake for `testcase`.

        The fake ignores its arguments and returns the pair
        `(testcase.getopt_opts, testcase.getopt_args)`, read at call time.
        The patch is undone through the test case's cleanup.

        """
    def fake_getopt(args, shortopts, longopts):
        return (testcase.getopt_opts, testcase.getopt_args)

    patcher = mock.patch.object(
        dputhelper, "getopt", side_effect=fake_getopt)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class get_progname_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for the `get_progname` function. """

    # However the command is spelled, the expected name is the final
    # path component.
    command_name_scenarios = [
        ('command-simple', {
            'argv_zero': "amet",
            'expected_progname': "amet",
        }),
        ('command-relative', {
            'argv_zero': "lorem/ipsum/dolor/sit/amet",
            'expected_progname': "amet",
        }),
        ('command-absolute', {
            'argv_zero': "/lorem/ipsum/dolor/sit/amet",
            'expected_progname': "amet",
        }),
    ]

    # These scenarios carry no expectation of their own: the expected
    # name comes from the command name scenario alone.
    subsequent_args_scenarios = [
        ('args-empty', {
            'argv_remain': [],
        }),
        ('args-one-word', {
            'argv_remain': ["spam"],
        }),
        ('args-three-words', {
            'argv_remain': ["spam", "beans", "eggs"],
        }),
        ('args-one-option', {
            'argv_remain': ["--spam"],
        }),
    ]

    scenarios = testscenarios.multiply_scenarios(
        command_name_scenarios, subsequent_args_scenarios)

    def setUp(self):
        """ Assemble the full command line for the scenario. """
        super(get_progname_TestCase, self).setUp()

        self.test_argv = [self.argv_zero] + self.argv_remain

    def test_returns_expected_progname(self):
        """ Should derive the program name from the given `argv`. """
        progname = dputhelper.get_progname(self.test_argv)
        self.assertEqual(self.expected_progname, progname)

    def test_queries_sys_argv_if_argv_unspecified(self):
        """ Should fall back to `sys.argv` when no `argv` is given. """
        # NOTE(review): presumably `patch_sys_argv` installs
        # `self.sys_argv` as `sys.argv` — confirm in the helper module.
        self.sys_argv = self.test_argv
        patch_sys_argv(self)
        progname = dputhelper.get_progname()
        self.assertEqual(self.expected_progname, progname)
def patch_pkg_resources_get_distribution(testcase):
    """ Replace `pkg_resources.get_distribution` for `testcase`.

        The patched function returns `testcase.fake_distribution`; a mock
        specified from `pkg_resources.Distribution` is created there if
        the test case has not supplied its own. The patch is undone
        through the test case's cleanup.

        """
    if not hasattr(testcase, 'fake_distribution'):
        testcase.fake_distribution = mock.MagicMock(
            pkg_resources.Distribution)

    patcher = mock.patch.object(
        pkg_resources, 'get_distribution',
        return_value=testcase.fake_distribution)
    patcher.start()
    testcase.addCleanup(patcher.stop)
class get_distribution_version_TestCase(
        testscenarios.WithScenarios,
        testtools.TestCase):
    """ Test cases for the `get_distribution_version` function. """

    scenarios = [
        ('simple', {
            'fake_distribution': mock.MagicMock(
                project_name="lorem", version="42.23"),
        }),
    ]

    def setUp(self):
        """ Make `pkg_resources` return the scenario's fake distribution. """
        super(get_distribution_version_TestCase, self).setUp()

        patch_pkg_resources_get_distribution(self)

    def test_returns_expected_result(self):
        """ Should return the version of the looked-up distribution. """
        version = dputhelper.get_distribution_version()
        self.assertEqual(self.fake_distribution.version, version)
761 # Local variables:
762 # coding: utf-8
763 # mode: python
764 # End:
765 # vim: fileencoding=utf-8 filetype=python :