From ab93d9f667e96a357be97a63d1a0018b1667de65 Mon Sep 17 00:00:00 2001 From: Ben Finney Date: Fri, 7 Aug 2015 11:07:04 +1000 Subject: [PATCH] =?utf8?q?Add=20unit=20tests=20for=20=E2=80=98dput.methods?= =?utf8?q?.rsync.upload=E2=80=99.?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Ben Finney --- test/test_methods.py | 127 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/test/test_methods.py b/test/test_methods.py index 7e9663e..8e4c296 100644 --- a/test/test_methods.py +++ b/test/test_methods.py @@ -50,6 +50,7 @@ import dput.methods.ftp import dput.methods.http import dput.methods.https import dput.methods.scp +import dput.methods.rsync from .helper import ( mock, @@ -1465,6 +1466,132 @@ class scp_upload_ChmodTestCase(scp_upload_TestCase): sys.exit.assert_called_with(EXIT_STATUS_FAILURE) +class rsync_upload_TestCase(ssh_channel_upload_TestCase): + """ Test cases for `methods.rsync.upload` function. """ + + function_to_test = staticmethod(dput.methods.rsync.upload) + + scenarios = testscenarios.multiply_scenarios( + upload_TestCase.files_scenarios, + upload_TestCase.incoming_scenarios, + ssh_channel_upload_TestCase.login_scenarios, + ssh_channel_upload_TestCase.stat_mode_scenarios) + + def setUp(self): + """ Set up test fixtures. """ + super(rsync_upload_TestCase, self).setUp() + + self.set_rsync_subprocess_double() + + self.expected_ssh_options = [] + self.set_ssh_chmod_subprocess_double() + + def set_test_args(self): + """ Set the arguments for the test call to the function. """ + self.test_args = dict( + fqdn=self.getUniqueString(), + login=self.login, + incoming=self.incoming_path, + files_to_upload=self.paths_to_upload, + debug=False, + dummy=object(), + progress=object(), + ) + + def set_rsync_subprocess_double(self): + """ Set the ‘rsync’ test double for the subprocess. """ + command_file_path = "/usr/bin/rsync" + argv = [os.path.basename(command_file_path)] + argv.extend(self.paths_to_upload) + argv.extend([ + "--copy-links", "--progress", "--partial", + "-zave", "ssh -x"]) + argv.append(make_remote_spec( + username=self.login, host=self.test_args['fqdn'], + dir_path=self.incoming_path)) + double = SubprocessDouble(command_file_path, argv=argv) + double.register_for_testcase(self) + os_spawnv_scenario_name = getattr( + self, 'rsync_os_spawnv_scenario_name', "success") + double.set_os_spawnv_scenario(os_spawnv_scenario_name) + self.rsync_subprocess_double = double + + def test_emits_debug_message_for_upload(self): + """ Should emit debug message for files upload. """ + self.test_args['debug'] = True + self.function_to_test(**self.test_args) + expected_output = textwrap.dedent("""\ + D: Uploading with rsync to {host}:{incoming} + """).format( + host=make_host_spec( + username=self.login, host=self.test_args['fqdn']), + incoming=self.incoming_path) + self.assertIn(expected_output, sys.stdout.getvalue()) + + def test_calls_os_spawnv_with_expected_rsync_command(self): + """ Should call `os.spawnv` with expected ‘rsync’ command. """ + self.function_to_test(**self.test_args) + expected_calls = [mock.call( + os.P_WAIT, + self.rsync_subprocess_double.path, + self.rsync_subprocess_double.argv)] + os.spawnv.assert_has_calls(expected_calls) + + def test_emits_error_message_when_rsync_failure(self): + """ Should emit error message when ‘rsync’ command fails. """ + self.rsync_subprocess_double.set_os_spawnv_scenario("failure") + try: + self.function_to_test(**self.test_args) + except FakeSystemExit: + pass + expected_output = "Error while uploading." + self.assertIn(expected_output, sys.stdout.getvalue()) + + def test_calls_sys_exit_when_rsync_failure(self): + """ Should call `sys.exit` when ‘rsync’ command fails. """ + self.rsync_subprocess_double.set_os_spawnv_scenario("failure") + with testtools.ExpectedException(FakeSystemExit): + self.function_to_test(**self.test_args) + sys.exit.assert_called_with(EXIT_STATUS_FAILURE) + + def test_emits_debug_message_for_fixing_permissions(self): + """ Should emit debug message for fixing file permissions . """ + self.test_args['debug'] = True + self.function_to_test(**self.test_args) + expected_output = textwrap.dedent("""\ + D: Fixing file permissions with {host} + """).format( + host=make_host_spec( + username=self.login, host=self.test_args['fqdn'])) + self.assertIn(expected_output, sys.stdout.getvalue()) + + def test_calls_os_spawnv_with_expected_ssh_chmod_command(self): + """ Should call `os.spawnv` with expected ‘ssh … chmod’ command. """ + self.function_to_test(**self.test_args) + expected_calls = [mock.call( + os.P_WAIT, + self.ssh_chmod_subprocess_double.path, + list(self.ssh_chmod_subprocess_double.argv))] + os.spawnv.assert_has_calls(expected_calls) + + def test_emits_error_message_when_ssh_chmod_failure(self): + """ Should emit error message when ‘ssh … chmod’ command fails. """ + self.ssh_chmod_subprocess_double.set_os_spawnv_scenario("failure") + try: + self.function_to_test(**self.test_args) + except FakeSystemExit: + pass + expected_output = "Error while fixing permission." + self.assertIn(expected_output, sys.stdout.getvalue()) + + def test_calls_sys_exit_when_ssh_chmod_failure(self): + """ Should call `sys.exit` when ‘ssh … chmod’ command fails. """ + self.ssh_chmod_subprocess_double.set_os_spawnv_scenario("failure") + with testtools.ExpectedException(FakeSystemExit): + self.function_to_test(**self.test_args) + sys.exit.assert_called_with(EXIT_STATUS_FAILURE) + + # Local variables: # coding: utf-8 # mode: python -- 2.11.4.GIT