# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for signurl command."""
from datetime import timedelta
import pkgutil

import gslib.commands.signurl
from gslib.commands.signurl import HAVE_OPENSSL
from gslib.exception import CommandException
import gslib.tests.testcase as testcase
from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.util import ObjectToURI as suri
from gslib.tests.util import unittest
# pylint: disable=protected-access
@unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.')
@SkipForS3('Signed URLs are only supported for gs:// URLs.')
class TestSignUrl(testcase.GsUtilIntegrationTestCase):
  """Integration tests for signurl command."""

  def _GetKsFile(self):
    """Returns the temp file holding the test keystore, creating it lazily.

    The keystore bytes are loaded from the package data file
    tests/test_data/test.p12 on first use; the result of CreateTempFile is
    cached on the instance as self.ks_file and reused by later calls.

    Returns:
      The value returned by self.CreateTempFile for the keystore contents.
    """
    if not hasattr(self, 'ks_file'):
      # Dummy pkcs12 keystore generated with the commands
      #
      # openssl req -new -passout pass:notasecret -batch \
      # -x509 -keyout signed_url_test.key -out signed_url_test.pem \
      # -subj '/CN=test.apps.googleusercontent.com'
      #
      # openssl pkcs12 -export -passin pass:notasecret \
      # -passout pass:notasecret -inkey signed_url_test.key \
      # -in signed_url_test.pem -out test.p12
      #
      # rm signed_url_test.key signed_url_test.pem
      contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12')
      self.ks_file = self.CreateTempFile(contents=contents)
    return self.ks_file

  def testSignUrlOutput(self):
    """Tests signurl output of a sample object."""
    object_url = self.CreateObject(contents='z')
    stdout = self.RunGsUtil(['signurl', '-p', 'notasecret',
                             self._GetKsFile(), suri(object_url)],
                            return_stdout=True)

    self.assertIn(object_url.uri, stdout)
    self.assertIn('test@developer.gserviceaccount.com', stdout)
    self.assertIn('Expires=', stdout)
    self.assertIn('\tGET\t', stdout)

    # Same command with an explicit HTTP method; the output must report PUT.
    stdout = self.RunGsUtil(['signurl', '-m', 'PUT', '-p',
                             'notasecret', self._GetKsFile(),
                             'gs://test/test.txt'], return_stdout=True)

    self.assertIn('test@developer.gserviceaccount.com', stdout)
    self.assertIn('Expires=', stdout)
    self.assertIn('\tPUT\t', stdout)

  def testSignUrlWithURLEncodeRequiredChars(self):
    """Tests that object names needing URL encoding are encoded in output."""
    objs = ['gs://example.org/test 1', 'gs://example.org/test/test 2',
            'gs://example.org/Аудиоарi хив']
    expected_partial_urls = [
        ('https://storage.googleapis.com/example.org/test%201?GoogleAccessId=te'
         'st@developer.gserviceaccount.com'),
        ('https://storage.googleapis.com/example.org/test/test%202?GoogleAccess'
         'Id=test@developer.gserviceaccount.com'),
        ('https://storage.googleapis.com/example.org/%D0%90%D1%83%D0%B4%D0%B8%D'
         '0%BE%D0%B0%D1%80i%20%D1%85%D0%B8%D0%B2?GoogleAccessId=test@developer.'
         'gserviceaccount.com')
    ]

    # Guard against the two fixtures drifting apart; zip() below would
    # otherwise silently ignore the extra entries.
    self.assertEqual(len(objs), len(expected_partial_urls))

    cmd_args = ['signurl', '-p', 'notasecret', self._GetKsFile()]
    cmd_args.extend(objs)

    stdout = self.RunGsUtil(cmd_args, return_stdout=True)

    lines = stdout.split('\n')
    # Header, signed urls, trailing newline.
    self.assertEqual(len(lines), len(objs) + 2)

    # Strip the header line to make the indices line up.
    lines = lines[1:]

    for obj, line, partial_url in zip(objs, lines, expected_partial_urls):
      self.assertIn(obj, line)
      self.assertIn(partial_url, line)

  def testSignUrlWithWildcard(self):
    """Tests that a wildcard URL yields one signed URL per matching object."""
    objs = ['test1', 'test2', 'test3']
    bucket = self.CreateBucket()
    obj_urls = []

    for obj_name in objs:
      obj_urls.append(self.CreateObject(bucket_uri=bucket,
                                        object_name=obj_name, contents=''))

    stdout = self.RunGsUtil(['signurl', '-p',
                             'notasecret', self._GetKsFile(),
                             suri(bucket) + '/*'], return_stdout=True)

    # Header, 3 signed urls, trailing newline
    self.assertEqual(len(stdout.split('\n')), 5)

    for obj_url in obj_urls:
      self.assertIn(suri(obj_url), stdout)

  def testSignUrlOfNonObjectUrl(self):
    """Tests that signurl exits with status 1 for non-object URLs."""
    # The keystore password is supplied on stdin here rather than via -p.
    self.RunGsUtil(['signurl', self._GetKsFile(), 'gs://'],
                   expected_status=1, stdin='notasecret')
    # NOTE(review): no keystore argument is passed in this invocation, so the
    # failure may come from argument validation rather than from the file://
    # scheme - confirm which is intended.
    self.RunGsUtil(['signurl', 'file://tmp/abc'], expected_status=1)
@unittest.skipUnless(HAVE_OPENSSL, 'signurl requires pyopenssl.')
class UnitTestSignUrl(testcase.GsUtilUnitTestCase):
  """Unit tests for the signurl command."""

  def setUp(self):
    """Loads the bundled test keystore bytes into self.ks_contents."""
    super(UnitTestSignUrl, self).setUp()
    self.ks_contents = pkgutil.get_data('gslib', 'tests/test_data/test.p12')

  def _SignTestObject(self, method, content_type, expiration):
    """Signs a URL for test/test.txt with the test keystore.

    Args:
      method: HTTP method string handed to _GenSignedUrl (e.g. 'GET', 'PUT').
      content_type: Content type string handed to _GenSignedUrl; '' for none.
      expiration: Integer expiration value embedded in the signed URL.

    Returns:
      The value returned by gslib.commands.signurl._GenSignedUrl.
    """
    ks, client_id = gslib.commands.signurl._ReadKeystore(
        self.ks_contents, 'notasecret')
    # The fourth positional argument is '' in every test in this class.
    return gslib.commands.signurl._GenSignedUrl(
        ks.get_privatekey(), client_id, method, '', content_type, expiration,
        'test/test.txt')

  def testDurationSpec(self):
    """Tests parsing of duration strings by _DurationToTimeDelta."""
    # NOTE(review): the visible SOURCE ends this list after the '22m' entry,
    # but the None check below implies entries whose expected value is None
    # (inputs that must be rejected) - confirm against the full file.
    tests = [('1h', timedelta(hours=1)),
             ('2d', timedelta(days=2)),
             ('5D', timedelta(days=5)),
             ('35s', timedelta(seconds=35)),
             ('1h', timedelta(hours=1)),
             ('33', timedelta(hours=33)),
             ('22m', timedelta(minutes=22)),
            ]

    for inp, expected in tests:
      try:
        td = gslib.commands.signurl._DurationToTimeDelta(inp)
      except CommandException:
        # A CommandException is only acceptable when the entry is marked as
        # unparseable by an expected value of None.
        if expected is not None:
          self.fail('{0} failed to parse'.format(inp))
      else:
        self.assertEqual(td, expected)

  def testSignPut(self):
    """Tests the return value of _GenSignedUrl with a PUT method."""
    expected = ('https://storage.googleapis.com/test/test.txt?'
                'GoogleAccessId=test@developer.gserviceaccount.com'
                '&Expires=1391816302&Signature=A6QbgTA8cXZCtjy2xCr401bdi0e'
                '7zChTBQ6BX61L7AfytTGEQDMD%2BbvOQKjX7%2FsEh77cmzcSxOEKqTLUD'
                'bbkPgPqW3j8sGPSRX9VM58bgj1vt9yU8cRKoegFHXAqsATx2G5rc%2FvEl'
                'iFp9UWMfVj5TaukqlBAVuzZWlyx0aQa9tCKXRtC9YcxORxG41RfiowA2kd8'
                'XBTQt4M9XTzpVyr5rVMzfr2LvtGf9UAJvlt8p6T6nThl2vy9%2FwBoPcMFa'
                'OWQcGTagwjyKWDcI1vQPIFQLGftAcv3QnGZxZTtg8pZW%2FIxRJrBhfFfcA'
                'c62hDKyaU2YssSMy%2FjUJynWx3TIiJjhg%3D%3D')

    expiration = 1391816302
    signed_url = self._SignTestObject('PUT', '', expiration)
    self.assertEqual(expected, signed_url)

  def testSignurlPutContentype(self):
    """Tests _GenSignedUrl with a PUT method and specified content type."""
    expected = ('https://storage.googleapis.com/test/test.txt?'
                'GoogleAccessId=test@developer.gserviceaccount.com&'
                'Expires=1391816302&Signature=APn%2BCCVcQrfc1fKQXrs'
                'PEZFj9%2FmASO%2BolR8xwgBY6PbWMkcCtrUVFBauP6t4NxqZO'
                'UnbOFYTZYzul0RC57ZkEWJp3VcyDIHcn6usEE%2FTzUHhbDCDW'
                'awAkZS7p8kO8IIACuJlF5s9xZmZzaEBtzF0%2BBOsGgBPBlg2y'
                'zrhFB6cyyAwNiUgmhLQaVkdobnSwtI5QJkvXoIjJb6hhLiVbLC'
                'rWdgSZVusjAKGlWCJsM%2B4TkCR%2Bi8AnrkECngcMHuJ9mYbS'
                'XI1VfEmcnRVcfkKkJGZGctaDIWK%2FMTEmfYCW6USt3Zk2WowJ'
                'SGuJHqEcFz0kyfAlkpmG%2Fl5E1FQROYqLN2kZQ%3D%3D')

    expiration = 1391816302
    signed_url = self._SignTestObject('PUT', 'text/plain', expiration)
    self.assertEqual(expected, signed_url)

  def testSignurlGet(self):
    """Tests the return value of _GenSignedUrl with a GET method."""
    expected = ('https://storage.googleapis.com/test/test.txt?'
                'GoogleAccessId=test@developer.gserviceaccount.com&'
                'Expires=0&Signature=TCZwe32cU%2BMksmLiSY9shHXQjLs1'
                'F3y%2F%2F1M0UhiK4qsPRVNZVwI7YWvv2qa2Xa%2BVBBafboF0'
                '1%2BWvx3ZG316pwpNIRR6y7jNnE0LvQmHE8afbm2VYCi%2B2JS'
                'ZK2YZFJAyEek8si53jhYQEmaRq1zPfGbX84B2FJ8v4iI%2FTC1'
                'I9OE5vHF0sWwIR9d73JDrFLjaync7QYFWRExdwvqlQX%2BPO3r'
                'OG9Ns%2BcQFIN7npnsVjH28yNY9gBzXya8LYmNvUx6bWHWZMiu'
                'fLwDZ0jejNeDZTOfQGRM%2B0vY7NslzaT06W1wo8P7McSkAZEl'
                'DCbhR0Vo1fturPMwmAhi88f0qzRzywbg%3D%3D')

    # Matches the 'Expires=0' component of the expected URL above.
    expiration = 0
    signed_url = self._SignTestObject('GET', '', expiration)
    self.assertEqual(expected, signed_url)