1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Integration tests for the acl command."""
from __future__ import absolute_import

import re

from gslib import aclhelpers
from gslib.command import CreateGsutilLogger
from gslib.cs_api_map import ApiSelector
from gslib.project_id import PopulateProjectId
from gslib.storage_url import StorageUrlFromString
import gslib.tests.testcase as testcase
from gslib.tests.testcase.integration_testcase import SkipForGS
from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.util import ObjectToURI as suri
from gslib.translation_helper import AclTranslation
from gslib.util import Retry
# Whitespace-free JSON fragment identifying a public-read grant (allUsers with
# the READER role). Tests compare it against `acl get` output after that output
# has had its whitespace stripped.
PUBLIC_READ_JSON_ACL_TEXT = '"entity":"allUsers","role":"READER"'
class TestAclBase(testcase.GsUtilIntegrationTestCase):
  """Common base class for the acl command integration test cases.

  The gsutil sub-command prefixes are kept as class attributes so that
  subclasses can override them to run the same tests through other command
  spellings.
  """

  # Argument prefixes prepended to the gsutil invocations made by the tests.
  _set_acl_prefix = ['acl', 'set']
  _get_acl_prefix = ['acl', 'get']
  _set_defacl_prefix = ['defacl', 'set']
  _ch_acl_prefix = ['acl', 'ch']

  # Project-team scope used by the project ACL tests, in "<team>-<project id>"
  # form.
  _project_team = 'owners'
  _project_test_acl = '{0}-{1}'.format(_project_team, PopulateProjectId())
@SkipForS3('Tests use GS ACL model.')
class TestAcl(TestAclBase):
  """Integration tests for acl command."""

  def setUp(self):
    """Creates the per-test sample bucket, its storage URL, and a logger."""
    super(TestAcl, self).setUp()
    # Bucket whose ACL the AclChange/AclDel helper tests read and modify.
    self.sample_uri = self.CreateBucket()
    # The aclhelpers Execute() calls take a storage URL rather than a URI.
    self.sample_url = StorageUrlFromString(str(self.sample_uri))
    self.logger = CreateGsutilLogger('acl')
58 def test_set_invalid_acl_object(self
):
59 """Ensures that invalid content returns a bad request error."""
60 obj_uri
= suri(self
.CreateObject(contents
='foo'))
61 inpath
= self
.CreateTempFile(contents
='badAcl')
62 stderr
= self
.RunGsUtil(self
._set
_acl
_prefix
+ [inpath
, obj_uri
],
63 return_stderr
=True, expected_status
=1)
64 self
.assertIn('ArgumentException', stderr
)
66 def test_set_invalid_acl_bucket(self
):
67 """Ensures that invalid content returns a bad request error."""
68 bucket_uri
= suri(self
.CreateBucket())
69 inpath
= self
.CreateTempFile(contents
='badAcl')
70 stderr
= self
.RunGsUtil(self
._set
_acl
_prefix
+ [inpath
, bucket_uri
],
71 return_stderr
=True, expected_status
=1)
72 self
.assertIn('ArgumentException', stderr
)
74 def test_set_xml_acl_json_api_object(self
):
75 """Ensures XML content returns a bad request error and migration warning."""
76 obj_uri
= suri(self
.CreateObject(contents
='foo'))
77 inpath
= self
.CreateTempFile(contents
='<ValidXml></ValidXml>')
78 stderr
= self
.RunGsUtil(self
._set
_acl
_prefix
+ [inpath
, obj_uri
],
79 return_stderr
=True, expected_status
=1)
80 self
.assertIn('ArgumentException', stderr
)
81 self
.assertIn('XML ACL data provided', stderr
)
83 def test_set_xml_acl_json_api_bucket(self
):
84 """Ensures XML content returns a bad request error and migration warning."""
85 bucket_uri
= suri(self
.CreateBucket())
86 inpath
= self
.CreateTempFile(contents
='<ValidXml></ValidXml>')
87 stderr
= self
.RunGsUtil(self
._set
_acl
_prefix
+ [inpath
, bucket_uri
],
88 return_stderr
=True, expected_status
=1)
89 self
.assertIn('ArgumentException', stderr
)
90 self
.assertIn('XML ACL data provided', stderr
)
92 def test_set_valid_acl_object(self
):
93 """Tests setting a valid ACL on an object."""
94 obj_uri
= suri(self
.CreateObject(contents
='foo'))
95 acl_string
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri
],
97 inpath
= self
.CreateTempFile(contents
=acl_string
)
98 self
.RunGsUtil(self
._set
_acl
_prefix
+ ['public-read', obj_uri
])
99 acl_string2
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri
],
101 self
.RunGsUtil(self
._set
_acl
_prefix
+ [inpath
, obj_uri
])
102 acl_string3
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri
],
105 self
.assertNotEqual(acl_string
, acl_string2
)
106 self
.assertEqual(acl_string
, acl_string3
)
108 def test_set_valid_permission_whitespace_object(self
):
109 """Ensures that whitespace is allowed in role and entity elements."""
110 obj_uri
= suri(self
.CreateObject(contents
='foo'))
111 acl_string
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri
],
113 acl_string
= re
.sub(r
'"role"', r
'"role" \n', acl_string
)
114 acl_string
= re
.sub(r
'"entity"', r
'\n "entity"', acl_string
)
115 inpath
= self
.CreateTempFile(contents
=acl_string
)
117 self
.RunGsUtil(self
._set
_acl
_prefix
+ [inpath
, obj_uri
])
119 def test_set_valid_acl_bucket(self
):
120 """Ensures that valid canned and XML ACLs work with get/set."""
121 bucket_uri
= suri(self
.CreateBucket())
122 acl_string
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [bucket_uri
],
124 inpath
= self
.CreateTempFile(contents
=acl_string
)
125 self
.RunGsUtil(self
._set
_acl
_prefix
+ ['public-read', bucket_uri
])
126 acl_string2
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [bucket_uri
],
128 self
.RunGsUtil(self
._set
_acl
_prefix
+ [inpath
, bucket_uri
])
129 acl_string3
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [bucket_uri
],
132 self
.assertNotEqual(acl_string
, acl_string2
)
133 self
.assertEqual(acl_string
, acl_string3
)
135 def test_invalid_canned_acl_object(self
):
136 """Ensures that an invalid canned ACL returns a CommandException."""
137 obj_uri
= suri(self
.CreateObject(contents
='foo'))
138 stderr
= self
.RunGsUtil(
139 self
._set
_acl
_prefix
+ ['not-a-canned-acl', obj_uri
],
140 return_stderr
=True, expected_status
=1)
141 self
.assertIn('CommandException', stderr
)
142 self
.assertIn('Invalid canned ACL', stderr
)
144 def test_set_valid_def_acl_bucket(self
):
145 """Ensures that valid default canned and XML ACLs works with get/set."""
146 bucket_uri
= self
.CreateBucket()
148 # Default ACL is project private.
149 obj_uri1
= suri(self
.CreateObject(bucket_uri
=bucket_uri
, contents
='foo'))
150 acl_string
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri1
],
153 # Change it to authenticated-read.
155 self
._set
_defacl
_prefix
+ ['authenticated-read', suri(bucket_uri
)])
156 obj_uri2
= suri(self
.CreateObject(bucket_uri
=bucket_uri
, contents
='foo2'))
157 acl_string2
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri2
],
160 # Now change it back to the default via XML.
161 inpath
= self
.CreateTempFile(contents
=acl_string
)
162 self
.RunGsUtil(self
._set
_defacl
_prefix
+ [inpath
, suri(bucket_uri
)])
163 obj_uri3
= suri(self
.CreateObject(bucket_uri
=bucket_uri
, contents
='foo3'))
164 acl_string3
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri3
],
167 self
.assertNotEqual(acl_string
, acl_string2
)
168 self
.assertIn('allAuthenticatedUsers', acl_string2
)
169 self
.assertEqual(acl_string
, acl_string3
)
171 def test_acl_set_version_specific_uri(self
):
172 """Tests setting an ACL on a specific version of an object."""
173 bucket_uri
= self
.CreateVersionedBucket()
174 # Create initial object version.
175 uri
= self
.CreateObject(bucket_uri
=bucket_uri
, contents
='data')
176 # Create a second object version.
177 inpath
= self
.CreateTempFile(contents
='def')
178 self
.RunGsUtil(['cp', inpath
, uri
.uri
])
180 # Find out the two object version IDs.
181 lines
= self
.AssertNObjectsInBucket(bucket_uri
, 2, versioned
=True)
182 v0_uri_str
, v1_uri_str
= lines
[0], lines
[1]
184 # Check that neither version currently has public-read permission
185 # (default ACL is project-private).
187 for uri_str
in (v0_uri_str
, v1_uri_str
):
188 acl
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [uri_str
],
190 self
.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT
,
191 self
._strip
_json
_whitespace
(acl
))
192 orig_acls
.append(acl
)
194 # Set the ACL for the older version of the object to public-read.
195 self
.RunGsUtil(self
._set
_acl
_prefix
+ ['public-read', v0_uri_str
])
196 # Check that the older version's ACL is public-read, but newer version
198 acl
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [v0_uri_str
],
200 self
.assertIn(PUBLIC_READ_JSON_ACL_TEXT
, self
._strip
_json
_whitespace
(acl
))
201 acl
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [v1_uri_str
],
203 self
.assertNotIn(PUBLIC_READ_JSON_ACL_TEXT
,
204 self
._strip
_json
_whitespace
(acl
))
206 # Check that reading the ACL with the version-less URI returns the
207 # original ACL (since the version-less URI means the current version).
208 acl
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [uri
.uri
], return_stdout
=True)
209 self
.assertEqual(acl
, orig_acls
[0])
211 def _strip_json_whitespace(self
, json_text
):
212 return re
.sub(r
'\s*', '', json_text
)
214 def testAclChangeWithUserId(self
):
215 change
= aclhelpers
.AclChange(self
.USER_TEST_ID
+ ':r',
216 scope_type
=aclhelpers
.ChangeType
.USER
)
217 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
218 change
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
219 self
._AssertHas
(acl
, 'READER', 'UserById', self
.USER_TEST_ID
)
221 def testAclChangeWithGroupId(self
):
222 change
= aclhelpers
.AclChange(self
.GROUP_TEST_ID
+ ':r',
223 scope_type
=aclhelpers
.ChangeType
.GROUP
)
224 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
225 change
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
226 self
._AssertHas
(acl
, 'READER', 'GroupById', self
.GROUP_TEST_ID
)
228 def testAclChangeWithUserEmail(self
):
229 change
= aclhelpers
.AclChange(self
.USER_TEST_ADDRESS
+ ':r',
230 scope_type
=aclhelpers
.ChangeType
.USER
)
231 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
232 change
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
233 self
._AssertHas
(acl
, 'READER', 'UserByEmail', self
.USER_TEST_ADDRESS
)
235 def testAclChangeWithGroupEmail(self
):
236 change
= aclhelpers
.AclChange(self
.GROUP_TEST_ADDRESS
+ ':fc',
237 scope_type
=aclhelpers
.ChangeType
.GROUP
)
238 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
239 change
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
240 self
._AssertHas
(acl
, 'OWNER', 'GroupByEmail', self
.GROUP_TEST_ADDRESS
)
242 def testAclChangeWithDomain(self
):
243 change
= aclhelpers
.AclChange(self
.DOMAIN_TEST
+ ':READ',
244 scope_type
=aclhelpers
.ChangeType
.GROUP
)
245 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
246 change
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
247 self
._AssertHas
(acl
, 'READER', 'GroupByDomain', self
.DOMAIN_TEST
)
249 def testAclChangeWithProjectOwners(self
):
250 change
= aclhelpers
.AclChange(self
._project
_test
_acl
+ ':READ',
251 scope_type
=aclhelpers
.ChangeType
.PROJECT
)
252 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
253 change
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
254 self
._AssertHas
(acl
, 'READER', 'Project', self
._project
_test
_acl
)
256 def testAclChangeWithAllUsers(self
):
257 change
= aclhelpers
.AclChange('AllUsers:WRITE',
258 scope_type
=aclhelpers
.ChangeType
.GROUP
)
259 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
260 change
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
261 self
._AssertHas
(acl
, 'WRITER', 'AllUsers')
263 def testAclChangeWithAllAuthUsers(self
):
264 change
= aclhelpers
.AclChange('AllAuthenticatedUsers:READ',
265 scope_type
=aclhelpers
.ChangeType
.GROUP
)
266 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
267 change
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
268 self
._AssertHas
(acl
, 'READER', 'AllAuthenticatedUsers')
269 remove
= aclhelpers
.AclDel('AllAuthenticatedUsers')
270 remove
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
271 self
._AssertHasNo
(acl
, 'READER', 'AllAuthenticatedUsers')
273 def testAclDelWithUser(self
):
274 add
= aclhelpers
.AclChange(self
.USER_TEST_ADDRESS
+ ':READ',
275 scope_type
=aclhelpers
.ChangeType
.USER
)
276 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
277 add
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
278 self
._AssertHas
(acl
, 'READER', 'UserByEmail', self
.USER_TEST_ADDRESS
)
280 remove
= aclhelpers
.AclDel(self
.USER_TEST_ADDRESS
)
281 remove
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
282 self
._AssertHasNo
(acl
, 'READ', 'UserByEmail', self
.USER_TEST_ADDRESS
)
284 def testAclDelWithProjectOwners(self
):
285 add
= aclhelpers
.AclChange(self
._project
_test
_acl
+ ':READ',
286 scope_type
=aclhelpers
.ChangeType
.PROJECT
)
287 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
288 add
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
289 self
._AssertHas
(acl
, 'READER', 'Project', self
._project
_test
_acl
)
291 remove
= aclhelpers
.AclDel(self
._project
_test
_acl
)
292 remove
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
293 self
._AssertHasNo
(acl
, 'READ', 'Project', self
._project
_test
_acl
)
295 def testAclDelWithGroup(self
):
296 add
= aclhelpers
.AclChange(self
.USER_TEST_ADDRESS
+ ':READ',
297 scope_type
=aclhelpers
.ChangeType
.GROUP
)
298 acl
= list(AclTranslation
.BotoBucketAclToMessage(self
.sample_uri
.get_acl()))
299 add
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
300 self
._AssertHas
(acl
, 'READER', 'GroupByEmail', self
.USER_TEST_ADDRESS
)
302 remove
= aclhelpers
.AclDel(self
.USER_TEST_ADDRESS
)
303 remove
.Execute(self
.sample_url
, acl
, 'acl', self
.logger
)
304 self
._AssertHasNo
(acl
, 'READER', 'GroupByEmail', self
.GROUP_TEST_ADDRESS
)
307 # Here are a whole lot of verbose asserts
310 def _AssertHas(self
, current_acl
, perm
, scope
, value
=None):
311 matches
= list(self
._YieldMatchingEntriesJson
(current_acl
, perm
, scope
,
313 self
.assertEqual(1, len(matches
))
315 def _AssertHasNo(self
, current_acl
, perm
, scope
, value
=None):
316 matches
= list(self
._YieldMatchingEntriesJson
(current_acl
, perm
, scope
,
318 self
.assertEqual(0, len(matches
))
320 def _YieldMatchingEntriesJson(self
, current_acl
, perm
, scope
, value
=None):
321 """Generator that yields entries that match the change descriptor.
324 current_acl: A list of apitools_messages.BucketAccessControls or
325 ObjectAccessControls which will be searched for matching
327 perm: Role (permission) to match.
328 scope: Scope type to match.
329 value: Value to match (against the scope type).
332 An apitools_messages.BucketAccessControl or ObjectAccessControl.
334 for entry
in current_acl
:
335 if (scope
in ['UserById', 'GroupById'] and
336 entry
.entityId
and value
== entry
.entityId
and
339 elif (scope
in ['UserByEmail', 'GroupByEmail'] and
340 entry
.email
and value
== entry
.email
and
343 elif (scope
== 'GroupByDomain' and
344 entry
.domain
and value
== entry
.domain
and
347 elif (scope
== 'Project' and entry
.role
== perm
and
348 value
== entry
.entityId
):
350 elif (scope
in ['AllUsers', 'AllAuthenticatedUsers'] and
351 entry
.entity
.lower() == scope
.lower() and
355 def _MakeScopeRegex(self
, role
, entity_type
, email_address
):
356 template_regex
= (r
'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' %
357 (entity_type
, email_address
, role
))
358 return re
.compile(template_regex
, flags
=re
.DOTALL
)
360 def _MakeProjectScopeRegex(self
, role
, project_team
):
361 template_regex
= (r
'\{.*"entity":\s*"project-%s-\d+",\s*"projectTeam":\s*'
362 r
'\{\s*"projectNumber":\s*"(\d+)",\s*"team":\s*"%s"\s*\},'
363 r
'\s*"role":\s*"%s".*\}') % (project_team
, project_team
,
366 return re
.compile(template_regex
, flags
=re
.DOTALL
)
368 def testBucketAclChange(self
):
369 """Tests acl change on a bucket."""
370 test_regex
= self
._MakeScopeRegex
(
371 'OWNER', 'user', self
.USER_TEST_ADDRESS
)
372 json_text
= self
.RunGsUtil(
373 self
._get
_acl
_prefix
+ [suri(self
.sample_uri
)], return_stdout
=True)
374 self
.assertNotRegexpMatches(json_text
, test_regex
)
376 self
.RunGsUtil(self
._ch
_acl
_prefix
+
377 ['-u', self
.USER_TEST_ADDRESS
+':fc', suri(self
.sample_uri
)])
378 json_text
= self
.RunGsUtil(
379 self
._get
_acl
_prefix
+ [suri(self
.sample_uri
)], return_stdout
=True)
380 self
.assertRegexpMatches(json_text
, test_regex
)
382 test_regex2
= self
._MakeScopeRegex
(
383 'WRITER', 'user', self
.USER_TEST_ADDRESS
)
384 self
.RunGsUtil(self
._ch
_acl
_prefix
+
385 ['-u', self
.USER_TEST_ADDRESS
+':w', suri(self
.sample_uri
)])
386 json_text2
= self
.RunGsUtil(
387 self
._get
_acl
_prefix
+ [suri(self
.sample_uri
)], return_stdout
=True)
388 self
.assertRegexpMatches(json_text2
, test_regex2
)
390 self
.RunGsUtil(self
._ch
_acl
_prefix
+
391 ['-d', self
.USER_TEST_ADDRESS
, suri(self
.sample_uri
)])
393 json_text3
= self
.RunGsUtil(
394 self
._get
_acl
_prefix
+ [suri(self
.sample_uri
)], return_stdout
=True)
395 self
.assertNotRegexpMatches(json_text3
, test_regex
)
397 def testProjectAclChangesOnBucket(self
):
398 """Tests project entity acl changes on a bucket."""
400 if self
.test_api
== ApiSelector
.XML
:
401 stderr
= self
.RunGsUtil(self
._ch
_acl
_prefix
+
402 ['-p', self
._project
_test
_acl
+':w',
403 suri(self
.sample_uri
)],
406 self
.assertIn(('CommandException: XML API does not support project'
407 ' scopes, cannot translate ACL.'), stderr
)
409 test_regex
= self
._MakeProjectScopeRegex
(
410 'WRITER', self
._project
_team
)
411 self
.RunGsUtil(self
._ch
_acl
_prefix
+
412 ['-p', self
._project
_test
_acl
+':w',
413 suri(self
.sample_uri
)])
414 json_text
= self
.RunGsUtil(
415 self
._get
_acl
_prefix
+ [suri(self
.sample_uri
)], return_stdout
=True)
417 self
.assertRegexpMatches(json_text
, test_regex
)
419 # The api will accept string project ids, but stores the numeric project
420 # ids internally, this extracts the numeric id from the returned acls.
421 proj_num_id
= test_regex
.search(json_text
).group(1)
422 acl_to_remove
= '%s-%s' % (self
._project
_team
, proj_num_id
)
424 self
.RunGsUtil(self
._ch
_acl
_prefix
+
425 ['-d', acl_to_remove
, suri(self
.sample_uri
)])
427 json_text2
= self
.RunGsUtil(
428 self
._get
_acl
_prefix
+ [suri(self
.sample_uri
)], return_stdout
=True)
429 self
.assertNotRegexpMatches(json_text2
, test_regex
)
431 def testObjectAclChange(self
):
432 """Tests acl change on an object."""
433 obj
= self
.CreateObject(bucket_uri
=self
.sample_uri
, contents
='something')
434 self
.AssertNObjectsInBucket(self
.sample_uri
, 1)
436 test_regex
= self
._MakeScopeRegex
(
437 'READER', 'group', self
.GROUP_TEST_ADDRESS
)
438 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
440 self
.assertNotRegexpMatches(json_text
, test_regex
)
442 self
.RunGsUtil(self
._ch
_acl
_prefix
+
443 ['-g', self
.GROUP_TEST_ADDRESS
+':READ', suri(obj
)])
444 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
446 self
.assertRegexpMatches(json_text
, test_regex
)
448 test_regex2
= self
._MakeScopeRegex
(
449 'OWNER', 'group', self
.GROUP_TEST_ADDRESS
)
450 self
.RunGsUtil(self
._ch
_acl
_prefix
+
451 ['-g', self
.GROUP_TEST_ADDRESS
+':OWNER', suri(obj
)])
452 json_text2
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
454 self
.assertRegexpMatches(json_text2
, test_regex2
)
456 self
.RunGsUtil(self
._ch
_acl
_prefix
+
457 ['-d', self
.GROUP_TEST_ADDRESS
, suri(obj
)])
458 json_text3
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
460 self
.assertNotRegexpMatches(json_text3
, test_regex2
)
462 all_auth_regex
= re
.compile(
463 r
'\{.*"entity":\s*"allAuthenticatedUsers".*"role":\s*"OWNER".*\}',
466 self
.RunGsUtil(self
._ch
_acl
_prefix
+ ['-g', 'AllAuth:O', suri(obj
)])
467 json_text4
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
469 self
.assertRegexpMatches(json_text4
, all_auth_regex
)
471 def testObjectAclChangeAllUsers(self
):
472 """Tests acl ch AllUsers:R on an object."""
473 obj
= self
.CreateObject(bucket_uri
=self
.sample_uri
, contents
='something')
474 self
.AssertNObjectsInBucket(self
.sample_uri
, 1)
476 all_users_regex
= re
.compile(
477 r
'\{.*"entity":\s*"allUsers".*"role":\s*"READER".*\}', flags
=re
.DOTALL
)
478 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
480 self
.assertNotRegexpMatches(json_text
, all_users_regex
)
482 self
.RunGsUtil(self
._ch
_acl
_prefix
+
483 ['-g', 'AllUsers:R', suri(obj
)])
484 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
486 self
.assertRegexpMatches(json_text
, all_users_regex
)
488 def testMultithreadedAclChange(self
, count
=10):
489 """Tests multi-threaded acl changing on several objects."""
491 for i
in range(count
):
492 objects
.append(self
.CreateObject(
493 bucket_uri
=self
.sample_uri
,
494 contents
='something {0}'.format(i
)))
496 self
.AssertNObjectsInBucket(self
.sample_uri
, count
)
498 test_regex
= self
._MakeScopeRegex
(
499 'READER', 'group', self
.GROUP_TEST_ADDRESS
)
502 json_texts
.append(self
.RunGsUtil(
503 self
._get
_acl
_prefix
+ [suri(obj
)], return_stdout
=True))
504 for json_text
in json_texts
:
505 self
.assertNotRegexpMatches(json_text
, test_regex
)
507 uris
= [suri(obj
) for obj
in objects
]
508 self
.RunGsUtil(['-m', '-DD'] + self
._ch
_acl
_prefix
+
509 ['-g', self
.GROUP_TEST_ADDRESS
+':READ'] + uris
)
513 json_texts
.append(self
.RunGsUtil(
514 self
._get
_acl
_prefix
+ [suri(obj
)], return_stdout
=True))
515 for json_text
in json_texts
:
516 self
.assertRegexpMatches(json_text
, test_regex
)
518 def testRecursiveChangeAcl(self
):
519 """Tests recursively changing ACLs on nested objects."""
520 obj
= self
.CreateObject(bucket_uri
=self
.sample_uri
, object_name
='foo/bar',
521 contents
='something')
522 self
.AssertNObjectsInBucket(self
.sample_uri
, 1)
524 test_regex
= self
._MakeScopeRegex
(
525 'READER', 'group', self
.GROUP_TEST_ADDRESS
)
526 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
528 self
.assertNotRegexpMatches(json_text
, test_regex
)
530 @Retry(AssertionError, tries
=5, timeout_secs
=1)
533 self
._ch
_acl
_prefix
+
534 ['-R', '-g', self
.GROUP_TEST_ADDRESS
+':READ', suri(obj
)[:-3]])
535 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
537 self
.assertRegexpMatches(json_text
, test_regex
)
540 @Retry(AssertionError, tries
=5, timeout_secs
=1)
542 self
.RunGsUtil(self
._ch
_acl
_prefix
+
543 ['-d', self
.GROUP_TEST_ADDRESS
, suri(obj
)])
544 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(obj
)],
546 self
.assertNotRegexpMatches(json_text
, test_regex
)
549 def testMultiVersionSupport(self
):
550 """Tests changing ACLs on multiple object versions."""
551 bucket
= self
.CreateVersionedBucket()
552 object_name
= self
.MakeTempName('obj')
554 bucket_uri
=bucket
, object_name
=object_name
, contents
='One thing')
555 # Create another on the same URI, giving us a second version.
557 bucket_uri
=bucket
, object_name
=object_name
, contents
='Another thing')
559 lines
= self
.AssertNObjectsInBucket(bucket
, 2, versioned
=True)
561 obj_v1
, obj_v2
= lines
[0], lines
[1]
563 test_regex
= self
._MakeScopeRegex
(
564 'READER', 'group', self
.GROUP_TEST_ADDRESS
)
565 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_v1
],
567 self
.assertNotRegexpMatches(json_text
, test_regex
)
569 self
.RunGsUtil(self
._ch
_acl
_prefix
+
570 ['-g', self
.GROUP_TEST_ADDRESS
+':READ', obj_v1
])
571 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_v1
],
573 self
.assertRegexpMatches(json_text
, test_regex
)
575 json_text
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_v2
],
577 self
.assertNotRegexpMatches(json_text
, test_regex
)
579 def testBadRequestAclChange(self
):
580 stdout
, stderr
= self
.RunGsUtil(
581 self
._ch
_acl
_prefix
+
582 ['-u', 'invalid_$$@hello.com:R', suri(self
.sample_uri
)],
583 return_stdout
=True, return_stderr
=True, expected_status
=1)
584 self
.assertIn('BadRequestException', stderr
)
585 self
.assertNotIn('Retrying', stdout
)
586 self
.assertNotIn('Retrying', stderr
)
588 def testAclGetWithoutFullControl(self
):
589 object_uri
= self
.CreateObject(contents
='foo')
590 with self
.SetAnonymousBotoCreds():
591 stderr
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [suri(object_uri
)],
592 return_stderr
=True, expected_status
=1)
593 self
.assertIn('AccessDeniedException', stderr
)
595 def testTooFewArgumentsFails(self
):
596 """Tests calling ACL commands with insufficient number of arguments."""
597 # No arguments for get, but valid subcommand.
598 stderr
= self
.RunGsUtil(self
._get
_acl
_prefix
, return_stderr
=True,
600 self
.assertIn('command requires at least', stderr
)
602 # No arguments for set, but valid subcommand.
603 stderr
= self
.RunGsUtil(self
._set
_acl
_prefix
, return_stderr
=True,
605 self
.assertIn('command requires at least', stderr
)
607 # No arguments for ch, but valid subcommand.
608 stderr
= self
.RunGsUtil(self
._ch
_acl
_prefix
, return_stderr
=True,
610 self
.assertIn('command requires at least', stderr
)
612 # Neither arguments nor subcommand.
613 stderr
= self
.RunGsUtil(['acl'], return_stderr
=True, expected_status
=1)
614 self
.assertIn('command requires at least', stderr
)
616 def testMinusF(self
):
617 """Tests -f option to continue after failure."""
618 bucket_uri
= self
.CreateBucket()
619 obj_uri
= suri(self
.CreateObject(bucket_uri
=bucket_uri
, object_name
='foo',
621 acl_string
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri
],
623 self
.RunGsUtil(self
._set
_acl
_prefix
+
624 ['-f', 'public-read', suri(bucket_uri
) + 'foo2', obj_uri
],
626 acl_string2
= self
.RunGsUtil(self
._get
_acl
_prefix
+ [obj_uri
],
629 self
.assertNotEqual(acl_string
, acl_string2
)
class TestS3CompatibleAcl(TestAclBase):
  """ACL integration tests that work for s3 and gs URLs."""

  def testAclObjectGetSet(self):
    """Reads an object's ACL and sets the same text back on the object."""
    bucket_uri = self.CreateBucket()
    obj_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
    self.AssertNObjectsInBucket(bucket_uri, 1)

    # return_stdout=True so the ACL text can be written to the temp file.
    stdout = self.RunGsUtil(self._get_acl_prefix + [suri(obj_uri)],
                            return_stdout=True)
    set_contents = self.CreateTempFile(contents=stdout)
    self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(obj_uri)])

  def testAclBucketGetSet(self):
    """Reads a bucket's ACL and sets the same text back on the bucket."""
    bucket_uri = self.CreateBucket()
    # return_stdout=True so the ACL text can be written to the temp file.
    stdout = self.RunGsUtil(self._get_acl_prefix + [suri(bucket_uri)],
                            return_stdout=True)
    set_contents = self.CreateTempFile(contents=stdout)
    self.RunGsUtil(self._set_acl_prefix + [set_contents, suri(bucket_uri)])
@SkipForGS('S3 ACLs accept XML and should not cause an XML warning.')
class TestS3OnlyAcl(TestAclBase):
  """ACL integration tests that work only for s3 URLs."""

  # TODO: Format all test case names consistently.
  def test_set_xml_acl(self):
    """Checks that XML ACL content on an object gives no XML warning.

    The set is still expected to fail (status 1, BadRequestException), but
    without the 'XML ACL data provided' message.
    """
    target_object = suri(self.CreateObject(contents='foo'))
    xml_acl_path = self.CreateTempFile(contents='<ValidXml></ValidXml>')
    set_command = self._set_acl_prefix + [xml_acl_path, target_object]
    stderr = self.RunGsUtil(set_command, return_stderr=True, expected_status=1)
    self.assertIn('BadRequestException', stderr)
    self.assertNotIn('XML ACL data provided', stderr)

  def test_set_xml_acl_bucket(self):
    """Checks that XML ACL content on a bucket gives no XML warning.

    The set is still expected to fail (status 1, BadRequestException), but
    without the 'XML ACL data provided' message.
    """
    target_bucket = suri(self.CreateBucket())
    xml_acl_path = self.CreateTempFile(contents='<ValidXml></ValidXml>')
    set_command = self._set_acl_prefix + [xml_acl_path, target_bucket]
    stderr = self.RunGsUtil(set_command, return_stderr=True, expected_status=1)
    self.assertIn('BadRequestException', stderr)
    self.assertNotIn('XML ACL data provided', stderr)
class TestAclOldAlias(TestAcl):
  """Runs the inherited TestAcl tests through the single-word aliases."""

  # Override the two-word 'acl <sub>' / 'defacl <sub>' prefixes from
  # TestAclBase with the alias command names.
  _set_acl_prefix = ['setacl']
  _get_acl_prefix = ['getacl']
  _set_defacl_prefix = ['setdefacl']
  _ch_acl_prefix = ['chacl']