Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / tests / util.py
blob06da870bf8170ff8ee4b39b6ea028da57d0e795b
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 from __future__ import absolute_import
18 from contextlib import contextmanager
19 import functools
20 import os
21 import pkgutil
22 import posixpath
23 import re
24 import tempfile
25 import unittest
26 import urlparse
28 import boto
29 import gslib.tests as gslib_tests
31 if not hasattr(unittest.TestCase, 'assertIsNone'):
32 # external dependency unittest2 required for Python <= 2.6
33 import unittest2 as unittest # pylint: disable=g-import-not-at-top
# Flags for running different types of tests.
# As defined here, integration and unit tests are switched on and S3 tests are
# switched off. Nothing in this file reads these flags; presumably the test
# runner and individual test modules consult them -- TODO confirm.
RUN_INTEGRATION_TESTS = True
RUN_UNIT_TESTS = True
RUN_S3_TESTS = False

# Fixed path of a boto config file. Judging by the name it belongs to the
# parallel composite upload tests; it is not referenced in this file, so
# verify against its callers.
PARALLEL_COMPOSITE_UPLOAD_TEST_CONFIG = '/tmp/.boto.parallel_upload_test_config'
def _HasS3Credentials():
  """Checks whether the boto config holds a full set of AWS credentials.

  Returns:
    True if both aws_access_key_id and aws_secret_access_key are present and
    non-empty in the Credentials section of the boto config, False otherwise.
  """
  # Coerce to bool on purpose: a bare `and` chain evaluates to its last
  # operand, which would be the secret access key itself. That value is cached
  # in the module-level constant below and must not carry the credential.
  return bool(
      boto.config.get('Credentials', 'aws_access_key_id', None) and
      boto.config.get('Credentials', 'aws_secret_access_key', None))

HAS_S3_CREDS = _HasS3Credentials()
def _HasGSHost():
  """Checks whether the boto config defines a gs_host credential entry.

  Returns:
    True if the Credentials section has a gs_host option, False otherwise.
  """
  configured_host = boto.config.get('Credentials', 'gs_host', None)
  return configured_host is not None

HAS_GS_HOST = _HasGSHost()
def _UsingJSONApi():
  """Checks whether the boto config selects an API other than XML.

  Returns:
    False if the GSUtil prefer_api option equals 'XML' (case-insensitively),
    True otherwise. A missing option defaults to 'json'.
  """
  preferred_api = boto.config.get('GSUtil', 'prefer_api', 'json')
  return preferred_api.upper() != 'XML'

USING_JSON_API = _UsingJSONApi()
def _ArgcompleteAvailable():
  """Checks whether the optional argcomplete module can be imported.

  Returns:
    True if importing argcomplete succeeds, False if it raises ImportError.
  """
  try:
    # pylint: disable=g-import-not-at-top,unused-variable
    import argcomplete
  except ImportError:
    return False
  return True

ARGCOMPLETE_AVAILABLE = _ArgcompleteAvailable()
def _NormalizeURI(uri):
  """Normalizes the path component of a URI.

  Args:
    uri: URI to normalize.

  Returns:
    Normalized URI. The scheme of the input is preserved, and a URI without a
    path component is returned without one.

  Examples:
    gs://foo//bar -> gs://foo/bar
    gs://foo/./bar -> gs://foo/bar
    gs://foo -> gs://foo
    file:///tmp//bar -> file:///tmp/bar
  """
  gs_prefix = 'gs://'
  file_prefix = 'file://'

  # Note: we have to do this dance of changing gs:// to file:// because on
  # Windows, the urlparse function won't work with URL schemes that are not
  # known. urlparse('gs://foo/bar') on Windows turns into:
  #     scheme='gs', netloc='', path='//foo/bar'
  # while on non-Windows platforms, it turns into:
  #     scheme='gs', netloc='foo', path='/bar'
  # Remember whether the swap happened so that a URI that really was file://
  # is not turned into gs:// on the way out.
  scheme_swapped = uri.startswith(gs_prefix)
  if scheme_swapped:
    uri = file_prefix + uri[len(gs_prefix):]

  parsed = list(urlparse.urlparse(uri))
  # posixpath.normpath('') returns '.', which would append a bogus '/.' to a
  # bucket-only URI, so only normalize a path that is actually present.
  if parsed[2]:
    parsed[2] = posixpath.normpath(parsed[2])
    if parsed[2].startswith('//'):
      # The normpath function doesn't change '//foo' -> '/foo' by design.
      parsed[2] = parsed[2][1:]
  unparsed = urlparse.urlunparse(parsed)

  if scheme_swapped and unparsed.startswith(file_prefix):
    unparsed = gs_prefix + unparsed[len(file_prefix):]
  return unparsed
def GenerationFromURI(uri):
  """Returns the generation for a StorageUri.

  Args:
    uri: boto.storage_uri.StorageURI object to get the URI from.

  Returns:
    The URI's generation if set, otherwise its version_id. When neither is
    set, the string 'null' is returned for the 's3' scheme.
  """
  version = uri.generation or uri.version_id
  if not version and uri.scheme == 's3':
    return 'null'
  return version
def ObjectToURI(obj, *suffixes):
  """Returns the storage URI string for a given StorageUri or file object.

  Args:
    obj: The object to get the URI from. Can be a file object, a subclass of
         boto.storage_uri.StorageURI, or a string. If a string, it is assumed
         to be a local on-disk path.
    *suffixes: Suffixes to append. For example, ObjectToUri(bucketuri, 'foo')
               would return the URI for a key name 'foo' inside the given
               bucket.

  Returns:
    Storage URI string.
  """
  if isinstance(obj, file):
    local_path = os.path.abspath(os.path.join(obj.name, *suffixes))
    return 'file://%s' % local_path

  if isinstance(obj, basestring):
    local_path = os.path.join(obj, *suffixes)
    return 'file://%s' % local_path

  storage_uri = obj.uri
  if suffixes:
    storage_uri = _NormalizeURI('/'.join((storage_uri,) + suffixes))

  # Storage URIs shouldn't contain a trailing slash.
  return storage_uri[:-1] if storage_uri.endswith('/') else storage_uri
145 # The mock storage service comes from the Boto library, but it is not
146 # distributed with Boto when installed as a package. To get around this, we
147 # copy the file to gslib/tests/mock_storage_service.py when building the gsutil
148 # package. Try and import from both places here.
149 # pylint: disable=g-import-not-at-top
150 try:
151 from gslib.tests import mock_storage_service
152 except ImportError:
153 try:
154 from boto.tests.integration.s3 import mock_storage_service
155 except ImportError:
156 try:
157 from tests.integration.s3 import mock_storage_service
158 except ImportError:
159 import mock_storage_service
class GSMockConnection(mock_storage_service.MockConnection):
  """Mock connection whose provider is always 'gs'."""

  def __init__(self, *args, **kwargs):
    """Creates the connection, forcing the provider keyword to 'gs'.

    Args:
      *args: Positional arguments forwarded to MockConnection.
      **kwargs: Keyword arguments forwarded to MockConnection. Any 'provider'
                entry is overwritten with 'gs'.
    """
    self.debug = 0
    kwargs.update(provider='gs')
    super(GSMockConnection, self).__init__(*args, **kwargs)

# Module-level connection instance returned by GSMockBucketStorageUri.connect.
mock_connection = GSMockConnection()
class GSMockBucketStorageUri(mock_storage_service.MockBucketStorageUri):
  """Mock bucket storage URI backed by the module-level mock connection."""

  def connect(self, access_key_id=None, secret_access_key=None):
    """Returns the module-level mock connection, ignoring credentials."""
    del access_key_id, secret_access_key  # Unused.
    return mock_connection

  def compose(self, components, headers=None):
    """Dummy implementation to allow parallel uploads with tests."""
    del components, headers  # Unused.
    return self.new_key()
# Marker option name recorded in a revert list to signal that the whole
# section was created by the test and must be removed again on revert.
TEST_BOTO_REMOVE_SECTION = 'TestRemoveSection'


def _SetBotoConfig(section, name, value, revert_list):
  """Sets boto configuration temporarily for testing.

  SetBotoConfigForTest and SetBotoConfigFileForTest should be called by tests
  instead of this function. Those functions will ensure that the configuration
  is reverted to its original setting using _RevertBotoConfig.

  Args:
    section: Boto config section to set
    name: Boto config name to set
    value: Value to set; None removes the option instead.
    revert_list: List for tracking configs to revert. Tuples of
                 (section, name, previous value) are appended to it.
  """
  config = boto.config
  original_value = config.get(section, name, None)

  if not config.has_section(section):
    # The section is new, so record that it has to be dropped on revert.
    revert_list.append((section, TEST_BOTO_REMOVE_SECTION, None))
    config.add_section(section)
  revert_list.append((section, name, original_value))

  if value is not None:
    config.set(section, name, value)
  else:
    config.remove_option(section, name)
def _RevertBotoConfig(revert_list):
  """Reverts boto config modifications made by _SetBotoConfig.

  Entries are undone newest-first. When the same option was set more than once
  the later entries hold intermediate values as their "previous" value, so the
  oldest entry has to be applied last for the true original to be restored.

  Args:
    revert_list: List of boto config modifications created by calls to
                 _SetBotoConfig.
  """
  sections_to_remove = []
  for section, name, value in reversed(revert_list):
    if value is not None:
      boto.config.set(section, name, value)
    elif name == TEST_BOTO_REMOVE_SECTION:
      # Defer section removal until every option has been dealt with.
      sections_to_remove.append(section)
    else:
      boto.config.remove_option(section, name)
  for section in sections_to_remove:
    boto.config.remove_section(section)
def PerformsFileToObjectUpload(func):
  """Decorator indicating that a test uploads from a local file to an object.

  This forces the test to run once normally, and again with special boto
  config settings that will ensure that the test follows the parallel composite
  upload code path.

  Args:
    func: Function to wrap.

  Returns:
    Wrapped function.
  """
  # Boto settings that force the parallel composite upload path.
  parallel_composite_configs = [
      ('GSUtil', 'parallel_composite_upload_threshold', '1'),
      ('GSUtil', 'check_hashes', 'always'),
  ]

  @functools.wraps(func)
  def RunNormallyThenComposite(*args, **kwargs):
    # First pass: the test as written.
    func(*args, **kwargs)

    # Second pass: same test, forcing parallel composite uploads.
    with SetBotoConfigForTest(parallel_composite_configs):
      func(*args, **kwargs)

  return RunNormallyThenComposite
@contextmanager
def SetBotoConfigForTest(boto_config_list):
  """Sets the input list of boto configs for the duration of a 'with' clause.

  The modified configuration is also written to a temporary file, which is
  installed as the boto config file while the 'with' body runs.

  Args:
    boto_config_list: list of tuples of:
      (boto config section to set, boto config name to set, value to set)

  Yields:
    Once after config is set.
  """
  applied_configs = []
  config_path = None
  try:
    handle, config_path = tempfile.mkstemp(prefix='gsutil-temp-cfg')
    os.close(handle)
    for entry in boto_config_list:
      section, name, value = entry[0], entry[1], entry[2]
      _SetBotoConfig(section, name, value, applied_configs)
    with open(config_path, 'w') as config_file:
      boto.config.write(config_file)

    with SetBotoConfigFileForTest(config_path):
      yield
  finally:
    _RevertBotoConfig(applied_configs)
    if config_path:
      # Best-effort cleanup of the temporary config file.
      try:
        os.remove(config_path)
      except OSError:
        pass
@contextmanager
def SetEnvironmentForTest(env_variable_dict):
  """Sets OS environment variables for a single test.

  Args:
    env_variable_dict: Dict mapping environment variable names to the values
                       they should have inside the 'with' block. A value of
                       None unsets the variable.

  Yields:
    Once after the environment has been modified. On exit every listed
    variable is restored to its prior value, or unset if it did not exist.
  """

  def _ApplyDictToEnvironment(dict_to_apply):
    """Sets each variable to its mapped value; None unsets the variable."""
    # items() works on both Python 2 and 3, unlike iteritems().
    for name, value in dict_to_apply.items():
      if value is not None:
        os.environ[name] = value
      else:
        os.environ.pop(name, None)

  # Snapshot the prior values once, up front. The helper no longer records
  # them itself, so restoring never writes into the dict it is iterating.
  old_values = dict(
      (name, os.environ.get(name)) for name in env_variable_dict)

  try:
    _ApplyDictToEnvironment(env_variable_dict)
    yield
  finally:
    _ApplyDictToEnvironment(old_values)
@contextmanager
def SetBotoConfigFileForTest(boto_config_path):
  """Sets a given file as the boto config file for a single test.

  Args:
    boto_config_path: Path to store in the BOTO_CONFIG environment variable
                      for the duration of the 'with' block.

  Yields:
    Once after BOTO_CONFIG has been set. On exit the variable is restored to
    its prior value, or removed if it was not set before.
  """
  env_key = 'BOTO_CONFIG'

  # Setup for entering "with" block.
  had_previous_value = env_key in os.environ
  previous_value = os.environ.get(env_key)
  os.environ[env_key] = boto_config_path

  try:
    yield
  finally:
    # Teardown for exiting "with" block.
    if had_previous_value:
      os.environ[env_key] = previous_value
    else:
      os.environ.pop(env_key, None)
def GetTestNames():
  """Returns a list of the names of the test modules in gslib.tests.

  Returns:
    For every module in the gslib.tests package whose name starts with
    'test_', the part of the name after that prefix.
  """
  test_module_pattern = re.compile(r'^test_(?P<name>.*)$')
  matches = (
      test_module_pattern.match(module_name)
      for _, module_name, _ in pkgutil.iter_modules(gslib_tests.__path__))
  return [match.group('name') for match in matches if match]
@contextmanager
def WorkingDirectory(new_working_directory):
  """Changes the working directory for the duration of a 'with' call.

  Args:
    new_working_directory: The directory to switch to before executing wrapped
                           code. A None value indicates that no switching is
                           necessary.

  Yields:
    Once after working directory has been changed.
  """
  try:
    original_directory = os.getcwd()
  except OSError:
    # This can happen if the current working directory no longer exists.
    original_directory = None

  if new_working_directory:
    os.chdir(new_working_directory)

  try:
    yield
  finally:
    # Only switch back if a switch happened and there is somewhere to return.
    if new_working_directory and original_directory:
      os.chdir(original_directory)