# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Media helper functions and classes for Google Cloud Storage JSON API."""

from __future__ import absolute_import

import copy
import cStringIO
import httplib
import logging
import socket
import types
import urlparse

from apitools.base.py import exceptions as apitools_exceptions
import httplib2
from httplib2 import parse_uri

from gslib.cloud_api import BadRequestException
from gslib.progress_callback import ProgressCallbackWithBackoff
from gslib.util import SSL_TIMEOUT
from gslib.util import TRANSFER_BUFFER_SIZE


class BytesTransferredContainer(object):
  """Container class for passing number of bytes transferred to lower layers.

  For resumed transfers or connection rebuilds in the middle of a transfer, we
  need to rebuild the connection class with how much we've transferred so far.
  For uploads, we don't know the total number of bytes uploaded until we've
  queried the server, but we need to create the connection class to pass to
  httplib2 before we can query the server. This container object allows us to
  pass a reference into Upload/DownloadCallbackConnection.
  """

  def __init__(self):
    self.__bytes_transferred = 0

  @property
  def bytes_transferred(self):
    return self.__bytes_transferred

  @bytes_transferred.setter
  def bytes_transferred(self, value):
    self.__bytes_transferred = value
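

# Illustrative sketch (not part of gsutil): how a caller might share a
# BytesTransferredContainer with the connection classes built below. The
# names object_size and my_progress_fn are hypothetical.
#
#   bytes_uploaded = BytesTransferredContainer()
#   bytes_uploaded.bytes_transferred = 1024  # e.g. bytes from a prior attempt
#   factory = UploadCallbackConnectionClassFactory(
#       bytes_uploaded, total_size=object_size,
#       progress_callback=my_progress_fn)
#   connection_class = factory.GetConnectionClass()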


class UploadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks and disable dumping the upload
  payload during debug statements. It can later be used to provide on-the-fly
  hash digestion during upload.
  """

  def __init__(self, bytes_uploaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE,
               total_size=0, progress_callback=None):
    self.bytes_uploaded_container = bytes_uploaded_container
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback

  def GetConnectionClass(self):
    """Returns a connection class that overrides send."""
    outer_bytes_uploaded_container = self.bytes_uploaded_container
    outer_buffer_size = self.buffer_size
    outer_total_size = self.total_size
    outer_progress_callback = self.progress_callback

    class UploadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for uploads."""
      bytes_uploaded_container = outer_bytes_uploaded_container
      # After we instantiate this class, apitools will check with the server
      # to find out how many bytes remain for a resumable upload. This allows
      # us to update our progress once based on that number.
      processed_initial_bytes = False
      GCS_JSON_BUFFER_SIZE = outer_buffer_size
      callback_processor = None
      size = outer_total_size

      def __init__(self, *args, **kwargs):
        kwargs['timeout'] = SSL_TIMEOUT
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def send(self, data):
        """Overrides HTTPConnection.send."""
        if not self.processed_initial_bytes:
          self.processed_initial_bytes = True
          if outer_progress_callback:
            self.callback_processor = ProgressCallbackWithBackoff(
                outer_total_size, outer_progress_callback)
            self.callback_processor.Progress(
                self.bytes_uploaded_container.bytes_transferred)
        # httplib.HTTPConnection.send accepts either a string or a file-like
        # object (anything that implements read()).
        if isinstance(data, basestring):
          full_buffer = cStringIO.StringIO(data)
        else:
          full_buffer = data
        partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)
        while partial_buffer:
          httplib2.HTTPSConnectionWithTimeout.send(self, partial_buffer)
          send_length = len(partial_buffer)
          if self.callback_processor:
            # This is the only place where gsutil has control over making a
            # callback, but here we can't differentiate the metadata bytes
            # (such as headers and OAuth2 refreshes) sent during an upload
            # from the actual upload bytes, so we will actually report
            # slightly more bytes than desired to the callback handler.
            #
            # One considered/rejected alternative is to move the callbacks
            # into the HashingFileUploadWrapper, which only processes reads on
            # the bytes. This has the disadvantages of being removed from
            # where we actually send the bytes and unnecessarily
            # multi-purposing that class.
            self.callback_processor.Progress(send_length)
          partial_buffer = full_buffer.read(self.GCS_JSON_BUFFER_SIZE)

    return UploadCallbackConnection
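

# Illustrative sketch (not part of gsutil): the generated class is meant to be
# handed to httplib2 as a connection_type so that every send() on the
# underlying HTTPS connection reports progress. The names upload_url and
# file_obj are hypothetical.
#
#   connection_class = factory.GetConnectionClass()
#   http = httplib2.Http()
#   http.request(upload_url, method='PUT', body=file_obj,
#                connection_type=connection_class)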


def WrapUploadHttpRequest(upload_http):
  """Wraps upload_http so we only use our custom connection_type on PUTs.

  POSTs are used to refresh oauth tokens, and we don't want to process the
  data sent in those requests.

  Args:
    upload_http: httplib2.Http instance to wrap.
  """
  request_orig = upload_http.request

  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    if method == 'PUT' or method == 'POST':
      override_connection_type = connection_type
    else:
      override_connection_type = None
    return request_orig(uri, method=method, body=body,
                        headers=headers, redirections=redirections,
                        connection_type=override_connection_type)

  # Replace the request method with our own closure.
  upload_http.request = NewRequest
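

# Illustrative sketch (not part of gsutil): wiring the wrapper into an upload
# http object. upload_http here is any httplib2.Http instance; the apitools
# plumbing that ultimately issues the requests is omitted.
#
#   upload_http = httplib2.Http()
#   WrapUploadHttpRequest(upload_http)
#   # Requests made through upload_http.request() whose method is PUT or POST
#   # keep the caller-supplied connection_type (e.g. the class from
#   # UploadCallbackConnectionClassFactory); other methods fall back to the
#   # default connection class.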


class DownloadCallbackConnectionClassFactory(object):
  """Creates a class that can override an httplib2 connection.

  This is used to provide progress callbacks, disable dumping the download
  payload during debug statements, and provide on-the-fly hash digestion during
  download. On-the-fly digestion is particularly important because httplib2
  will decompress gzipped content on-the-fly, thus this class provides our
  only opportunity to calculate the correct hash for an object that has a
  gzip hash in the cloud.
  """

  def __init__(self, bytes_downloaded_container,
               buffer_size=TRANSFER_BUFFER_SIZE, total_size=0,
               progress_callback=None, digesters=None):
    self.buffer_size = buffer_size
    self.total_size = total_size
    self.progress_callback = progress_callback
    self.digesters = digesters
    self.bytes_downloaded_container = bytes_downloaded_container

  def GetConnectionClass(self):
    """Returns a connection class that overrides getresponse."""

    class DownloadCallbackConnection(httplib2.HTTPSConnectionWithTimeout):
      """Connection class override for downloads."""
      outer_total_size = self.total_size
      outer_digesters = self.digesters
      outer_progress_callback = self.progress_callback
      outer_bytes_downloaded_container = self.bytes_downloaded_container
      processed_initial_bytes = False
      callback_processor = None

      def __init__(self, *args, **kwargs):
        kwargs['timeout'] = SSL_TIMEOUT
        httplib2.HTTPSConnectionWithTimeout.__init__(self, *args, **kwargs)

      def getresponse(self, buffering=False):
        """Wraps an HTTPResponse to perform callbacks and hashing.

        In this function, self is a DownloadCallbackConnection.

        Args:
          buffering: Unused. This function uses a local buffer.

        Returns:
          HTTPResponse object with wrapped read function.
        """
        orig_response = httplib.HTTPConnection.getresponse(self)
        if orig_response.status not in (httplib.OK, httplib.PARTIAL_CONTENT):
          return orig_response
        orig_read_func = orig_response.read

        def read(amt=None):  # pylint: disable=invalid-name
          """Overrides HTTPConnection.getresponse.read.

          This function only supports reads of TRANSFER_BUFFER_SIZE or smaller.

          Args:
            amt: Integer n where 0 < n <= TRANSFER_BUFFER_SIZE. This is a
                 keyword argument to match the read function it overrides,
                 but it is required.

          Returns:
            Data read from HTTPConnection.
          """
          if not amt or amt > TRANSFER_BUFFER_SIZE:
            raise BadRequestException(
                'Invalid HTTP read size %s during download, expected %s.' %
                (amt, TRANSFER_BUFFER_SIZE))
          else:
            amt = amt or TRANSFER_BUFFER_SIZE

          if not self.processed_initial_bytes:
            self.processed_initial_bytes = True
            if self.outer_progress_callback:
              self.callback_processor = ProgressCallbackWithBackoff(
                  self.outer_total_size, self.outer_progress_callback)
              self.callback_processor.Progress(
                  self.outer_bytes_downloaded_container.bytes_transferred)

          data = orig_read_func(amt)
          read_length = len(data)
          if self.callback_processor:
            self.callback_processor.Progress(read_length)
          if self.outer_digesters:
            for alg in self.outer_digesters:
              self.outer_digesters[alg].update(data)
          return data

        orig_response.read = read
        return orig_response

    return DownloadCallbackConnection
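

# Illustrative sketch (not part of gsutil): building a download connection
# class that feeds digesters as bytes arrive. The names object_size and
# my_progress_fn are hypothetical.
#
#   import hashlib
#   digesters = {'md5': hashlib.md5()}
#   bytes_downloaded = BytesTransferredContainer()
#   factory = DownloadCallbackConnectionClassFactory(
#       bytes_downloaded, total_size=object_size,
#       progress_callback=my_progress_fn, digesters=digesters)
#   connection_class = factory.GetConnectionClass()
#   # Each read() on the wrapped response updates digesters['md5'] and
#   # reports progress.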


def WrapDownloadHttpRequest(download_http):
  """Overrides download request functions for an httplib2.Http object.

  Args:
    download_http: httplib2.Http object to wrap / override.

  Returns:
    Wrapped / overridden httplib2.Http object.
  """

  # httplib2 has a bug https://code.google.com/p/httplib2/issues/detail?id=305
  # where custom connection_type is not respected after redirects. This
  # function is copied from httplib2 and overrides the request function so that
  # the connection_type is properly passed through.
  # pylint: disable=protected-access,g-inconsistent-quotes,unused-variable
  # pylint: disable=g-equals-none,g-doc-return-or-yield
  # pylint: disable=g-short-docstring-punctuation,g-doc-args
  # pylint: disable=too-many-statements
  def OverrideRequest(self, conn, host, absolute_uri, request_uri, method,
                      body, headers, redirections, cachekey):
    """Do the actual request using the connection object.

    Also follow one level of redirects if necessary.
    """
    auths = ([(auth.depth(request_uri), auth) for auth in self.authorizations
              if auth.inscope(host, request_uri)])
    auth = auths and sorted(auths)[0][1] or None
    if auth:
      auth.request(method, request_uri, headers, body)

    (response, content) = self._conn_request(conn, request_uri, method, body,
                                             headers)

    if auth:
      if auth.response(response, body):
        auth.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        response._stale_digest = 1

    if response.status == 401:
      for authorization in self._auth_from_challenge(
          host, request_uri, headers, response, content):
        authorization.request(method, request_uri, headers, body)
        (response, content) = self._conn_request(conn, request_uri, method,
                                                 body, headers)
        if response.status != 401:
          self.authorizations.append(authorization)
          authorization.response(response, body)
          break

    if (self.follow_all_redirects or (method in ["GET", "HEAD"])
        or response.status == 303):
      if self.follow_redirects and response.status in [300, 301, 302,
                                                       303, 307]:
        # Pick out the location header and basically start from the beginning
        # remembering first to strip the ETag header and decrement our 'depth'
        if redirections:
          if not response.has_key('location') and response.status != 300:
            raise httplib2.RedirectMissingLocation(
                "Redirected but the response is missing a Location: header.",
                response, content)
          # Fix-up relative redirects (which violate an RFC 2616 MUST)
          if response.has_key('location'):
            location = response['location']
            (scheme, authority, path, query, fragment) = parse_uri(location)
            if authority == None:
              response['location'] = urlparse.urljoin(absolute_uri, location)
          if response.status == 301 and method in ["GET", "HEAD"]:
            response['-x-permanent-redirect-url'] = response['location']
            if not response.has_key('content-location'):
              response['content-location'] = absolute_uri
            httplib2._updateCache(headers, response, content, self.cache,
                                  cachekey)
          if headers.has_key('if-none-match'):
            del headers['if-none-match']
          if headers.has_key('if-modified-since'):
            del headers['if-modified-since']
          if ('authorization' in headers and
              not self.forward_authorization_headers):
            del headers['authorization']
          if response.has_key('location'):
            location = response['location']
            old_response = copy.deepcopy(response)
            if not old_response.has_key('content-location'):
              old_response['content-location'] = absolute_uri
            redirect_method = method
            if response.status in [302, 303]:
              redirect_method = "GET"
              body = None
            (response, content) = self.request(
                location, redirect_method, body=body, headers=headers,
                redirections=redirections - 1,
                connection_type=conn.__class__)
            response.previous = old_response
        else:
          raise httplib2.RedirectLimit(
              "Redirected more times than redirection_limit allows.",
              response, content)
      elif response.status in [200, 203] and method in ["GET", "HEAD"]:
        # Don't cache 206's since we aren't going to handle byte range
        # requests.
        if not response.has_key('content-location'):
          response['content-location'] = absolute_uri
        httplib2._updateCache(headers, response, content, self.cache,
                              cachekey)

    return (response, content)

  # Wrap download_http so we do not use our custom connection_type
  # on POSTS, which are used to refresh oauth tokens. We don't want to
  # process the data received in those requests.
  request_orig = download_http.request

  def NewRequest(uri, method='GET', body=None, headers=None,
                 redirections=httplib2.DEFAULT_MAX_REDIRECTS,
                 connection_type=None):
    if method == 'POST':
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=None)
    else:
      return request_orig(uri, method=method, body=body,
                          headers=headers, redirections=redirections,
                          connection_type=connection_type)

  # Replace the request methods with our own closures.
  download_http._request = types.MethodType(OverrideRequest, download_http)
  download_http.request = NewRequest

  return download_http
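

# Illustrative sketch (not part of gsutil): a download http object is
# typically wrapped so that redirects keep the custom connection class. The
# names local_file_obj, download_url, and factory (a
# DownloadCallbackConnectionClassFactory instance) are hypothetical.
#
#   download_http = HttpWithDownloadStream(stream=local_file_obj)
#   WrapDownloadHttpRequest(download_http)
#   download_http.request(download_url, method='GET',
#                         connection_type=factory.GetConnectionClass())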


class HttpWithNoRetries(httplib2.Http):
  """httplib2.Http variant that does not retry.

  httplib2 automatically retries requests according to httplib2.RETRIES, but
  in certain cases httplib2 ignores the RETRIES value and forces a retry.
  Because httplib2 does not handle the case where the underlying request body
  is a stream, a retry may cause a non-idempotent write as the stream is
  partially consumed and not reset before the retry occurs.

  Here we override _conn_request to disable retries unequivocally, so that
  uploads may be retried at higher layers that properly handle stream request
  bodies.
  """

  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements

    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError(
          'Unable to find the server at %s' % conn.host)
    except httplib2.ssl_SSLError:
      conn.close()
      raise
    except socket.error, e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
    except httplib.HTTPException:
      conn.close()
      raise
    try:
      response = conn.getresponse()
    except (socket.error, httplib.HTTPException):
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        conn.close()
      else:
        content = response.read()
      response = httplib2.Response(response)
      # pylint: disable=protected-access
      content = httplib2._decompressContent(response, content)
    return (response, content)


class HttpWithDownloadStream(httplib2.Http):
  """httplib2.Http variant that only pushes bytes through a stream.

  httplib2 handles media by storing entire chunks of responses in memory, which
  is undesirable particularly when multiple instances are used during
  multi-threaded/multi-process copy. This class copies and then overrides some
  httplib2 functions to use a streaming copy approach that uses small memory
  chunks.

  Also disables httplib2 retries (for reasons stated in the HttpWithNoRetries
  class doc).
  """

  def __init__(self, stream=None, *args, **kwds):
    if stream is None:
      raise apitools_exceptions.InvalidUserInputError(
          'Cannot create HttpWithDownloadStream with no stream')
    self._stream = stream
    self._logger = logging.getLogger()
    super(HttpWithDownloadStream, self).__init__(*args, **kwds)

  @property
  def stream(self):
    return self._stream

  def _conn_request(self, conn, request_uri, method, body, headers):  # pylint: disable=too-many-statements
    try:
      if hasattr(conn, 'sock') and conn.sock is None:
        conn.connect()
      conn.request(method, request_uri, body, headers)
    except socket.timeout:
      raise
    except socket.gaierror:
      conn.close()
      raise httplib2.ServerNotFoundError(
          'Unable to find the server at %s' % conn.host)
    except httplib2.ssl_SSLError:
      conn.close()
      raise
    except socket.error, e:
      err = 0
      if hasattr(e, 'args'):
        err = getattr(e, 'args')[0]
      else:
        err = e.errno
      if err == httplib2.errno.ECONNREFUSED:  # Connection refused
        raise
    except httplib.HTTPException:
      # Just because the server closed the connection doesn't apparently mean
      # that the server didn't send a response.
      pass
    try:
      response = conn.getresponse()
    except (socket.error, httplib.HTTPException):
      conn.close()
      raise
    else:
      content = ''
      if method == 'HEAD':
        conn.close()
        response = httplib2.Response(response)
      else:
        if response.status in (httplib.OK, httplib.PARTIAL_CONTENT):
          content_length = None
          if hasattr(response, 'msg'):
            content_length = response.getheader('content-length')
          http_stream = response
          bytes_read = 0
          while True:
            new_data = http_stream.read(TRANSFER_BUFFER_SIZE)
            if new_data:
              self.stream.write(new_data)
              bytes_read += len(new_data)
            else:
              break

          if (content_length is not None and
              long(bytes_read) != long(content_length)):
            # The input stream terminated before we were able to read the
            # entire contents, possibly due to a network condition. Set
            # content-length to indicate how many bytes we actually read.
            self._logger.log(
                logging.DEBUG, 'Only got %s bytes out of content-length %s '
                'for request URI %s. Resetting content-length to match '
                'bytes read.', bytes_read, content_length, request_uri)
            response.msg['content-length'] = str(bytes_read)
          response = httplib2.Response(response)
        else:
          # We fall back to the current httplib2 behavior if we're
          # not processing bytes (e.g. it's a redirect).
          content = response.read()
          response = httplib2.Response(response)
          # pylint: disable=protected-access
          content = httplib2._decompressContent(response, content)
    return (response, content)
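

# Illustrative sketch (not part of gsutil): streaming an object's bytes
# directly to a local file instead of buffering the whole response in memory.
# The file name and download_url are hypothetical.
#
#   with open('/tmp/obj.bin', 'wb') as fp:
#     http = HttpWithDownloadStream(stream=fp)
#     response, _ = http.request(download_url, method='GET')
#     # Object bytes are written to fp; the returned content is empty for
#     # 200/206 responses.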