1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Integration tests for the defacl command."""
17 from __future__
import absolute_import
21 import gslib
.tests
.testcase
as case
22 from gslib
.tests
.testcase
.integration_testcase
import SkipForS3
23 from gslib
.tests
.util
import ObjectToURI
as suri
25 PUBLIC_READ_JSON_ACL_TEXT
= '"entity":"allUsers","role":"READER"'
28 @SkipForS3('S3 does not support default object ACLs.')
29 class TestDefacl(case
.GsUtilIntegrationTestCase
):
30 """Integration tests for the defacl command."""
32 _defacl_ch_prefix
= ['defacl', 'ch']
33 _defacl_get_prefix
= ['defacl', 'get']
34 _defacl_set_prefix
= ['defacl', 'set']
36 def _MakeScopeRegex(self
, role
, entity_type
, email_address
):
37 template_regex
= (r
'\{.*"entity":\s*"%s-%s".*"role":\s*"%s".*\}' %
38 (entity_type
, email_address
, role
))
39 return re
.compile(template_regex
, flags
=re
.DOTALL
)
41 def testChangeDefaultAcl(self
):
42 """Tests defacl ch."""
43 bucket
= self
.CreateBucket()
45 test_regex
= self
._MakeScopeRegex
(
46 'OWNER', 'group', self
.GROUP_TEST_ADDRESS
)
47 test_regex2
= self
._MakeScopeRegex
(
48 'READER', 'group', self
.GROUP_TEST_ADDRESS
)
49 json_text
= self
.RunGsUtil(self
._defacl
_get
_prefix
+
50 [suri(bucket
)], return_stdout
=True)
51 self
.assertNotRegexpMatches(json_text
, test_regex
)
53 self
.RunGsUtil(self
._defacl
_ch
_prefix
+
54 ['-g', self
.GROUP_TEST_ADDRESS
+':FC', suri(bucket
)])
55 json_text2
= self
.RunGsUtil(self
._defacl
_get
_prefix
+
56 [suri(bucket
)], return_stdout
=True)
57 self
.assertRegexpMatches(json_text2
, test_regex
)
59 self
.RunGsUtil(self
._defacl
_ch
_prefix
+
60 ['-g', self
.GROUP_TEST_ADDRESS
+':READ', suri(bucket
)])
61 json_text3
= self
.RunGsUtil(self
._defacl
_get
_prefix
+
62 [suri(bucket
)], return_stdout
=True)
63 self
.assertRegexpMatches(json_text3
, test_regex2
)
65 stderr
= self
.RunGsUtil(self
._defacl
_ch
_prefix
+
66 ['-g', self
.GROUP_TEST_ADDRESS
+':WRITE',
68 return_stderr
=True, expected_status
=1)
69 self
.assertIn('WRITER cannot be set as a default object ACL', stderr
)
71 def testChangeDefaultAclPrivate(self
):
72 bucket
= self
.CreateBucket()
73 test_regex
= self
._MakeScopeRegex
(
74 'READER', 'group', self
.GROUP_TEST_ADDRESS
)
75 self
.RunGsUtil(self
._defacl
_set
_prefix
+ ['private', suri(bucket
)])
76 json_text
= self
.RunGsUtil(self
._defacl
_get
_prefix
+
77 [suri(bucket
)], return_stdout
=True)
78 self
.assertRegexpMatches(json_text
, r
'\[\]\s*')
80 self
.RunGsUtil(self
._defacl
_ch
_prefix
+
81 ['-g', self
.GROUP_TEST_ADDRESS
+':READ', suri(bucket
)])
82 json_text2
= self
.RunGsUtil(self
._defacl
_get
_prefix
+
83 [suri(bucket
)], return_stdout
=True)
84 self
.assertRegexpMatches(json_text2
, test_regex
)
86 def testChangeMultipleBuckets(self
):
87 """Tests defacl ch on multiple buckets."""
88 bucket1
= self
.CreateBucket()
89 bucket2
= self
.CreateBucket()
91 test_regex
= self
._MakeScopeRegex
(
92 'READER', 'group', self
.GROUP_TEST_ADDRESS
)
93 json_text
= self
.RunGsUtil(self
._defacl
_get
_prefix
+ [suri(bucket1
)],
95 self
.assertNotRegexpMatches(json_text
, test_regex
)
96 json_text
= self
.RunGsUtil(self
._defacl
_get
_prefix
+ [suri(bucket2
)],
98 self
.assertNotRegexpMatches(json_text
, test_regex
)
100 self
.RunGsUtil(self
._defacl
_ch
_prefix
+
101 ['-g', self
.GROUP_TEST_ADDRESS
+':READ',
102 suri(bucket1
), suri(bucket2
)])
103 json_text
= self
.RunGsUtil(self
._defacl
_get
_prefix
+ [suri(bucket1
)],
105 self
.assertRegexpMatches(json_text
, test_regex
)
106 json_text
= self
.RunGsUtil(self
._defacl
_get
_prefix
+ [suri(bucket2
)],
108 self
.assertRegexpMatches(json_text
, test_regex
)
110 def testChangeMultipleAcls(self
):
111 """Tests defacl ch with multiple ACL entries."""
112 bucket
= self
.CreateBucket()
114 test_regex_group
= self
._MakeScopeRegex
(
115 'READER', 'group', self
.GROUP_TEST_ADDRESS
)
116 test_regex_user
= self
._MakeScopeRegex
(
117 'OWNER', 'user', self
.USER_TEST_ADDRESS
)
118 json_text
= self
.RunGsUtil(self
._defacl
_get
_prefix
+ [suri(bucket
)],
120 self
.assertNotRegexpMatches(json_text
, test_regex_group
)
121 self
.assertNotRegexpMatches(json_text
, test_regex_user
)
123 self
.RunGsUtil(self
._defacl
_ch
_prefix
+
124 ['-g', self
.GROUP_TEST_ADDRESS
+':READ',
125 '-u', self
.USER_TEST_ADDRESS
+':fc', suri(bucket
)])
126 json_text
= self
.RunGsUtil(self
._defacl
_get
_prefix
+ [suri(bucket
)],
128 self
.assertRegexpMatches(json_text
, test_regex_group
)
129 self
.assertRegexpMatches(json_text
, test_regex_user
)
131 def testEmptyDefAcl(self
):
132 bucket
= self
.CreateBucket()
133 self
.RunGsUtil(self
._defacl
_set
_prefix
+ ['private', suri(bucket
)])
134 stdout
= self
.RunGsUtil(self
._defacl
_get
_prefix
+ [suri(bucket
)],
136 self
.assertEquals(stdout
.rstrip(), '[]')
137 self
.RunGsUtil(self
._defacl
_ch
_prefix
+
138 ['-u', self
.USER_TEST_ADDRESS
+':fc', suri(bucket
)])
140 def testDeletePermissionsWithCh(self
):
141 """Tests removing permissions with defacl ch."""
142 bucket
= self
.CreateBucket()
144 test_regex
= self
._MakeScopeRegex
(
145 'OWNER', 'user', self
.USER_TEST_ADDRESS
)
146 json_text
= self
.RunGsUtil(
147 self
._defacl
_get
_prefix
+ [suri(bucket
)], return_stdout
=True)
148 self
.assertNotRegexpMatches(json_text
, test_regex
)
150 self
.RunGsUtil(self
._defacl
_ch
_prefix
+
151 ['-u', self
.USER_TEST_ADDRESS
+':fc', suri(bucket
)])
152 json_text
= self
.RunGsUtil(
153 self
._defacl
_get
_prefix
+ [suri(bucket
)], return_stdout
=True)
154 self
.assertRegexpMatches(json_text
, test_regex
)
156 self
.RunGsUtil(self
._defacl
_ch
_prefix
+
157 ['-d', self
.USER_TEST_ADDRESS
, suri(bucket
)])
158 json_text
= self
.RunGsUtil(
159 self
._defacl
_get
_prefix
+ [suri(bucket
)], return_stdout
=True)
160 self
.assertNotRegexpMatches(json_text
, test_regex
)
162 def testTooFewArgumentsFails(self
):
163 """Tests calling defacl with insufficient number of arguments."""
164 # No arguments for get, but valid subcommand.
165 stderr
= self
.RunGsUtil(self
._defacl
_get
_prefix
, return_stderr
=True,
167 self
.assertIn('command requires at least', stderr
)
169 # No arguments for set, but valid subcommand.
170 stderr
= self
.RunGsUtil(self
._defacl
_set
_prefix
, return_stderr
=True,
172 self
.assertIn('command requires at least', stderr
)
174 # No arguments for ch, but valid subcommand.
175 stderr
= self
.RunGsUtil(self
._defacl
_ch
_prefix
, return_stderr
=True,
177 self
.assertIn('command requires at least', stderr
)
179 # Neither arguments nor subcommand.
180 stderr
= self
.RunGsUtil(['defacl'], return_stderr
=True, expected_status
=1)
181 self
.assertIn('command requires at least', stderr
)
184 class TestDefaclOldAlias(TestDefacl
):
185 _defacl_ch_prefix
= ['chdefacl']
186 _defacl_get_prefix
= ['getdefacl']
187 _defacl_set_prefix
= ['setdefacl']