1 # -*- coding: utf-8 -*-
2 # Copyright 2011 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Unit and integration tests for gsutil command_runner module."""
17 from __future__
import absolute_import
24 from gslib
import command_runner
25 from gslib
.command
import Command
26 from gslib
.command_argument
import CommandArgument
27 from gslib
.command_runner
import CommandRunner
28 from gslib
.command_runner
import HandleArgCoding
29 from gslib
.exception
import CommandException
30 from gslib
.tab_complete
import CloudObjectCompleter
31 from gslib
.tab_complete
import CloudOrLocalObjectCompleter
32 from gslib
.tab_complete
import LocalObjectCompleter
33 from gslib
.tab_complete
import LocalObjectOrCannedACLCompleter
34 from gslib
.tab_complete
import NoOpCompleter
35 import gslib
.tests
.testcase
as testcase
36 import gslib
.tests
.util
as util
37 from gslib
.tests
.util
import ARGCOMPLETE_AVAILABLE
38 from gslib
.tests
.util
import SetBotoConfigFileForTest
39 from gslib
.tests
.util
import SetBotoConfigForTest
40 from gslib
.tests
.util
import unittest
41 from gslib
.util
import GSUTIL_PUB_TARBALL
42 from gslib
.util
import SECONDS_PER_DAY
45 class FakeArgparseArgument(object):
46 """Fake for argparse parser argument."""
50 class FakeArgparseParser(object):
51 """Fake for argparse parser."""
56 def add_argument(self
, *unused_args
, **unused_kwargs
):
57 argument
= FakeArgparseArgument()
58 self
.arguments
.append(argument
)
62 class FakeArgparseSubparsers(object):
63 """Container for nested parsers."""
68 def add_parser(self
, unused_name
, **unused_kwargs
):
69 parser
= FakeArgparseParser()
70 self
.parsers
.append(parser
)
74 class FakeCommandWithInvalidCompleter(Command
):
75 """Command with an invalid completer on an argument."""
77 command_spec
= Command
.CreateCommandSpec(
80 CommandArgument('arg', completer
='BAD')
84 help_spec
= Command
.HelpSpec(
87 help_type
='command_help',
88 help_one_line_summary
='fake command for tests',
89 help_text
='fake command for tests',
90 subcommand_help_text
={}
97 class FakeCommandWithCompleters(Command
):
98 """Command with various completer types."""
100 command_spec
= Command
.CreateCommandSpec(
103 CommandArgument
.MakeZeroOrMoreCloudURLsArgument(),
104 CommandArgument
.MakeZeroOrMoreFileURLsArgument(),
105 CommandArgument
.MakeZeroOrMoreCloudOrFileURLsArgument(),
106 CommandArgument
.MakeFreeTextArgument(),
107 CommandArgument
.MakeZeroOrMoreCloudBucketURLsArgument(),
108 CommandArgument
.MakeFileURLOrCannedACLArgument(),
112 help_spec
= Command
.HelpSpec(
114 help_name_aliases
=[],
115 help_type
='command_help',
116 help_one_line_summary
='fake command for tests',
117 help_text
='fake command for tests',
118 subcommand_help_text
={}
125 class TestCommandRunnerUnitTests(
126 testcase
.unit_testcase
.GsUtilUnitTestCase
):
127 """Unit tests for gsutil update check in command_runner module."""
130 """Sets up the command runner mock objects."""
131 super(TestCommandRunnerUnitTests
, self
).setUp()
133 # Mock out the timestamp file so we can manipulate it.
134 self
.previous_update_file
= (
135 command_runner
.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE
)
136 self
.timestamp_file
= self
.CreateTempFile()
137 command_runner
.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE
= (
140 # Mock out the gsutil version checker.
141 base_version
= unicode(gslib
.VERSION
)
142 while not base_version
.isnumeric():
144 raise CommandException(
145 'Version number (%s) is not numeric.' % gslib
.VERSION
)
146 base_version
= base_version
[:-1]
147 command_runner
.LookUpGsutilVersion
= lambda u
, v
: float(base_version
) + 1
149 # Mock out raw_input to trigger yes prompt.
150 command_runner
.raw_input = lambda p
: 'y'
152 # Mock out TTY check to pretend we're on a TTY even if we're not.
153 self
.running_interactively
= True
154 command_runner
.IsRunningInteractively
= lambda: self
.running_interactively
156 # Mock out the modified time of the VERSION file.
157 self
.version_mod_time
= 0
158 self
.previous_version_mod_time
= command_runner
.GetGsutilVersionModifiedTime
159 command_runner
.GetGsutilVersionModifiedTime
= lambda: self
.version_mod_time
161 # Create a fake pub tarball that will be used to check for gsutil version.
162 self
.pub_bucket_uri
= self
.CreateBucket('pub')
163 self
.gsutil_tarball_uri
= self
.CreateObject(
164 bucket_uri
=self
.pub_bucket_uri
, object_name
='gsutil.tar.gz',
168 """Tears down the command runner mock objects."""
169 super(TestCommandRunnerUnitTests
, self
).tearDown()
171 command_runner
.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE
= (
172 self
.previous_update_file
)
173 command_runner
.LookUpGsutilVersion
= gslib
.util
.LookUpGsutilVersion
174 command_runner
.raw_input = raw_input
176 command_runner
.GetGsutilVersionModifiedTime
= self
.previous_version_mod_time
178 command_runner
.IsRunningInteractively
= gslib
.util
.IsRunningInteractively
180 self
.gsutil_tarball_uri
.delete_key()
181 self
.pub_bucket_uri
.delete_bucket()
def _IsPackageOrCloudSDKInstall(self):
  """Reports whether gsutil runs from a package or Cloud SDK install.

  The update-check tests use this to compute their expected result, since
  the update should not trigger for package installs or Cloud SDK installs.

  Returns:
    gslib.IS_PACKAGE_INSTALL when that value is truthy; otherwise a bool
    that is True only if the CLOUDSDK_WRAPPER environment variable is '1'.
  """
  package_install = gslib.IS_PACKAGE_INSTALL
  if package_install:
    return package_install
  wrapper_flag = os.environ.get('CLOUDSDK_WRAPPER')
  return wrapper_flag == '1'
188 @unittest.skipUnless(not util
.HAS_GS_HOST
, 'gs_host is defined in config')
189 def test_not_interactive(self
):
190 """Tests that update is not triggered if not running interactively."""
191 with
SetBotoConfigForTest([
192 ('GSUtil', 'software_update_check_period', '1')]):
193 with
open(self
.timestamp_file
, 'w') as f
:
194 f
.write(str(int(time
.time() - 2 * SECONDS_PER_DAY
)))
195 self
.running_interactively
= False
198 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
200 @unittest.skipUnless(not util
.HAS_GS_HOST
, 'gs_host is defined in config')
201 def test_no_tracker_file_version_recent(self
):
202 """Tests when no timestamp file exists and VERSION file is recent."""
203 if os
.path
.exists(self
.timestamp_file
):
204 os
.remove(self
.timestamp_file
)
205 self
.assertFalse(os
.path
.exists(self
.timestamp_file
))
206 self
.version_mod_time
= time
.time()
209 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
211 @unittest.skipUnless(not util
.HAS_GS_HOST
, 'gs_host is defined in config')
212 def test_no_tracker_file_version_old(self
):
213 """Tests when no timestamp file exists and VERSION file is old."""
214 if os
.path
.exists(self
.timestamp_file
):
215 os
.remove(self
.timestamp_file
)
216 self
.assertFalse(os
.path
.exists(self
.timestamp_file
))
217 self
.version_mod_time
= 0
218 expected
= not self
._IsPackageOrCloudSDKInstall
()
221 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
223 @unittest.skipUnless(not util
.HAS_GS_HOST
, 'gs_host is defined in config')
224 def test_invalid_commands(self
):
225 """Tests that update is not triggered for certain commands."""
228 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('update', 0))
230 @unittest.skipUnless(not util
.HAS_GS_HOST
, 'gs_host is defined in config')
231 def test_invalid_file_contents(self
):
232 """Tests no update if timestamp file has invalid value."""
233 with
open(self
.timestamp_file
, 'w') as f
:
237 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
239 @unittest.skipUnless(not util
.HAS_GS_HOST
, 'gs_host is defined in config')
240 def test_update_should_trigger(self
):
241 """Tests update should be triggered if time is up."""
242 with
SetBotoConfigForTest([
243 ('GSUtil', 'software_update_check_period', '1')]):
244 with
open(self
.timestamp_file
, 'w') as f
:
245 f
.write(str(int(time
.time() - 2 * SECONDS_PER_DAY
)))
246 expected
= not self
._IsPackageOrCloudSDKInstall
()
249 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
251 @unittest.skipUnless(not util
.HAS_GS_HOST
, 'gs_host is defined in config')
252 def test_not_time_for_update_yet(self
):
253 """Tests update not triggered if not time yet."""
254 with
SetBotoConfigForTest([
255 ('GSUtil', 'software_update_check_period', '3')]):
256 with
open(self
.timestamp_file
, 'w') as f
:
257 f
.write(str(int(time
.time() - 2 * SECONDS_PER_DAY
)))
260 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
262 def test_user_says_no_to_update(self
):
263 """Tests no update triggered if user says no at the prompt."""
264 with
SetBotoConfigForTest([
265 ('GSUtil', 'software_update_check_period', '1')]):
266 with
open(self
.timestamp_file
, 'w') as f
:
267 f
.write(str(int(time
.time() - 2 * SECONDS_PER_DAY
)))
268 command_runner
.raw_input = lambda p
: 'n'
271 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
273 @unittest.skipUnless(not util
.HAS_GS_HOST
, 'gs_host is defined in config')
274 def test_update_check_skipped_with_quiet_mode(self
):
275 """Tests that update isn't triggered when loglevel is in quiet mode."""
276 with
SetBotoConfigForTest([
277 ('GSUtil', 'software_update_check_period', '1')]):
278 with
open(self
.timestamp_file
, 'w') as f
:
279 f
.write(str(int(time
.time() - 2 * SECONDS_PER_DAY
)))
281 expected
= not self
._IsPackageOrCloudSDKInstall
()
284 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
286 prev_loglevel
= logging
.getLogger().getEffectiveLevel()
288 logging
.getLogger().setLevel(logging
.ERROR
)
289 # With reduced loglevel, should return False.
292 self
.command_runner
.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
294 logging
.getLogger().setLevel(prev_loglevel
)
def test_command_argument_parser_setup_invalid_completer(self):
  """Tests that parser setup rejects an argument with a bad completer.

  Builds a CommandRunner whose command map holds only
  FakeCommandWithInvalidCompleter and expects
  ConfigureCommandArgumentParsers to raise a RuntimeError whose message
  mentions 'Unknown completer'.
  """
  command_map = {
      FakeCommandWithInvalidCompleter.command_spec.command_name:
          FakeCommandWithInvalidCompleter()
  }

  runner = CommandRunner(
      bucket_storage_uri_class=self.mock_bucket_storage_uri,
      gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
      command_map=command_map)

  subparsers = FakeArgparseSubparsers()
  try:
    runner.ConfigureCommandArgumentParsers(subparsers)
  except RuntimeError as e:
    self.assertIn('Unknown completer', e.message)
  else:
    # Without this branch the test would pass silently when no exception
    # is raised at all, which is exactly the regression it should catch.
    self.fail('Expected RuntimeError for an unknown completer.')
@unittest.skipUnless(ARGCOMPLETE_AVAILABLE,
                     'Tab completion requires argcomplete')
def test_command_argument_parser_setup_completers(self):
  """Tests that each argument of the fake command gets its completer type.

  Configures argument parsers for FakeCommandWithCompleters through a
  CommandRunner, then checks that exactly one parser with six arguments
  was registered and that each argument's completer has the expected type.
  """
  fake_command = FakeCommandWithCompleters()
  command_map = {
      FakeCommandWithCompleters.command_spec.command_name: fake_command
  }

  runner = CommandRunner(
      bucket_storage_uri_class=self.mock_bucket_storage_uri,
      gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
      command_map=command_map)

  subparsers = FakeArgparseSubparsers()
  runner.ConfigureCommandArgumentParsers(subparsers)

  self.assertEqual(1, len(subparsers.parsers))
  parser = subparsers.parsers[0]
  self.assertEqual(6, len(parser.arguments))

  # Expected completer class for each argument, in declaration order.
  expected_completer_types = (
      CloudObjectCompleter,
      LocalObjectCompleter,
      CloudOrLocalObjectCompleter,
      NoOpCompleter,
      CloudObjectCompleter,
      LocalObjectOrCannedACLCompleter,
  )
  for index, expected_type in enumerate(expected_completer_types):
    self.assertEqual(expected_type, type(parser.arguments[index].completer))
344 # pylint: disable=invalid-encoded-data
345 def test_valid_arg_coding(self
):
346 """Tests that gsutil encodes valid args correctly."""
347 # Args other than -h and -p should be utf-8 decoded.
348 args
= HandleArgCoding(['ls', '-l'])
349 self
.assertIs(type(args
[0]), unicode)
350 self
.assertIs(type(args
[1]), unicode)
352 # -p and -h args other than x-goog-meta should not be decoded.
353 args
= HandleArgCoding(['ls', '-p', 'abc:def', 'gs://bucket'])
354 self
.assertIs(type(args
[0]), unicode)
355 self
.assertIs(type(args
[1]), unicode)
356 self
.assertIsNot(type(args
[2]), unicode)
357 self
.assertIs(type(args
[3]), unicode)
359 args
= HandleArgCoding(['gsutil', '-h', 'content-type:text/plain', 'cp',
361 self
.assertIs(type(args
[0]), unicode)
362 self
.assertIs(type(args
[1]), unicode)
363 self
.assertIsNot(type(args
[2]), unicode)
364 self
.assertIs(type(args
[3]), unicode)
365 self
.assertIs(type(args
[4]), unicode)
366 self
.assertIs(type(args
[5]), unicode)
368 # -h x-goog-meta args should be decoded.
369 args
= HandleArgCoding(['gsutil', '-h', 'x-goog-meta-abc', '1234'])
370 self
.assertIs(type(args
[0]), unicode)
371 self
.assertIs(type(args
[1]), unicode)
372 self
.assertIs(type(args
[2]), unicode)
373 self
.assertIs(type(args
[3]), unicode)
375 # -p and -h args with non-ASCII content should raise CommandException.
377 HandleArgCoding(['ls', '-p', '碼'])
378 # Ensure exception is raised.
379 self
.assertTrue(False)
380 except CommandException
as e
:
381 self
.assertIn('Invalid non-ASCII header', e
.reason
)
383 HandleArgCoding(['-h', '碼', 'ls'])
384 # Ensure exception is raised.
385 self
.assertTrue(False)
386 except CommandException
as e
:
387 self
.assertIn('Invalid non-ASCII header', e
.reason
)
390 class TestCommandRunnerIntegrationTests(
391 testcase
.GsUtilIntegrationTestCase
):
392 """Integration tests for gsutil update check in command_runner module."""
395 """Sets up the command runner mock objects."""
396 super(TestCommandRunnerIntegrationTests
, self
).setUp()
398 # Mock out the timestamp file so we can manipulate it.
399 self
.previous_update_file
= (
400 command_runner
.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE
)
401 self
.timestamp_file
= self
.CreateTempFile(contents
='0')
402 command_runner
.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE
= (
405 # Mock out raw_input to trigger yes prompt.
406 command_runner
.raw_input = lambda p
: 'y'
409 """Tears down the command runner mock objects."""
410 super(TestCommandRunnerIntegrationTests
, self
).tearDown()
411 command_runner
.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE
= (
412 self
.previous_update_file
)
413 command_runner
.raw_input = raw_input
@unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
def test_lookup_version_without_credentials(self):
  """Tests that the gsutil tarball can be listed without credentials.

  Runs 'ls' on GSUTIL_PUB_TARBALL under a temporary boto config file that
  contains only a [GSUtil] section with software_update_check_period=1.
  """
  boto_config_path = self.CreateTempFile(
      contents='[GSUtil]\nsoftware_update_check_period=1')
  with SetBotoConfigFileForTest(boto_config_path):
    self.command_runner = command_runner.CommandRunner()
    # The version lookup must complete without an auth failure exception.
    tarball_args = [GSUTIL_PUB_TARBALL]
    self.command_runner.RunNamedCommand('ls', tarball_args)