1 # -*- coding: utf-8 -*-
2 # Copyright 2014 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Unit tests for resumable streaming upload functions and classes."""
from __future__ import absolute_import

from hashlib import md5
import os
import pkgutil

from gslib.exception import CommandException
from gslib.hashing_helper import CalculateHashesFromContents
from gslib.hashing_helper import CalculateMd5FromContents
from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper
import gslib.tests.testcase as testcase
from gslib.util import GetJsonResumableChunkSize
from gslib.util import TRANSFER_BUFFER_SIZE
# Name of the data file, looked up under gslib's tests/test_data directory,
# whose contents are used as the upload payload in these tests.
_TEST_FILE = 'test.txt'
class TestResumableStreamingJsonUploadWrapper(testcase.GsUtilUnitTestCase):
  """Unit tests for the ResumableStreamingJsonUploadWrapper class."""

  # The three attributes below are populated on first use by _GetTestFile.
  # Value returned by CreateTempFile for the test data.
  _temp_test_file = None
  # Contents of the packaged test data file.
  _temp_test_file_contents = None
  # Length of _temp_test_file_contents.
  _temp_test_file_len = None

  def _GetTestFile(self):
    """Returns the temporary test file, creating it on first use.

    The first call loads the packaged test data, writes it to a temporary
    file, and records the contents and their length on this instance so the
    tests can compare wrapper output against them.

    Returns:
      The value returned by CreateTempFile for the test data.
    """
    if not self._temp_test_file:
      self._temp_test_file_contents = pkgutil.get_data(
          'gslib', 'tests/test_data/%s' % _TEST_FILE)
      self._temp_test_file = self.CreateTempFile(
          file_name=_TEST_FILE, contents=self._temp_test_file_contents)
      self._temp_test_file_len = len(self._temp_test_file_contents)
    return self._temp_test_file

  def testReadInChunks(self):
    """Hashes the file through the wrapper and directly; digests must match."""
    tmp_file = self._GetTestFile()
    with open(tmp_file, 'rb') as stream:
      wrapper = ResumableStreamingJsonUploadWrapper(
          stream, TRANSFER_BUFFER_SIZE, test_small_buffer=True)
      hash_dict = {'md5': md5()}
      # CalculateHashesFromContents reads in chunks, but does not seek.
      CalculateHashesFromContents(wrapper, hash_dict)
    with open(tmp_file, 'rb') as stream:
      actual = CalculateMd5FromContents(stream)
    self.assertEqual(actual, hash_dict['md5'].hexdigest())

  def testReadInChunksWithSeekToBeginning(self):
    """Reads one buffer, then seeks to 0 and reads chunks until the end."""
    tmp_file = self._GetTestFile()
    for initial_read in (TRANSFER_BUFFER_SIZE - 1,
                         TRANSFER_BUFFER_SIZE,
                         TRANSFER_BUFFER_SIZE + 1,
                         TRANSFER_BUFFER_SIZE * 2 - 1,
                         TRANSFER_BUFFER_SIZE * 2,
                         TRANSFER_BUFFER_SIZE * 2 + 1,
                         TRANSFER_BUFFER_SIZE * 3 - 1,
                         TRANSFER_BUFFER_SIZE * 3,
                         TRANSFER_BUFFER_SIZE * 3 + 1):
      for buffer_size in (TRANSFER_BUFFER_SIZE - 1,
                          TRANSFER_BUFFER_SIZE,
                          TRANSFER_BUFFER_SIZE + 1,
                          self._temp_test_file_len - 1,
                          self._temp_test_file_len,
                          self._temp_test_file_len + 1):
        # Can't seek to 0 if the buffer is too small, so we expect an
        # exception.
        expect_exception = buffer_size < self._temp_test_file_len
        hex_digest = None
        with open(tmp_file, 'rb') as stream:
          wrapper = ResumableStreamingJsonUploadWrapper(
              stream, buffer_size, test_small_buffer=True)
          wrapper.read(initial_read)
          # CalculateMd5FromContents seeks to 0, reads in chunks, then seeks
          # to 0 again.
          try:
            hex_digest = CalculateMd5FromContents(wrapper)
          except CommandException as e:
            if not expect_exception:
              self.fail('Got unexpected CommandException "%s" for '
                        'initial read size %s, buffer size %s' %
                        (str(e), initial_read, buffer_size))
          else:
            if expect_exception:
              self.fail('Did not get expected CommandException for '
                        'initial read size %s, buffer size %s' %
                        (initial_read, buffer_size))
        if not expect_exception:
          with open(tmp_file, 'rb') as stream:
            actual = CalculateMd5FromContents(stream)
          self.assertEqual(
              actual, hex_digest,
              'Digests not equal for initial read size %s, buffer size %s' %
              (initial_read, buffer_size))

  def _testSeekBack(self, initial_reads, buffer_size, seek_back_amount):
    """Tests reading then seeking backwards.

    This function simulates an upload that is resumed after a connection break.
    It performs the reads listed in initial_reads, checking each against the
    file contents, then seeks backwards (as if the server did not receive some
    of the bytes) and reads to the end of the file, ensuring the data read
    after the seek matches the original file.

    Args:
      initial_reads: List of integers containing read sizes to perform
          before seeking.
      buffer_size: Maximum buffer size for the wrapper.
      seek_back_amount: Number of bytes to seek backward.

    Raises:
      AssertionError on wrong data returned by the wrapper.
    """
    tmp_file = self._GetTestFile()
    initial_position = sum(initial_reads)
    self.assertGreaterEqual(
        buffer_size, seek_back_amount,
        'seek_back_amount must be no greater than buffer size %s '
        '(but was actually: %s)' % (buffer_size, seek_back_amount))
    self.assertLess(
        initial_position, self._temp_test_file_len,
        'initial_position must be less than test file size %s '
        '(but was actually: %s)' % (self._temp_test_file_len, initial_position))

    # Absolute offset the wrapper is rewound to, and the number of bytes that
    # must remain between that offset and the end of the file.
    seek_target = initial_position - seek_back_amount
    expected_len = self._temp_test_file_len - seek_target

    with open(tmp_file, 'rb') as stream:
      wrapper = ResumableStreamingJsonUploadWrapper(
          stream, buffer_size, test_small_buffer=True)
      position = 0
      for read_size in initial_reads:
        data = wrapper.read(read_size)
        self.assertEqual(
            self._temp_test_file_contents[position:position + read_size],
            data, 'Data from position %s to %s did not match file contents.' %
            (position, position + read_size))
        position += len(data)
      wrapper.seek(seek_target)
      self.assertEqual(wrapper.tell(), seek_target)
      data = wrapper.read()
      self.assertEqual(
          expected_len, len(data),
          'Unexpected data length with initial pos %s seek_back_amount %s. '
          'Expected: %s, actual: %s.' %
          (initial_position, seek_back_amount, expected_len, len(data)))
      # Slice from the seek target rather than by len(data): a negative-length
      # slice would silently return the whole file when data is empty.
      self.assertEqual(
          self._temp_test_file_contents[seek_target:], data,
          'Data from position %s to EOF did not match file contents.' %
          seek_target)

  def testReadSeekAndReadToEOF(self):
    """Tests performing reads on the wrapper, seeking, then reading to EOF."""
    for initial_reads in ([1],
                          [TRANSFER_BUFFER_SIZE - 1],
                          [TRANSFER_BUFFER_SIZE],
                          [TRANSFER_BUFFER_SIZE + 1],
                          [1, TRANSFER_BUFFER_SIZE - 1],
                          [1, TRANSFER_BUFFER_SIZE],
                          [1, TRANSFER_BUFFER_SIZE + 1],
                          [TRANSFER_BUFFER_SIZE - 1, 1],
                          [TRANSFER_BUFFER_SIZE, 1],
                          [TRANSFER_BUFFER_SIZE + 1, 1],
                          [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE - 1],
                          [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE],
                          [TRANSFER_BUFFER_SIZE - 1, TRANSFER_BUFFER_SIZE + 1],
                          [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE - 1],
                          [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE],
                          [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE + 1],
                          [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE - 1],
                          [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE],
                          [TRANSFER_BUFFER_SIZE + 1, TRANSFER_BUFFER_SIZE + 1],
                          [TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE,
                           TRANSFER_BUFFER_SIZE]):
      initial_position = sum(initial_reads)
      for buffer_size in (initial_position,
                          initial_position + 1,
                          initial_position * 2 - 1,
                          initial_position * 2):
        # Each amount is capped at initial_position so the seek target never
        # goes below offset 0.
        for seek_back_amount in (
            min(TRANSFER_BUFFER_SIZE - 1, initial_position),
            min(TRANSFER_BUFFER_SIZE, initial_position),
            min(TRANSFER_BUFFER_SIZE + 1, initial_position),
            min(TRANSFER_BUFFER_SIZE * 2 - 1, initial_position),
            min(TRANSFER_BUFFER_SIZE * 2, initial_position),
            min(TRANSFER_BUFFER_SIZE * 2 + 1, initial_position)):
          self._testSeekBack(initial_reads, buffer_size, seek_back_amount)

  def testBufferSizeLessThanChunkSize(self):
    """Tests that a buffer smaller than the JSON chunk size is rejected."""
    # A buffer exactly as large as the chunk size must be accepted.
    ResumableStreamingJsonUploadWrapper(None, GetJsonResumableChunkSize())
    try:
      ResumableStreamingJsonUploadWrapper(
          None, GetJsonResumableChunkSize() - 1)
    except CommandException as e:
      self.assertIn('Buffer size must be >= JSON resumable upload', str(e))
    else:
      self.fail('Did not get expected CommandException')

  def testSeekPartialBuffer(self):
    """Tests seeking back partially within the buffer."""
    tmp_file = self._GetTestFile()
    read_size = TRANSFER_BUFFER_SIZE
    with open(tmp_file, 'rb') as stream:
      wrapper = ResumableStreamingJsonUploadWrapper(
          stream, TRANSFER_BUFFER_SIZE * 3, test_small_buffer=True)
      position = 0
      # Three full reads, matching the buffer capacity of 3 * read_size.
      for _ in range(3):
        data = wrapper.read(read_size)
        self.assertEqual(
            self._temp_test_file_contents[position:position + read_size],
            data, 'Data from position %s to %s did not match file contents.' %
            (position, position + read_size))
        position += len(data)

      # Floor division keeps the read size and seek offset integral.
      data = wrapper.read(read_size // 2)
      # Buffer contents should now have contents from:
      # read_size/2 through 7*read_size/2.
      position = read_size // 2
      wrapper.seek(position)
      data = wrapper.read()
      self.assertEqual(
          self._temp_test_file_contents[position:], data,
          'Data from position %s to EOF did not match file contents.' %
          position)

  def testSeekEnd(self):
    """Tests seeking relative to the end of the stream after reading it all.

    A CommandException is expected exactly when the requested seek-back
    distance is larger than the wrapper's buffer size.
    """
    tmp_file = self._GetTestFile()
    for buffer_size in (TRANSFER_BUFFER_SIZE - 1,
                        TRANSFER_BUFFER_SIZE,
                        TRANSFER_BUFFER_SIZE + 1):
      for seek_back in (TRANSFER_BUFFER_SIZE - 1,
                        TRANSFER_BUFFER_SIZE,
                        TRANSFER_BUFFER_SIZE + 1):
        expect_exception = seek_back > buffer_size
        with open(tmp_file, 'rb') as stream:
          wrapper = ResumableStreamingJsonUploadWrapper(
              stream, buffer_size, test_small_buffer=True)
          # Read to the end.
          while wrapper.read(TRANSFER_BUFFER_SIZE):
            pass
          try:
            wrapper.seek(seek_back, whence=os.SEEK_END)
          except CommandException as e:
            if not expect_exception:
              self.fail('Got unexpected CommandException "%s" for '
                        'seek_back size %s, buffer size %s' %
                        (str(e), seek_back, buffer_size))
          else:
            if expect_exception:
              self.fail('Did not get expected CommandException for '
                        'seek_back size %s, buffer size %s' %
                        (seek_back, buffer_size))