Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / tests / test_command_runner.py
blobbf4d037d97f118cc0345db9f45b5719921e94f2e
1 # -*- coding: utf-8 -*-
2 # Copyright 2011 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Unit and integration tests for gsutil command_runner module."""
17 from __future__ import absolute_import
19 import logging
20 import os
21 import time
23 import gslib
24 from gslib import command_runner
25 from gslib.command import Command
26 from gslib.command_argument import CommandArgument
27 from gslib.command_runner import CommandRunner
28 from gslib.command_runner import HandleArgCoding
29 from gslib.exception import CommandException
30 from gslib.tab_complete import CloudObjectCompleter
31 from gslib.tab_complete import CloudOrLocalObjectCompleter
32 from gslib.tab_complete import LocalObjectCompleter
33 from gslib.tab_complete import LocalObjectOrCannedACLCompleter
34 from gslib.tab_complete import NoOpCompleter
35 import gslib.tests.testcase as testcase
36 import gslib.tests.util as util
37 from gslib.tests.util import ARGCOMPLETE_AVAILABLE
38 from gslib.tests.util import SetBotoConfigFileForTest
39 from gslib.tests.util import SetBotoConfigForTest
40 from gslib.tests.util import unittest
41 from gslib.util import GSUTIL_PUB_TARBALL
42 from gslib.util import SECONDS_PER_DAY
45 class FakeArgparseArgument(object):
46 """Fake for argparse parser argument."""
47 pass
50 class FakeArgparseParser(object):
51 """Fake for argparse parser."""
53 def __init__(self):
54 self.arguments = []
56 def add_argument(self, *unused_args, **unused_kwargs):
57 argument = FakeArgparseArgument()
58 self.arguments.append(argument)
59 return argument
62 class FakeArgparseSubparsers(object):
63 """Container for nested parsers."""
65 def __init__(self):
66 self.parsers = []
68 def add_parser(self, unused_name, **unused_kwargs):
69 parser = FakeArgparseParser()
70 self.parsers.append(parser)
71 return parser
74 class FakeCommandWithInvalidCompleter(Command):
75 """Command with an invalid completer on an argument."""
77 command_spec = Command.CreateCommandSpec(
78 'fake1',
79 argparse_arguments=[
80 CommandArgument('arg', completer='BAD')
84 help_spec = Command.HelpSpec(
85 help_name='fake1',
86 help_name_aliases=[],
87 help_type='command_help',
88 help_one_line_summary='fake command for tests',
89 help_text='fake command for tests',
90 subcommand_help_text={}
93 def __init__(self):
94 pass
97 class FakeCommandWithCompleters(Command):
98 """Command with various completer types."""
100 command_spec = Command.CreateCommandSpec(
101 'fake2',
102 argparse_arguments=[
103 CommandArgument.MakeZeroOrMoreCloudURLsArgument(),
104 CommandArgument.MakeZeroOrMoreFileURLsArgument(),
105 CommandArgument.MakeZeroOrMoreCloudOrFileURLsArgument(),
106 CommandArgument.MakeFreeTextArgument(),
107 CommandArgument.MakeZeroOrMoreCloudBucketURLsArgument(),
108 CommandArgument.MakeFileURLOrCannedACLArgument(),
112 help_spec = Command.HelpSpec(
113 help_name='fake2',
114 help_name_aliases=[],
115 help_type='command_help',
116 help_one_line_summary='fake command for tests',
117 help_text='fake command for tests',
118 subcommand_help_text={}
121 def __init__(self):
122 pass
125 class TestCommandRunnerUnitTests(
126 testcase.unit_testcase.GsUtilUnitTestCase):
127 """Unit tests for gsutil update check in command_runner module."""
129 def setUp(self):
130 """Sets up the command runner mock objects."""
131 super(TestCommandRunnerUnitTests, self).setUp()
133 # Mock out the timestamp file so we can manipulate it.
134 self.previous_update_file = (
135 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE)
136 self.timestamp_file = self.CreateTempFile()
137 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = (
138 self.timestamp_file)
140 # Mock out the gsutil version checker.
141 base_version = unicode(gslib.VERSION)
142 while not base_version.isnumeric():
143 if not base_version:
144 raise CommandException(
145 'Version number (%s) is not numeric.' % gslib.VERSION)
146 base_version = base_version[:-1]
147 command_runner.LookUpGsutilVersion = lambda u, v: float(base_version) + 1
149 # Mock out raw_input to trigger yes prompt.
150 command_runner.raw_input = lambda p: 'y'
152 # Mock out TTY check to pretend we're on a TTY even if we're not.
153 self.running_interactively = True
154 command_runner.IsRunningInteractively = lambda: self.running_interactively
156 # Mock out the modified time of the VERSION file.
157 self.version_mod_time = 0
158 self.previous_version_mod_time = command_runner.GetGsutilVersionModifiedTime
159 command_runner.GetGsutilVersionModifiedTime = lambda: self.version_mod_time
161 # Create a fake pub tarball that will be used to check for gsutil version.
162 self.pub_bucket_uri = self.CreateBucket('pub')
163 self.gsutil_tarball_uri = self.CreateObject(
164 bucket_uri=self.pub_bucket_uri, object_name='gsutil.tar.gz',
165 contents='foo')
167 def tearDown(self):
168 """Tears down the command runner mock objects."""
169 super(TestCommandRunnerUnitTests, self).tearDown()
171 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = (
172 self.previous_update_file)
173 command_runner.LookUpGsutilVersion = gslib.util.LookUpGsutilVersion
174 command_runner.raw_input = raw_input
176 command_runner.GetGsutilVersionModifiedTime = self.previous_version_mod_time
178 command_runner.IsRunningInteractively = gslib.util.IsRunningInteractively
180 self.gsutil_tarball_uri.delete_key()
181 self.pub_bucket_uri.delete_bucket()
183 def _IsPackageOrCloudSDKInstall(self):
184 # Update should not trigger for package installs or Cloud SDK installs.
185 return (gslib.IS_PACKAGE_INSTALL or
186 os.environ.get('CLOUDSDK_WRAPPER') == '1')
188 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
189 def test_not_interactive(self):
190 """Tests that update is not triggered if not running interactively."""
191 with SetBotoConfigForTest([
192 ('GSUtil', 'software_update_check_period', '1')]):
193 with open(self.timestamp_file, 'w') as f:
194 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
195 self.running_interactively = False
196 self.assertEqual(
197 False,
198 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
200 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
201 def test_no_tracker_file_version_recent(self):
202 """Tests when no timestamp file exists and VERSION file is recent."""
203 if os.path.exists(self.timestamp_file):
204 os.remove(self.timestamp_file)
205 self.assertFalse(os.path.exists(self.timestamp_file))
206 self.version_mod_time = time.time()
207 self.assertEqual(
208 False,
209 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
211 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
212 def test_no_tracker_file_version_old(self):
213 """Tests when no timestamp file exists and VERSION file is old."""
214 if os.path.exists(self.timestamp_file):
215 os.remove(self.timestamp_file)
216 self.assertFalse(os.path.exists(self.timestamp_file))
217 self.version_mod_time = 0
218 expected = not self._IsPackageOrCloudSDKInstall()
219 self.assertEqual(
220 expected,
221 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
223 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
224 def test_invalid_commands(self):
225 """Tests that update is not triggered for certain commands."""
226 self.assertEqual(
227 False,
228 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('update', 0))
230 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
231 def test_invalid_file_contents(self):
232 """Tests no update if timestamp file has invalid value."""
233 with open(self.timestamp_file, 'w') as f:
234 f.write('NaN')
235 self.assertEqual(
236 False,
237 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
239 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
240 def test_update_should_trigger(self):
241 """Tests update should be triggered if time is up."""
242 with SetBotoConfigForTest([
243 ('GSUtil', 'software_update_check_period', '1')]):
244 with open(self.timestamp_file, 'w') as f:
245 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
246 expected = not self._IsPackageOrCloudSDKInstall()
247 self.assertEqual(
248 expected,
249 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
251 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
252 def test_not_time_for_update_yet(self):
253 """Tests update not triggered if not time yet."""
254 with SetBotoConfigForTest([
255 ('GSUtil', 'software_update_check_period', '3')]):
256 with open(self.timestamp_file, 'w') as f:
257 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
258 self.assertEqual(
259 False,
260 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
262 def test_user_says_no_to_update(self):
263 """Tests no update triggered if user says no at the prompt."""
264 with SetBotoConfigForTest([
265 ('GSUtil', 'software_update_check_period', '1')]):
266 with open(self.timestamp_file, 'w') as f:
267 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
268 command_runner.raw_input = lambda p: 'n'
269 self.assertEqual(
270 False,
271 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
273 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
274 def test_update_check_skipped_with_quiet_mode(self):
275 """Tests that update isn't triggered when loglevel is in quiet mode."""
276 with SetBotoConfigForTest([
277 ('GSUtil', 'software_update_check_period', '1')]):
278 with open(self.timestamp_file, 'w') as f:
279 f.write(str(int(time.time() - 2 * SECONDS_PER_DAY)))
281 expected = not self._IsPackageOrCloudSDKInstall()
282 self.assertEqual(
283 expected,
284 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
286 prev_loglevel = logging.getLogger().getEffectiveLevel()
287 try:
288 logging.getLogger().setLevel(logging.ERROR)
289 # With reduced loglevel, should return False.
290 self.assertEqual(
291 False,
292 self.command_runner.MaybeCheckForAndOfferSoftwareUpdate('ls', 0))
293 finally:
294 logging.getLogger().setLevel(prev_loglevel)
296 def test_command_argument_parser_setup_invalid_completer(self):
298 command_map = {
299 FakeCommandWithInvalidCompleter.command_spec.command_name:
300 FakeCommandWithInvalidCompleter()
303 runner = CommandRunner(
304 bucket_storage_uri_class=self.mock_bucket_storage_uri,
305 gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
306 command_map=command_map)
308 subparsers = FakeArgparseSubparsers()
309 try:
310 runner.ConfigureCommandArgumentParsers(subparsers)
311 except RuntimeError as e:
312 self.assertIn('Unknown completer', e.message)
314 @unittest.skipUnless(ARGCOMPLETE_AVAILABLE,
315 'Tab completion requires argcomplete')
316 def test_command_argument_parser_setup_completers(self):
318 command_map = {
319 FakeCommandWithCompleters.command_spec.command_name:
320 FakeCommandWithCompleters()
323 runner = CommandRunner(
324 bucket_storage_uri_class=self.mock_bucket_storage_uri,
325 gsutil_api_class_map_factory=self.mock_gsutil_api_class_map_factory,
326 command_map=command_map)
328 subparsers = FakeArgparseSubparsers()
329 runner.ConfigureCommandArgumentParsers(subparsers)
331 self.assertEqual(1, len(subparsers.parsers))
332 parser = subparsers.parsers[0]
333 self.assertEqual(6, len(parser.arguments))
334 self.assertEqual(CloudObjectCompleter, type(parser.arguments[0].completer))
335 self.assertEqual(LocalObjectCompleter, type(parser.arguments[1].completer))
336 self.assertEqual(
337 CloudOrLocalObjectCompleter, type(parser.arguments[2].completer))
338 self.assertEqual(
339 NoOpCompleter, type(parser.arguments[3].completer))
340 self.assertEqual(CloudObjectCompleter, type(parser.arguments[4].completer))
341 self.assertEqual(
342 LocalObjectOrCannedACLCompleter, type(parser.arguments[5].completer))
344 # pylint: disable=invalid-encoded-data
345 def test_valid_arg_coding(self):
346 """Tests that gsutil encodes valid args correctly."""
347 # Args other than -h and -p should be utf-8 decoded.
348 args = HandleArgCoding(['ls', '-l'])
349 self.assertIs(type(args[0]), unicode)
350 self.assertIs(type(args[1]), unicode)
352 # -p and -h args other than x-goog-meta should not be decoded.
353 args = HandleArgCoding(['ls', '-p', 'abc:def', 'gs://bucket'])
354 self.assertIs(type(args[0]), unicode)
355 self.assertIs(type(args[1]), unicode)
356 self.assertIsNot(type(args[2]), unicode)
357 self.assertIs(type(args[3]), unicode)
359 args = HandleArgCoding(['gsutil', '-h', 'content-type:text/plain', 'cp',
360 'a', 'gs://bucket'])
361 self.assertIs(type(args[0]), unicode)
362 self.assertIs(type(args[1]), unicode)
363 self.assertIsNot(type(args[2]), unicode)
364 self.assertIs(type(args[3]), unicode)
365 self.assertIs(type(args[4]), unicode)
366 self.assertIs(type(args[5]), unicode)
368 # -h x-goog-meta args should be decoded.
369 args = HandleArgCoding(['gsutil', '-h', 'x-goog-meta-abc', '1234'])
370 self.assertIs(type(args[0]), unicode)
371 self.assertIs(type(args[1]), unicode)
372 self.assertIs(type(args[2]), unicode)
373 self.assertIs(type(args[3]), unicode)
375 # -p and -h args with non-ASCII content should raise CommandException.
376 try:
377 HandleArgCoding(['ls', '-p', '碼'])
378 # Ensure exception is raised.
379 self.assertTrue(False)
380 except CommandException as e:
381 self.assertIn('Invalid non-ASCII header', e.reason)
382 try:
383 HandleArgCoding(['-h', '碼', 'ls'])
384 # Ensure exception is raised.
385 self.assertTrue(False)
386 except CommandException as e:
387 self.assertIn('Invalid non-ASCII header', e.reason)
390 class TestCommandRunnerIntegrationTests(
391 testcase.GsUtilIntegrationTestCase):
392 """Integration tests for gsutil update check in command_runner module."""
394 def setUp(self):
395 """Sets up the command runner mock objects."""
396 super(TestCommandRunnerIntegrationTests, self).setUp()
398 # Mock out the timestamp file so we can manipulate it.
399 self.previous_update_file = (
400 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE)
401 self.timestamp_file = self.CreateTempFile(contents='0')
402 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = (
403 self.timestamp_file)
405 # Mock out raw_input to trigger yes prompt.
406 command_runner.raw_input = lambda p: 'y'
408 def tearDown(self):
409 """Tears down the command runner mock objects."""
410 super(TestCommandRunnerIntegrationTests, self).tearDown()
411 command_runner.LAST_CHECKED_FOR_GSUTIL_UPDATE_TIMESTAMP_FILE = (
412 self.previous_update_file)
413 command_runner.raw_input = raw_input
415 @unittest.skipUnless(not util.HAS_GS_HOST, 'gs_host is defined in config')
416 def test_lookup_version_without_credentials(self):
417 """Tests that gsutil tarball version lookup works without credentials."""
418 with SetBotoConfigFileForTest(self.CreateTempFile(
419 contents='[GSUtil]\nsoftware_update_check_period=1')):
420 self.command_runner = command_runner.CommandRunner()
421 # Looking up software version shouldn't get auth failure exception.
422 self.command_runner.RunNamedCommand('ls', [GSUTIL_PUB_TARBALL])