# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrapper for use in daisy-chained copies."""
from collections import deque
import os
import threading
import time

from gslib.cloud_api import BadRequestException
from gslib.cloud_api import CloudApi
from gslib.util import CreateLock
from gslib.util import TRANSFER_BUFFER_SIZE
28 # This controls the amount of bytes downloaded per download request.
29 # We do not buffer this many bytes in memory at a time - that is controlled by
30 # DaisyChainWrapper.max_buffer_size. This is the upper bound of bytes that may
31 # be unnecessarily downloaded if there is a break in the resumable upload.
32 _DEFAULT_DOWNLOAD_CHUNK_SIZE
= 1024*1024*100
class BufferWrapper(object):
  """Wraps the download file pointer to use our in-memory buffer."""

  def __init__(self, daisy_chain_wrapper):
    """Provides a buffered write interface for a file download.

    Args:
      daisy_chain_wrapper: DaisyChainWrapper instance to use for buffer and
          locking.
    """
    self.daisy_chain_wrapper = daisy_chain_wrapper

  def write(self, data):  # pylint: disable=invalid-name
    """Waits for space in the buffer, then writes data to the buffer."""
    data_len = len(data)
    if not data_len:
      # Nothing to buffer; avoid appending empty chunks.
      return
    while True:
      with self.daisy_chain_wrapper.lock:
        if (self.daisy_chain_wrapper.bytes_buffered <
            self.daisy_chain_wrapper.max_buffer_size):
          break
      # Buffer was full, yield thread priority so the upload can pull from it.
      time.sleep(0)
    with self.daisy_chain_wrapper.lock:
      self.daisy_chain_wrapper.buffer.append(data)
      self.daisy_chain_wrapper.bytes_buffered += data_len
class DaisyChainWrapper(object):
  """Wrapper class for daisy-chaining a cloud download to an upload.

  This class instantiates a BufferWrapper object to buffer the download into
  memory, consuming a maximum of max_buffer_size. It implements intelligent
  behavior around read and seek that allow for all of the operations necessary
  to copy a file.

  This class is coupled with the XML and JSON implementations in that it
  expects that small buffers (maximum of TRANSFER_BUFFER_SIZE) in size will be
  used.
  """
76 def __init__(self
, src_url
, src_obj_size
, gsutil_api
, progress_callback
=None,
77 download_chunk_size
=_DEFAULT_DOWNLOAD_CHUNK_SIZE
):
78 """Initializes the daisy chain wrapper.
81 src_url: Source CloudUrl to copy from.
82 src_obj_size: Size of source object.
83 gsutil_api: gsutil Cloud API to use for the copy.
84 progress_callback: Optional callback function for progress notifications
85 for the download thread. Receives calls with arguments
86 (bytes_transferred, total_size).
87 download_chunk_size: Integer number of bytes to download per
88 GetObjectMedia request. This is the upper bound of bytes that may be
89 unnecessarily downloaded if there is a break in the resumable upload.
92 # Current read position for the upload file pointer.
96 self
.bytes_buffered
= 0
97 # Maximum amount of bytes in memory at a time.
98 self
.max_buffer_size
= 1024 * 1024 # 1 MiB
100 self
._download
_chunk
_size
= download_chunk_size
102 # We save one buffer's worth of data as a special case for boto,
103 # which seeks back one buffer and rereads to compute hashes. This is
104 # unnecessary because we can just compare cloud hash digests at the end,
105 # but it allows this to work without modfiying boto.
106 self
.last_position
= 0
107 self
.last_data
= None
109 # Protects buffer, position, bytes_buffered, last_position, and last_data.
110 self
.lock
= CreateLock()
112 # Protects download_exception.
113 self
.download_exception_lock
= CreateLock()
115 self
.src_obj_size
= src_obj_size
116 self
.src_url
= src_url
118 # This is safe to use the upload and download thread because the download
119 # thread calls only GetObjectMedia, which creates a new HTTP connection
120 # independent of gsutil_api. Thus, it will not share an HTTP connection
122 self
.gsutil_api
= gsutil_api
124 # If self.download_thread dies due to an exception, it is saved here so
125 # that it can also be raised in the upload thread.
126 self
.download_exception
= None
127 self
.download_thread
= None
128 self
.progress_callback
= progress_callback
129 self
.stop_download
= threading
.Event()
130 self
.StartDownloadThread(progress_callback
=self
.progress_callback
)
132 def StartDownloadThread(self
, start_byte
=0, progress_callback
=None):
133 """Starts the download thread for the source object (from start_byte)."""
135 def PerformDownload(start_byte
, progress_callback
):
136 """Downloads the source object in chunks.
138 This function checks the stop_download event and exits early if it is set.
139 It should be set when there is an error during the daisy-chain upload,
140 then this function can be called again with the upload's current position
144 start_byte: Byte from which to begin the download.
145 progress_callback: Optional callback function for progress
146 notifications. Receives calls with arguments
147 (bytes_transferred, total_size).
149 # TODO: Support resumable downloads. This would require the BufferWrapper
150 # object to support seek() and tell() which requires coordination with
153 while start_byte
+ self
._download
_chunk
_size
< self
.src_obj_size
:
154 self
.gsutil_api
.GetObjectMedia(
155 self
.src_url
.bucket_name
, self
.src_url
.object_name
,
156 BufferWrapper(self
), start_byte
=start_byte
,
157 end_byte
=start_byte
+ self
._download
_chunk
_size
- 1,
158 generation
=self
.src_url
.generation
, object_size
=self
.src_obj_size
,
159 download_strategy
=CloudApi
.DownloadStrategy
.ONE_SHOT
,
160 provider
=self
.src_url
.scheme
, progress_callback
=progress_callback
)
161 if self
.stop_download
.is_set():
162 # Download thread needs to be restarted, so exit.
163 self
.stop_download
.clear()
165 start_byte
+= self
._download
_chunk
_size
166 self
.gsutil_api
.GetObjectMedia(
167 self
.src_url
.bucket_name
, self
.src_url
.object_name
,
168 BufferWrapper(self
), start_byte
=start_byte
,
169 generation
=self
.src_url
.generation
, object_size
=self
.src_obj_size
,
170 download_strategy
=CloudApi
.DownloadStrategy
.ONE_SHOT
,
171 provider
=self
.src_url
.scheme
, progress_callback
=progress_callback
)
172 # We catch all exceptions here because we want to store them.
173 except Exception, e
: # pylint: disable=broad-except
174 # Save the exception so that it can be seen in the upload thread.
175 with self
.download_exception_lock
:
176 self
.download_exception
= e
179 # TODO: If we do gzip encoding transforms mid-transfer, this will fail.
180 self
.download_thread
= threading
.Thread(
181 target
=PerformDownload
,
182 args
=(start_byte
, progress_callback
))
183 self
.download_thread
.start()
185 def read(self
, amt
=None): # pylint: disable=invalid-name
186 """Exposes a stream from the in-memory buffer to the upload."""
187 if self
.position
== self
.src_obj_size
or amt
== 0:
188 # If there is no data left or 0 bytes were requested, return an empty
189 # string so callers can call still call len() and read(0).
191 if amt
is None or amt
> TRANSFER_BUFFER_SIZE
:
192 raise BadRequestException(
193 'Invalid HTTP read size %s during daisy chain operation, '
194 'expected <= %s.' % (amt
, TRANSFER_BUFFER_SIZE
))
200 with self
.download_exception_lock
:
201 if self
.download_exception
:
202 # Download thread died, so we will never recover. Raise the
203 # exception that killed it.
204 raise self
.download_exception
# pylint: disable=raising-bad-type
205 # Buffer was empty, yield thread priority so the download thread can fill.
208 # TODO: Need to handle the caller requesting less than a
209 # transfer_buffer_size worth of data.
210 data
= self
.buffer.popleft()
211 self
.last_position
= self
.position
212 self
.last_data
= data
214 self
.position
+= data_len
215 self
.bytes_buffered
-= data_len
217 raise BadRequestException(
218 'Invalid read during daisy chain operation, got data of size '
219 '%s, expected size %s.' % (data_len
, amt
))
222 def tell(self
): # pylint: disable=invalid-name
226 def seek(self
, offset
, whence
=os
.SEEK_SET
): # pylint: disable=invalid-name
227 restart_download
= False
228 if whence
== os
.SEEK_END
:
231 'Invalid seek during daisy chain operation. Non-zero offset %s '
232 'from os.SEEK_END is not supported' % offset
)
234 self
.last_position
= self
.position
235 self
.last_data
= None
236 # Safe because we check position against src_obj_size in read.
237 self
.position
= self
.src_obj_size
238 elif whence
== os
.SEEK_SET
:
240 if offset
== self
.position
:
242 elif offset
== self
.last_position
:
243 self
.position
= self
.last_position
245 # If we seek to end and then back, we won't have last_data; we'll
246 # get it on the next call to read.
247 self
.buffer.appendleft(self
.last_data
)
248 self
.bytes_buffered
+= len(self
.last_data
)
250 # Once a download is complete, boto seeks to 0 and re-reads to
251 # compute the hash if an md5 isn't already present (for example a GCS
252 # composite object), so we have to re-download the whole object.
253 # Also, when daisy-chaining to a resumable upload, on error the
254 # service may have received any number of the bytes; the download
255 # needs to be restarted from that point.
256 restart_download
= True
259 self
.stop_download
.set()
261 # Consume any remaining bytes in the download thread so that
262 # the thread can exit, then restart the thread at the desired position.
263 while self
.download_thread
.is_alive():
265 while self
.bytes_buffered
:
266 self
.bytes_buffered
-= len(self
.buffer.popleft())
270 self
.position
= offset
271 self
.buffer = deque()
272 self
.bytes_buffered
= 0
273 self
.last_position
= 0
274 self
.last_data
= None
275 self
.StartDownloadThread(start_byte
=offset
,
276 progress_callback
=self
.progress_callback
)
278 raise IOError('Daisy-chain download wrapper does not support '
279 'seek mode %s' % whence
)
281 def seekable(self
): # pylint: disable=invalid-name