Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / tests / test_acl.py
blob79c2c5b982e0ad1b07abbf71729d25b06ef14a4f
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Integration tests for the acl command."""
17 from __future__ import absolute_import
19 import re
21 from gslib import aclhelpers
22 from gslib.command import CreateGsutilLogger
23 from gslib.cs_api_map import ApiSelector
24 from gslib.project_id import PopulateProjectId
25 from gslib.storage_url import StorageUrlFromString
26 import gslib.tests.testcase as testcase
27 from gslib.tests.testcase.integration_testcase import SkipForGS
28 from gslib.tests.testcase.integration_testcase import SkipForS3
29 from gslib.tests.util import ObjectToURI as suri
30 from gslib.translation_helper import AclTranslation
31 from gslib.util import Retry
# Whitespace-free JSON fragment identifying a public-read ACL entry; tests
# compare it against `acl get` output after stripping all whitespace.
PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"'
class TestAclBase(testcase.GsUtilIntegrationTestCase):
  """Shared base class for the acl command integration tests.

  Holds the gsutil command prefixes as class attributes so that subclasses
  can override them and re-run the same tests through other command names.
  """

  # Command prefixes; tests append their own arguments to these lists.
  _get_acl_prefix = ['acl', 'get']
  _set_acl_prefix = ['acl', 'set']
  _ch_acl_prefix = ['acl', 'ch']
  _set_defacl_prefix = ['defacl', 'set']

  # Project-scoped ACL identifier of the form '<team>-<project id>'.
  _project_team = 'owners'
  _project_test_acl = '%s-%s' % (_project_team, PopulateProjectId())
@SkipForS3('Tests use GS ACL model.')
class TestAcl(TestAclBase):
  """Integration tests for acl command."""

  def setUp(self):
    """Creates the sample bucket, its storage URL and a logger per test."""
    super(TestAcl, self).setUp()
    self.sample_uri = self.CreateBucket()
    self.sample_url = StorageUrlFromString(str(self.sample_uri))
    self.logger = CreateGsutilLogger('acl')

  def test_set_invalid_acl_object(self):
    """Ensures invalid ACL content for an object fails with ArgumentException."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    inpath = self.CreateTempFile(contents='badAcl')
    stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri],
                            return_stderr=True, expected_status=1)
    self.assertIn('ArgumentException', stderr)

  def test_set_invalid_acl_bucket(self):
    """Ensures invalid ACL content for a bucket fails with ArgumentException."""
    bucket_uri = suri(self.CreateBucket())
    inpath = self.CreateTempFile(contents='badAcl')
    stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri],
                            return_stderr=True, expected_status=1)
    self.assertIn('ArgumentException', stderr)

  def test_set_xml_acl_json_api_object(self):
    """Ensures XML content returns a bad request error and migration warning."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>')
    stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri],
                            return_stderr=True, expected_status=1)
    self.assertIn('ArgumentException', stderr)
    self.assertIn('XML ACL data provided', stderr)

  def test_set_xml_acl_json_api_bucket(self):
    """Ensures XML content returns a bad request error and migration warning."""
    bucket_uri = suri(self.CreateBucket())
    inpath = self.CreateTempFile(contents='<ValidXml></ValidXml>')
    stderr = self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri],
                            return_stderr=True, expected_status=1)
    self.assertIn('ArgumentException', stderr)
    self.assertIn('XML ACL data provided', stderr)

  def test_set_valid_acl_object(self):
    """Tests setting a valid ACL on an object."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
                                return_stdout=True)
    inpath = self.CreateTempFile(contents=acl_string)
    self.RunGsUtil(self._set_acl_prefix + ['public-read', obj_uri])
    acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
                                 return_stdout=True)
    self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri])
    acl_string3 = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
                                 return_stdout=True)

    # The canned ACL must change the ACL; re-applying the saved file must
    # restore the original text exactly.
    self.assertNotEqual(acl_string, acl_string2)
    self.assertEqual(acl_string, acl_string3)

  def test_set_valid_permission_whitespace_object(self):
    """Ensures that whitespace is allowed in role and entity elements."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
                                return_stdout=True)
    # Inject extra whitespace/newlines around the "role" and "entity" keys.
    acl_string = re.sub(r'"role"', r'"role" \n', acl_string)
    acl_string = re.sub(r'"entity"', r'\n "entity"', acl_string)
    inpath = self.CreateTempFile(contents=acl_string)

    self.RunGsUtil(self._set_acl_prefix + [inpath, obj_uri])

  def test_set_valid_acl_bucket(self):
    """Ensures that valid canned and file-based ACLs work with get/set."""
    bucket_uri = suri(self.CreateBucket())
    acl_string = self.RunGsUtil(self._get_acl_prefix + [bucket_uri],
                                return_stdout=True)
    inpath = self.CreateTempFile(contents=acl_string)
    self.RunGsUtil(self._set_acl_prefix + ['public-read', bucket_uri])
    acl_string2 = self.RunGsUtil(self._get_acl_prefix + [bucket_uri],
                                 return_stdout=True)
    self.RunGsUtil(self._set_acl_prefix + [inpath, bucket_uri])
    acl_string3 = self.RunGsUtil(self._get_acl_prefix + [bucket_uri],
                                 return_stdout=True)

    self.assertNotEqual(acl_string, acl_string2)
    self.assertEqual(acl_string, acl_string3)

  def test_invalid_canned_acl_object(self):
    """Ensures that an invalid canned ACL returns a CommandException."""
    obj_uri = suri(self.CreateObject(contents='foo'))
    stderr = self.RunGsUtil(
        self._set_acl_prefix + ['not-a-canned-acl', obj_uri],
        return_stderr=True, expected_status=1)
    self.assertIn('CommandException', stderr)
    self.assertIn('Invalid canned ACL', stderr)

  def test_set_valid_def_acl_bucket(self):
    """Ensures valid default canned and file-based ACLs work with get/set."""
    bucket_uri = self.CreateBucket()

    # Default ACL is project private.
    obj_uri1 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo'))
    acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri1],
                                return_stdout=True)

    # Change it to authenticated-read.
    self.RunGsUtil(
        self._set_defacl_prefix + ['authenticated-read', suri(bucket_uri)])
    obj_uri2 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo2'))
    acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri2],
                                 return_stdout=True)

    # Now change it back to the default using the ACL text saved earlier.
    inpath = self.CreateTempFile(contents=acl_string)
    self.RunGsUtil(self._set_defacl_prefix + [inpath, suri(bucket_uri)])
    obj_uri3 = suri(self.CreateObject(bucket_uri=bucket_uri, contents='foo3'))
    acl_string3 = self.RunGsUtil(self._get_acl_prefix + [obj_uri3],
                                 return_stdout=True)

    self.assertNotEqual(acl_string, acl_string2)
    self.assertIn('allAuthenticatedUsers', acl_string2)
    self.assertEqual(acl_string, acl_string3)

  def test_acl_set_version_specific_uri(self):
    """Tests setting an ACL on a specific version of an object."""
    bucket_uri = self.CreateVersionedBucket()
    # Create initial object version.
    uri = self.CreateObject(bucket_uri=bucket_uri, contents='data')
    # Create a second object version.
    inpath = self.CreateTempFile(contents='def')
    self.RunGsUtil(['cp', inpath, uri.uri])

    # Find out the two object version IDs.
    lines = self.AssertNObjectsInBucket(bucket_uri, 2, versioned=True)
    v0_uri_str, v1_uri_str = lines[0], lines[1]

    # Check that neither version currently has public-read permission
    # (default ACL is project-private).
    orig_acls = []
    for uri_str in (v0_uri_str, v1_uri_str):
      acl = self.RunGsUtil(self._get_acl_prefix + [uri_str],
                           return_stdout=True)
      self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT,
                       self._strip_json_whitespace(acl))
      orig_acls.append(acl)

    # Set the ACL for the older version of the object to public-read.
    self.RunGsUtil(self._set_acl_prefix + ['public-read', v0_uri_str])
    # Check that the older version's ACL is public-read, but newer version
    # is not.
    acl = self.RunGsUtil(self._get_acl_prefix + [v0_uri_str],
                         return_stdout=True)
    self.assertIn(PUBLIC_READ_JSON_ACL_TEXT, self._strip_json_whitespace(acl))
    acl = self.RunGsUtil(self._get_acl_prefix + [v1_uri_str],
                         return_stdout=True)
    self.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT,
                     self._strip_json_whitespace(acl))

    # Check that reading the ACL with the version-less URI returns the
    # original ACL (since the version-less URI means the current version).
    # NOTE(review): orig_acls[0] was read from the older version; this relies
    # on both versions starting out with identical ACL text -- confirm.
    acl = self.RunGsUtil(self._get_acl_prefix + [uri.uri], return_stdout=True)
    self.assertEqual(acl, orig_acls[0])

  def _strip_json_whitespace(self, json_text):
    """Returns json_text with every whitespace character removed."""
    # '\s+' rather than '\s*': the latter also matches the empty string at
    # every position, which is needless work for the same result.
    return re.sub(r'\s+', '', json_text)

  def _GetSampleBucketAcl(self):
    """Returns the sample bucket's current ACL as a mutable list of entries."""
    return list(
        AclTranslation.BotoBucketAclToMessage(self.sample_uri.get_acl()))

  def testAclChangeWithUserId(self):
    """Tests applying a ':r' change for a user ID yields a READER entry."""
    change = aclhelpers.AclChange(self.USER_TEST_ID + ':r',
                                  scope_type=aclhelpers.ChangeType.USER)
    acl = self._GetSampleBucketAcl()
    change.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'UserById', self.USER_TEST_ID)

  def testAclChangeWithGroupId(self):
    """Tests applying a ':r' change for a group ID yields a READER entry."""
    change = aclhelpers.AclChange(self.GROUP_TEST_ID + ':r',
                                  scope_type=aclhelpers.ChangeType.GROUP)
    acl = self._GetSampleBucketAcl()
    change.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'GroupById', self.GROUP_TEST_ID)

  def testAclChangeWithUserEmail(self):
    """Tests applying a ':r' change for a user e-mail yields a READER entry."""
    change = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':r',
                                  scope_type=aclhelpers.ChangeType.USER)
    acl = self._GetSampleBucketAcl()
    change.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS)

  def testAclChangeWithGroupEmail(self):
    """Tests applying a ':fc' change for a group e-mail yields an OWNER entry."""
    change = aclhelpers.AclChange(self.GROUP_TEST_ADDRESS + ':fc',
                                  scope_type=aclhelpers.ChangeType.GROUP)
    acl = self._GetSampleBucketAcl()
    change.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'OWNER', 'GroupByEmail', self.GROUP_TEST_ADDRESS)

  def testAclChangeWithDomain(self):
    """Tests applying a ':READ' change for a domain yields a READER entry."""
    change = aclhelpers.AclChange(self.DOMAIN_TEST + ':READ',
                                  scope_type=aclhelpers.ChangeType.GROUP)
    acl = self._GetSampleBucketAcl()
    change.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'GroupByDomain', self.DOMAIN_TEST)

  def testAclChangeWithProjectOwners(self):
    """Tests applying a ':READ' change for a project team yields READER."""
    change = aclhelpers.AclChange(self._project_test_acl + ':READ',
                                  scope_type=aclhelpers.ChangeType.PROJECT)
    acl = self._GetSampleBucketAcl()
    change.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'Project', self._project_test_acl)

  def testAclChangeWithAllUsers(self):
    """Tests applying 'AllUsers:WRITE' yields a WRITER entry for AllUsers."""
    change = aclhelpers.AclChange('AllUsers:WRITE',
                                  scope_type=aclhelpers.ChangeType.GROUP)
    acl = self._GetSampleBucketAcl()
    change.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'WRITER', 'AllUsers')

  def testAclChangeWithAllAuthUsers(self):
    """Tests adding and then deleting an AllAuthenticatedUsers READER entry."""
    change = aclhelpers.AclChange('AllAuthenticatedUsers:READ',
                                  scope_type=aclhelpers.ChangeType.GROUP)
    acl = self._GetSampleBucketAcl()
    change.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'AllAuthenticatedUsers')
    remove = aclhelpers.AclDel('AllAuthenticatedUsers')
    remove.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHasNo(acl, 'READER', 'AllAuthenticatedUsers')

  def testAclDelWithUser(self):
    """Tests that AclDel removes a previously added user e-mail entry."""
    add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ',
                               scope_type=aclhelpers.ChangeType.USER)
    acl = self._GetSampleBucketAcl()
    add.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS)

    remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS)
    remove.Execute(self.sample_url, acl, 'acl', self.logger)
    # Must use the stored role name 'READER' (as asserted above); checking
    # for 'READ' can never match an entry and would pass unconditionally.
    self._AssertHasNo(acl, 'READER', 'UserByEmail', self.USER_TEST_ADDRESS)

  def testAclDelWithProjectOwners(self):
    """Tests that AclDel removes a previously added project team entry."""
    add = aclhelpers.AclChange(self._project_test_acl + ':READ',
                               scope_type=aclhelpers.ChangeType.PROJECT)
    acl = self._GetSampleBucketAcl()
    add.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'Project', self._project_test_acl)

    remove = aclhelpers.AclDel(self._project_test_acl)
    remove.Execute(self.sample_url, acl, 'acl', self.logger)
    # Same role name as the positive assertion so the check is meaningful.
    self._AssertHasNo(acl, 'READER', 'Project', self._project_test_acl)

  def testAclDelWithGroup(self):
    """Tests that AclDel removes a previously added group e-mail entry."""
    add = aclhelpers.AclChange(self.USER_TEST_ADDRESS + ':READ',
                               scope_type=aclhelpers.ChangeType.GROUP)
    acl = self._GetSampleBucketAcl()
    add.Execute(self.sample_url, acl, 'acl', self.logger)
    self._AssertHas(acl, 'READER', 'GroupByEmail', self.USER_TEST_ADDRESS)

    remove = aclhelpers.AclDel(self.USER_TEST_ADDRESS)
    remove.Execute(self.sample_url, acl, 'acl', self.logger)
    # Verify absence of the address that was actually added and deleted;
    # checking a different address would pass even if deletion failed.
    self._AssertHasNo(acl, 'READER', 'GroupByEmail', self.USER_TEST_ADDRESS)

  # Here are a whole lot of verbose asserts

  def _AssertHas(self, current_acl, perm, scope, value=None):
    """Asserts that exactly one entry of current_acl matches the arguments."""
    matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope,
                                                  value))
    self.assertEqual(1, len(matches))

  def _AssertHasNo(self, current_acl, perm, scope, value=None):
    """Asserts that no entry of current_acl matches the arguments."""
    matches = list(self._YieldMatchingEntriesJson(current_acl, perm, scope,
                                                  value))
    self.assertEqual(0, len(matches))

  def _YieldMatchingEntriesJson(self, current_acl, perm, scope, value=None):
    """Generator that yields entries that match the change descriptor.

    Args:
      current_acl: A list of apitools_messages.BucketAccessControls or
                   ObjectAccessControls which will be searched for matching
                   entries.
      perm: Role (permission) to match.
      scope: Scope type to match.
      value: Value to match (against the scope type).

    Yields:
      An apitools_messages.BucketAccessControl or ObjectAccessControl.
    """
    for entry in current_acl:
      if (scope in ['UserById', 'GroupById'] and
          entry.entityId and value == entry.entityId and
          entry.role == perm):
        yield entry
      elif (scope in ['UserByEmail', 'GroupByEmail'] and
            entry.email and value == entry.email and
            entry.role == perm):
        yield entry
      elif (scope == 'GroupByDomain' and
            entry.domain and value == entry.domain and
            entry.role == perm):
        yield entry
      elif (scope == 'Project' and entry.role == perm and
            value == entry.entityId):
        yield entry
      elif (scope in ['AllUsers', 'AllAuthenticatedUsers'] and
            # Guard so an entry without an entity is skipped, not a crash.
            entry.entity and
            entry.entity.lower() == scope.lower() and
            entry.role == perm):
        yield entry

  def _MakeScopeRegex(self, role, entity_type, email_address):
    """Builds a regex matching a JSON ACL entry for the given entity and role.

    Args:
      role: Role text expected in the entry's "role" field.
      entity_type: Prefix of the entity name (e.g. 'user' or 'group').
      email_address: Address following the '<entity_type>-' prefix; matched
                     literally.

    Returns:
      A compiled pattern (re.DOTALL) for use against `acl get` output.
    """
    # Escape the address so that '.' (and any other metacharacter) only
    # matches itself instead of acting as a wildcard.
    template_regex = (r'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' %
                      (entity_type, re.escape(email_address), role))
    return re.compile(template_regex, flags=re.DOTALL)

  def _MakeProjectScopeRegex(self, role, project_team):
    """Builds a regex matching a JSON ACL entry for a project team and role.

    Args:
      role: Role text expected in the entry's "role" field.
      project_team: Team name expected in both the entity name and the
                    "projectTeam" object.

    Returns:
      A compiled pattern (re.DOTALL) whose group 1 captures the numeric
      "projectNumber" value.
    """
    template_regex = (r'\{.*"entity":\s*"project-%s-\d+",\s*"projectTeam":\s*'
                      r'\{\s*"projectNumber":\s*"(\d+)",\s*"team":\s*"%s"\s*\},'
                      r'\s*"role":\s*"%s".*\}') % (project_team, project_team,
                                                   role)

    return re.compile(template_regex, flags=re.DOTALL)

  def testBucketAclChange(self):
    """Tests acl change on a bucket."""
    test_regex = self._MakeScopeRegex(
        'OWNER', 'user', self.USER_TEST_ADDRESS)
    json_text = self.RunGsUtil(
        self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
    self.assertNotRegexpMatches(json_text, test_regex)

    self.RunGsUtil(self._ch_acl_prefix +
                   ['-u', self.USER_TEST_ADDRESS+':fc', suri(self.sample_uri)])
    json_text = self.RunGsUtil(
        self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
    self.assertRegexpMatches(json_text, test_regex)

    test_regex2 = self._MakeScopeRegex(
        'WRITER', 'user', self.USER_TEST_ADDRESS)
    self.RunGsUtil(self._ch_acl_prefix +
                   ['-u', self.USER_TEST_ADDRESS+':w', suri(self.sample_uri)])
    json_text2 = self.RunGsUtil(
        self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
    self.assertRegexpMatches(json_text2, test_regex2)

    self.RunGsUtil(self._ch_acl_prefix +
                   ['-d', self.USER_TEST_ADDRESS, suri(self.sample_uri)])

    json_text3 = self.RunGsUtil(
        self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
    self.assertNotRegexpMatches(json_text3, test_regex)

  def testProjectAclChangesOnBucket(self):
    """Tests project entity acl changes on a bucket."""

    if self.test_api == ApiSelector.XML:
      stderr = self.RunGsUtil(self._ch_acl_prefix +
                              ['-p', self._project_test_acl +':w',
                               suri(self.sample_uri)],
                              expected_status=1,
                              return_stderr=True)
      self.assertIn(('CommandException: XML API does not support project'
                     ' scopes, cannot translate ACL.'), stderr)
    else:
      test_regex = self._MakeProjectScopeRegex(
          'WRITER', self._project_team)
      self.RunGsUtil(self._ch_acl_prefix +
                     ['-p', self._project_test_acl +':w',
                      suri(self.sample_uri)])
      json_text = self.RunGsUtil(
          self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)

      self.assertRegexpMatches(json_text, test_regex)

      # The api will accept string project ids, but stores the numeric project
      # ids internally, this extracts the numeric id from the returned acls.
      proj_num_id = test_regex.search(json_text).group(1)
      acl_to_remove = '%s-%s' % (self._project_team, proj_num_id)

      self.RunGsUtil(self._ch_acl_prefix +
                     ['-d', acl_to_remove, suri(self.sample_uri)])

      json_text2 = self.RunGsUtil(
          self._get_acl_prefix + [suri(self.sample_uri)], return_stdout=True)
      self.assertNotRegexpMatches(json_text2, test_regex)

  def testObjectAclChange(self):
    """Tests acl change on an object."""
    obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something')
    self.AssertNObjectsInBucket(self.sample_uri, 1)

    test_regex = self._MakeScopeRegex(
        'READER', 'group', self.GROUP_TEST_ADDRESS)
    json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                               return_stdout=True)
    self.assertNotRegexpMatches(json_text, test_regex)

    self.RunGsUtil(self._ch_acl_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)])
    json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                               return_stdout=True)
    self.assertRegexpMatches(json_text, test_regex)

    test_regex2 = self._MakeScopeRegex(
        'OWNER', 'group', self.GROUP_TEST_ADDRESS)
    self.RunGsUtil(self._ch_acl_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':OWNER', suri(obj)])
    json_text2 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                                return_stdout=True)
    self.assertRegexpMatches(json_text2, test_regex2)

    self.RunGsUtil(self._ch_acl_prefix +
                   ['-d', self.GROUP_TEST_ADDRESS, suri(obj)])
    json_text3 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                                return_stdout=True)
    self.assertNotRegexpMatches(json_text3, test_regex2)

    all_auth_regex = re.compile(
        r'\{.*"entity":\s*"allAuthenticatedUsers".*"role":\s*"OWNER".*\}',
        flags=re.DOTALL)

    self.RunGsUtil(self._ch_acl_prefix + ['-g', 'AllAuth:O', suri(obj)])
    json_text4 = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                                return_stdout=True)
    self.assertRegexpMatches(json_text4, all_auth_regex)

  def testObjectAclChangeAllUsers(self):
    """Tests acl ch AllUsers:R on an object."""
    obj = self.CreateObject(bucket_uri=self.sample_uri, contents='something')
    self.AssertNObjectsInBucket(self.sample_uri, 1)

    all_users_regex = re.compile(
        r'\{.*"entity":\s*"allUsers".*"role":\s*"READER".*\}', flags=re.DOTALL)
    json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                               return_stdout=True)
    self.assertNotRegexpMatches(json_text, all_users_regex)

    self.RunGsUtil(self._ch_acl_prefix +
                   ['-g', 'AllUsers:R', suri(obj)])
    json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                               return_stdout=True)
    self.assertRegexpMatches(json_text, all_users_regex)

  def testMultithreadedAclChange(self, count=10):
    """Tests multi-threaded acl changing on several objects."""
    objects = [
        self.CreateObject(bucket_uri=self.sample_uri,
                          contents='something {0}'.format(i))
        for i in range(count)]

    self.AssertNObjectsInBucket(self.sample_uri, count)

    test_regex = self._MakeScopeRegex(
        'READER', 'group', self.GROUP_TEST_ADDRESS)
    # Fetch every ACL first, then assert, so a failure reflects one snapshot.
    json_texts = [
        self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True)
        for obj in objects]
    for json_text in json_texts:
      self.assertNotRegexpMatches(json_text, test_regex)

    uris = [suri(obj) for obj in objects]
    self.RunGsUtil(['-m', '-DD'] + self._ch_acl_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':READ'] + uris)

    json_texts = [
        self.RunGsUtil(self._get_acl_prefix + [suri(obj)], return_stdout=True)
        for obj in objects]
    for json_text in json_texts:
      self.assertRegexpMatches(json_text, test_regex)

  def testRecursiveChangeAcl(self):
    """Tests recursively changing ACLs on nested objects."""
    obj = self.CreateObject(bucket_uri=self.sample_uri, object_name='foo/bar',
                            contents='something')
    self.AssertNObjectsInBucket(self.sample_uri, 1)

    test_regex = self._MakeScopeRegex(
        'READER', 'group', self.GROUP_TEST_ADDRESS)
    json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                               return_stdout=True)
    self.assertNotRegexpMatches(json_text, test_regex)

    # Both steps are re-run on AssertionError (up to 5 tries) -- presumably
    # to tolerate listing/ACL propagation delays; confirm against Retry.
    @Retry(AssertionError, tries=5, timeout_secs=1)
    def _AddAcl():
      # suri(obj)[:-3] drops the trailing 'bar', so -R targets the 'foo/'
      # prefix rather than the object itself.
      self.RunGsUtil(
          self._ch_acl_prefix +
          ['-R', '-g', self.GROUP_TEST_ADDRESS+':READ', suri(obj)[:-3]])
      json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                                 return_stdout=True)
      self.assertRegexpMatches(json_text, test_regex)
    _AddAcl()

    @Retry(AssertionError, tries=5, timeout_secs=1)
    def _DeleteAcl():
      self.RunGsUtil(self._ch_acl_prefix +
                     ['-d', self.GROUP_TEST_ADDRESS, suri(obj)])
      json_text = self.RunGsUtil(self._get_acl_prefix + [suri(obj)],
                                 return_stdout=True)
      self.assertNotRegexpMatches(json_text, test_regex)
    _DeleteAcl()

  def testMultiVersionSupport(self):
    """Tests changing ACLs on multiple object versions."""
    bucket = self.CreateVersionedBucket()
    object_name = self.MakeTempName('obj')
    self.CreateObject(
        bucket_uri=bucket, object_name=object_name, contents='One thing')
    # Create another on the same URI, giving us a second version.
    self.CreateObject(
        bucket_uri=bucket, object_name=object_name, contents='Another thing')

    lines = self.AssertNObjectsInBucket(bucket, 2, versioned=True)

    obj_v1, obj_v2 = lines[0], lines[1]

    test_regex = self._MakeScopeRegex(
        'READER', 'group', self.GROUP_TEST_ADDRESS)
    json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1],
                               return_stdout=True)
    self.assertNotRegexpMatches(json_text, test_regex)

    self.RunGsUtil(self._ch_acl_prefix +
                   ['-g', self.GROUP_TEST_ADDRESS+':READ', obj_v1])
    json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v1],
                               return_stdout=True)
    self.assertRegexpMatches(json_text, test_regex)

    # The change must be confined to the version it was applied to.
    json_text = self.RunGsUtil(self._get_acl_prefix + [obj_v2],
                               return_stdout=True)
    self.assertNotRegexpMatches(json_text, test_regex)

  def testBadRequestAclChange(self):
    """Tests that an invalid user address fails without any retry output."""
    stdout, stderr = self.RunGsUtil(
        self._ch_acl_prefix +
        ['-u', 'invalid_$$@hello.com:R', suri(self.sample_uri)],
        return_stdout=True, return_stderr=True, expected_status=1)
    self.assertIn('BadRequestException', stderr)
    self.assertNotIn('Retrying', stdout)
    self.assertNotIn('Retrying', stderr)

  def testAclGetWithoutFullControl(self):
    """Tests that acl get with anonymous credentials is denied."""
    object_uri = self.CreateObject(contents='foo')
    with self.SetAnonymousBotoCreds():
      stderr = self.RunGsUtil(self._get_acl_prefix + [suri(object_uri)],
                              return_stderr=True, expected_status=1)
      self.assertIn('AccessDeniedException', stderr)

  def testTooFewArgumentsFails(self):
    """Tests calling ACL commands with insufficient number of arguments."""
    # No arguments for get, but valid subcommand.
    stderr = self.RunGsUtil(self._get_acl_prefix, return_stderr=True,
                            expected_status=1)
    self.assertIn('command requires at least', stderr)

    # No arguments for set, but valid subcommand.
    stderr = self.RunGsUtil(self._set_acl_prefix, return_stderr=True,
                            expected_status=1)
    self.assertIn('command requires at least', stderr)

    # No arguments for ch, but valid subcommand.
    stderr = self.RunGsUtil(self._ch_acl_prefix, return_stderr=True,
                            expected_status=1)
    self.assertIn('command requires at least', stderr)

    # Neither arguments nor subcommand.
    stderr = self.RunGsUtil(['acl'], return_stderr=True, expected_status=1)
    self.assertIn('command requires at least', stderr)

  def testMinusF(self):
    """Tests -f option to continue after failure."""
    bucket_uri = self.CreateBucket()
    obj_uri = suri(self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                     contents='foo'))
    acl_string = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
                                return_stdout=True)
    # 'foo2' was never created, so the command exits 1; the ACL of the real
    # object listed after it must still have been updated.
    self.RunGsUtil(self._set_acl_prefix +
                   ['-f', 'public-read', suri(bucket_uri) + 'foo2', obj_uri],
                   expected_status=1)
    acl_string2 = self.RunGsUtil(self._get_acl_prefix + [obj_uri],
                                 return_stdout=True)

    self.assertNotEqual(acl_string, acl_string2)
class TestS3CompatibleAcl(TestAclBase):
  """ACL integration tests that work for s3 and gs URLs."""

  def testAclObjectGetSet(self):
    """Reads an object's ACL and writes the same ACL text back to it."""
    bucket = self.CreateBucket()
    obj = self.CreateObject(bucket_uri=bucket, contents='foo')
    self.AssertNObjectsInBucket(bucket, 1)

    get_command = self._get_acl_prefix + [suri(obj)]
    fetched_acl = self.RunGsUtil(get_command, return_stdout=True)
    acl_file = self.CreateTempFile(contents=fetched_acl)
    set_command = self._set_acl_prefix + [acl_file, suri(obj)]
    self.RunGsUtil(set_command)

  def testAclBucketGetSet(self):
    """Reads a bucket's ACL and writes the same ACL text back to it."""
    bucket = self.CreateBucket()

    get_command = self._get_acl_prefix + [suri(bucket)]
    fetched_acl = self.RunGsUtil(get_command, return_stdout=True)
    acl_file = self.CreateTempFile(contents=fetched_acl)
    set_command = self._set_acl_prefix + [acl_file, suri(bucket)]
    self.RunGsUtil(set_command)
@SkipForGS('S3 ACLs accept XML and should not cause an XML warning.')
class TestS3OnlyAcl(TestAclBase):
  """ACL integration tests that work only for s3 URLs."""

  def _AssertXmlAclFailsWithoutXmlWarning(self, target_uri):
    """Sets a dummy XML ACL on target_uri and checks the resulting error.

    Args:
      target_uri: URI string of the bucket or object to set the ACL on.
    """
    xml_acl_path = self.CreateTempFile(contents='<ValidXml></ValidXml>')
    command = self._set_acl_prefix + [xml_acl_path, target_uri]
    error_output = self.RunGsUtil(command, return_stderr=True,
                                  expected_status=1)
    self.assertIn('BadRequestException', error_output)
    self.assertNotIn('XML ACL data provided', error_output)

  # TODO: Format all test case names consistently.
  def test_set_xml_acl(self):
    """Ensures XML content does not return an XML warning for S3."""
    self._AssertXmlAclFailsWithoutXmlWarning(
        suri(self.CreateObject(contents='foo')))

  def test_set_xml_acl_bucket(self):
    """Ensures XML content does not return an XML warning for S3."""
    self._AssertXmlAclFailsWithoutXmlWarning(suri(self.CreateBucket()))
class TestAclOldAlias(TestAcl):
  """Runs the inherited TestAcl tests through the single-word command names."""

  # Each prefix replaces the two-word form defined on TestAclBase.
  _get_acl_prefix = ['getacl']
  _set_acl_prefix = ['setacl']
  _ch_acl_prefix = ['chacl']
  _set_defacl_prefix = ['setdefacl']