1 # Copyright 2012 Google Inc. All Rights Reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing,
10 # software distributed under the License is distributed on an
11 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12 # either express or implied. See the License for the specific
13 # language governing permissions and limitations under the License.
15 """Python wrappers for the Google Storage RESTful API."""
# Public API of this module: the reader and the resumable-upload writer.
__all__ = ['ReadBuffer',
           'StreamingBuffer',
          ]
30 from . import api_utils
33 from . import rest_api
36 from google
.appengine
.api
import urlfetch
37 from google
.appengine
.ext
import ndb
39 from google
.appengine
.api
import urlfetch
40 from google
.appengine
.ext
import ndb
def _get_storage_api(retry_params, account_id=None):
  """Returns storage_api instance for API methods.

  Args:
    retry_params: An instance of api_utils.RetryParams. If none,
      thread's default will be used.
    account_id: Internal-use only.

  Returns:
    A storage_api instance to handle urlfetch work to GCS.
    On dev appserver, this instance by default will talk to a local stub
    unless common.ACCESS_TOKEN is set. That token will be used to talk
    to GCS.
  """
  api = _StorageApi(_StorageApi.full_control_scope,
                    service_account_id=account_id,
                    retry_params=retry_params)
  # Look the token up once so both decisions below see the same value.
  access_token = common.get_access_token()
  if common.local_run() and not access_token:
    # Local run without an explicit token: point at the local endpoint.
    api.api_url = common.local_api_url()
  if access_token:
    api.token = access_token
  return api
class _StorageApi(rest_api._RestApi):
  """A simple wrapper for the Google Storage RESTful API.

  WARNING: Do NOT directly use this api. It's an implementation detail
  and is subject to change at any release.

  All async methods have similar args and returns.

  Args:
    path: The path to the Google Storage object or bucket, e.g.
      '/mybucket/myfile' or '/mybucket'.
    **kwd: Options for urlfetch. e.g.
      headers={'content-type': 'text/plain'}, payload='blah'.

  Returns:
    A ndb Future. When fulfilled, future.get_result() should return
    a tuple of (status, headers, content) that represents a HTTP response
    of Google Cloud Storage XML API.
  """

  api_url = 'https://storage.googleapis.com'
  read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
  read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
  full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    Returns:
      A tuple (of dictionaries) with the state of this object: the parent
      class's state followed by a dict holding this class's api_url.
    """
    return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the tuple from a __getstate__ call
    """
    superstate, localstate = state
    super(_StorageApi, self).__setstate__(superstate)
    self.api_url = localstate['api_url']

  # The body is a generator that yields a future and finishes with
  # `raise ndb.Return(...)`, i.e. it is written as an ndb tasklet.
  # NOTE(review): api_utils._eager_tasklet is assumed to expect an
  # ndb.tasklet-wrapped callable -- confirm against api_utils.
  @api_utils._eager_tasklet
  @ndb.tasklet
  def do_request_async(self, url, method='GET', headers=None, payload=None,
                       deadline=None, callback=None):
    """Issue a request through the parent class's do_request_async.

    This method translates urlfetch exceptions to more service specific ones.

    Args:
      url: full url of the request.
      method: HTTP method name.
      headers: dict of request headers, or None. A passed dict is updated in
        place with 'x-goog-api-version' (only if absent) and
        'accept-encoding'.
      payload: request payload, forwarded unchanged.
      deadline: forwarded unchanged to the parent implementation.
      callback: forwarded unchanged to the parent implementation.

    Returns:
      Via ndb.Return, whatever the parent's do_request_async resolves to.

    Raises:
      errors.TimeoutError: when urlfetch raises DownloadError.
    """
    # The default is None; without this guard the membership test below
    # would raise TypeError for callers that pass no headers.
    if headers is None:
      headers = {}
    if 'x-goog-api-version' not in headers:
      headers['x-goog-api-version'] = '2'
    headers['accept-encoding'] = 'gzip, *'
    try:
      resp_tuple = yield super(_StorageApi, self).do_request_async(
          url, method=method, headers=headers, payload=payload,
          deadline=deadline, callback=callback)
    except urlfetch.DownloadError as e:
      raise errors.TimeoutError(
          'Request to Google Cloud Storage timed out.', e)

    raise ndb.Return(resp_tuple)

  def post_object_async(self, path, **kwds):
    """POST to an object."""
    return self.do_request_async(self.api_url + path, 'POST', **kwds)

  def put_object_async(self, path, **kwds):
    """PUT an object."""
    return self.do_request_async(self.api_url + path, 'PUT', **kwds)

  def get_object_async(self, path, **kwds):
    """GET an object.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'GET', **kwds)

  def delete_object_async(self, path, **kwds):
    """DELETE an object.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'DELETE', **kwds)

  def head_object_async(self, path, **kwds):
    """HEAD an object.

    Depending on request headers, HEAD returns various object properties,
    e.g. Content-Length, Last-Modified, and ETag.

    Note: No payload argument is supported.
    """
    return self.do_request_async(self.api_url + path, 'HEAD', **kwds)

  def get_bucket_async(self, path, **kwds):
    """GET a bucket."""
    return self.do_request_async(self.api_url + path, 'GET', **kwds)
# Rebind the class name to whatever rest_api.add_sync_methods returns for it.
# NOTE(review): presumably this adds a blocking counterpart for every *_async
# method (other code in this file calls head_object, post_object and
# put_object, none of which are defined on the class directly) -- confirm
# against rest_api.
_StorageApi = rest_api.add_sync_methods(_StorageApi)
class ReadBuffer(object):
  """A class for reading Google storage files."""

  DEFAULT_BUFFER_SIZE = 1024 * 1024
  MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE

  def __init__(self,
               api,
               path,
               buffer_size=DEFAULT_BUFFER_SIZE,
               max_request_size=MAX_REQUEST_SIZE):
    """Constructor.

    Args:
      api: A StorageApi instance.
      path: Quoted/escaped path to the object, e.g. /mybucket/myfile
      buffer_size: buffer size. The ReadBuffer keeps
        one buffer. But there may be a pending future that contains
        a second buffer. This size must be less than max_request_size.
      max_request_size: Max bytes to request in one urlfetch.
    """
    self._api = api
    self._path = path
    self.name = api_utils._unquote_filename(path)
    self.closed = False

    assert buffer_size <= max_request_size
    self._buffer_size = buffer_size
    self._max_request_size = max_request_size
    self._offset = 0
    self._buffer = _Buffer()
    self._etag = None

    # Start fetching the first buffer before the HEAD request below; at this
    # point _file_size is not set yet, which _request_next_buffer tolerates.
    self._request_next_buffer()

    status, headers, _ = self._api.head_object(path)
    errors.check_status(status, [200], path, resp_headers=headers)
    self._file_size = long(headers['content-length'])
    self._check_etag(headers.get('etag'))
    if self._file_size == 0:
      # Nothing to read, so drop the prefetch started above.
      self._buffer_future = None

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    The contents of the read buffer are not stored, only the current offset for
    data read by the client. A new read buffer is established at unpickling.
    The head information for the object (file size and etag) are stored to
    reduce startup and ensure the file has not changed.

    Returns:
      A dictionary with the state of this object
    """
    # 'path' and 'etag' must be present: __setstate__ reads both keys.
    return {'api': self._api,
            'path': self._path,
            'buffer_size': self._buffer_size,
            'request_size': self._max_request_size,
            'etag': self._etag,
            'size': self._file_size,
            'offset': self._offset,
            'closed': self.closed}

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the dictionary from a __getstate__ call

    Along with restoring the state, pre-fetch the next read buffer.
    """
    self._api = state['api']
    self._path = state['path']
    self.name = api_utils._unquote_filename(self._path)
    self._buffer_size = state['buffer_size']
    self._max_request_size = state['request_size']
    self._etag = state['etag']
    self._file_size = state['size']
    self._offset = state['offset']
    self._buffer = _Buffer()
    self.closed = state['closed']
    self._buffer_future = None
    if self._remaining() and not self.closed:
      self._request_next_buffer()

  def __iter__(self):
    """Iterator interface.

    Note the ReadBuffer container itself is the iterator. Iterating is
    destructive: it consumes the values, and a second iterator cannot easily
    be created that iterates independently over the same values.
    You could open the file for the second time, or seek() to the beginning.

    Returns:
      Self.
    """
    return self

  def next(self):
    """Return the next line, raising StopIteration at EOF."""
    line = self.readline()
    if not line:
      raise StopIteration()
    return line

  def readline(self, size=-1):
    """Read one line delimited by '\\n' from the file.

    A trailing newline character is kept in the string. It may be absent when a
    file ends with an incomplete line. If the size argument is non-negative,
    it specifies the maximum string size (counting the newline) to return.
    A negative size is the same as unspecified. Empty string is returned
    only when EOF is encountered immediately.

    Args:
      size: Maximum number of bytes to read. If not specified, readline stops
        only at a newline or EOF.

    Returns:
      The data read as a string.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    if size == 0 or not self._remaining():
      return ''

    data_list = []
    newline_offset = self._buffer.find_newline(size)
    while newline_offset < 0:
      # No newline in the (size-limited) window: take what the buffer has.
      data = self._buffer.read(size)
      size -= len(data)
      self._offset += len(data)
      data_list.append(data)
      if size == 0 or not self._remaining():
        return ''.join(data_list)
      # Swap in the prefetched buffer and immediately prefetch the next one.
      self._buffer.reset(self._buffer_future.get_result())
      self._request_next_buffer()
      newline_offset = self._buffer.find_newline(size)

    # Include the newline itself.
    data = self._buffer.read_to_offset(newline_offset + 1)
    self._offset += len(data)
    data_list.append(data)

    return ''.join(data_list)

  def read(self, size=-1):
    """Read data from RAW file.

    Args:
      size: Number of bytes to read as integer. Actual number of bytes
        read is always equal to size unless EOF is reached. If size is
        negative or unspecified, read the entire file.

    Returns:
      data read as str.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    if not self._remaining():
      return ''

    data_list = []
    while True:
      remaining = self._buffer.remaining()
      if size >= 0 and size < remaining:
        # The current buffer alone satisfies the request.
        data_list.append(self._buffer.read(size))
        self._offset += size
        break

      # Drain the current buffer completely.
      size -= remaining
      self._offset += remaining
      data_list.append(self._buffer.read())

      if self._buffer_future is None:
        # No prefetch pending: fetch exactly what is still needed, capped
        # at the end of the file.
        if size < 0 or size >= self._remaining():
          needs = self._remaining()
        else:
          needs = size
        data_list.extend(self._get_segments(self._offset, needs))
        self._offset += needs
        break

      if self._buffer_future:
        # Consume the prefetched buffer and loop to read from it.
        self._buffer.reset(self._buffer_future.get_result())
        self._buffer_future = None

    if self._buffer_future is None:
      self._request_next_buffer()
    return ''.join(data_list)

  def _remaining(self):
    """Return the number of bytes between the current offset and EOF."""
    return self._file_size - self._offset

  def _request_next_buffer(self):
    """Request next buffer.

    Requires self._offset and self._buffer are in consistent state
    """
    self._buffer_future = None
    next_offset = self._offset + self._buffer.remaining()
    # _file_size is not set yet when this is called from __init__ before the
    # HEAD response; in that case always issue the request.
    if not hasattr(self, '_file_size') or next_offset != self._file_size:
      self._buffer_future = self._get_segment(next_offset,
                                              self._buffer_size)

  def _get_segments(self, start, request_size):
    """Get segments of the file from Google Storage as a list.

    A large request is broken into segments to avoid hitting urlfetch
    response size limit. Each segment is returned from a separate urlfetch.

    Args:
      start: start offset to request. Inclusive. Have to be within the
        range of the file.
      request_size: number of bytes to request.

    Returns:
      A list of file segments in order
    """
    if not request_size:
      return []

    end = start + request_size
    futures = []

    # Issue every full-size request first, then one for the remainder; all
    # futures are created before any result is awaited.
    while request_size > self._max_request_size:
      futures.append(self._get_segment(start, self._max_request_size))
      request_size -= self._max_request_size
      start += self._max_request_size
    futures.append(self._get_segment(start, end - start))
    return [fut.get_result() for fut in futures]

  # Callers call .get_result() on the return value and the body ends with
  # `raise ndb.Return(...)`, so this must run as an ndb tasklet.
  @ndb.tasklet
  def _get_segment(self, start, request_size):
    """Get a segment of the file from Google Storage.

    Args:
      start: start offset of the segment. Inclusive. Have to be within the
        range of the file.
      request_size: number of bytes to request. Have to be small enough
        for a single urlfetch request. May go over the logical range of the
        file.

    Yields:
      a segment [start, start + request_size) of the file.

    Raises:
      ValueError: if the file has changed while reading.
    """
    end = start + request_size - 1
    content_range = '%d-%d' % (start, end)
    headers = {'Range': 'bytes=' + content_range}
    status, resp_headers, content = yield self._api.get_object_async(
        self._path, headers=headers)
    errors.check_status(status, [200, 206], self._path, headers, resp_headers)
    self._check_etag(resp_headers.get('etag'))
    raise ndb.Return(content)

  def _check_etag(self, etag):
    """Check if etag is the same across requests to GCS.

    If self._etag is None, set it. If etag is set, check that the new
    etag equals the old one.

    In the __init__ method, we fire one HEAD and one GET request using
    ndb tasklet. One of them would return first and set the first value.

    Args:
      etag: etag from a GCS HTTP response. None if etag is not part of the
        response header.

    Raises:
      ValueError: if two etags are not equal.
    """
    if etag is None:
      return
    elif self._etag is None:
      self._etag = etag
    elif self._etag != etag:
      raise ValueError('File on GCS has changed while reading.')

  def close(self):
    """Mark the buffer closed and drop buffered data and any pending fetch."""
    self.closed = True
    self._buffer = None
    self._buffer_future = None

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.close()
    return False

  def seek(self, offset, whence=os.SEEK_SET):
    """Set the file's current offset.

    Note if the new offset is out of bound, it is adjusted to either 0 or EOF.

    Args:
      offset: seek offset as number.
      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
        (seek relative to the end, offset should be negative).

    Raises:
      IOError: When this buffer is closed.
      ValueError: When whence is invalid.
    """
    self._check_open()

    # Resolve the target first so an invalid whence leaves the current
    # buffer and pending prefetch untouched.
    if whence == os.SEEK_SET:
      new_offset = offset
    elif whence == os.SEEK_CUR:
      new_offset = self._offset + offset
    elif whence == os.SEEK_END:
      new_offset = self._file_size + offset
    else:
      raise ValueError('Whence mode %s is invalid.' % str(whence))

    # Buffered data and any prefetch belong to the old position.
    self._buffer.reset()
    self._buffer_future = None

    # Clamp into [0, file size].
    new_offset = min(new_offset, self._file_size)
    self._offset = max(new_offset, 0)
    if self._remaining():
      self._request_next_buffer()

  def tell(self):
    """Tell the file's current offset.

    Returns:
      current offset in reading this file.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()
    return self._offset

  def _check_open(self):
    """Raise IOError if this buffer has been closed."""
    if self.closed:
      raise IOError('Buffer is closed.')
class _Buffer(object):
  """In memory buffer."""

  def __init__(self):
    # Start empty so remaining()/read() work on a freshly created buffer.
    self.reset()

  def reset(self, content='', offset=0):
    """Replace the buffered content and set the read position.

    Args:
      content: the new buffer content.
      offset: read position within content.
    """
    self._buffer = content
    self._offset = offset

  def read(self, size=-1):
    """Returns bytes from self._buffer and update related offsets.

    Args:
      size: number of bytes to read starting from current offset.
        Read the entire buffer if negative.

    Returns:
      Requested bytes from buffer.
    """
    if size < 0:
      offset = len(self._buffer)
    else:
      offset = self._offset + size
    return self.read_to_offset(offset)

  def read_to_offset(self, offset):
    """Returns bytes from self._buffer and update related offsets.

    Args:
      offset: read from current offset to this offset, exclusive.

    Returns:
      Requested bytes from buffer.
    """
    assert offset >= self._offset
    result = self._buffer[self._offset: offset]
    # Advance by what was actually returned; the slice is shorter than
    # requested when offset runs past the end of the buffer.
    self._offset += len(result)
    return result

  def remaining(self):
    """Return the number of unread bytes left in the buffer."""
    return len(self._buffer) - self._offset

  def find_newline(self, size=-1):
    """Search for newline char in buffer starting from current offset.

    Args:
      size: number of bytes to search. -1 means all.

    Returns:
      offset of newline char in buffer. -1 if doesn't exist.
    """
    if size < 0:
      return self._buffer.find('\n', self._offset)
    return self._buffer.find('\n', self._offset, self._offset + size)
class StreamingBuffer(object):
  """A class for creating large objects using the 'resumable' API.

  The API is a subset of the Python writable stream API sufficient to
  support writing zip files using the zipfile module.

  The exact sequence of calls and use of headers is documented at
  https://developers.google.com/storage/docs/developer-guide#unknownresumables
  """

  _blocksize = 256 * 1024

  _maxrequestsize = 16 * _blocksize

  def __init__(self,
               api,
               path,
               content_type=None,
               gcs_headers=None):
    """Constructor.

    Args:
      api: A StorageApi instance.
      path: Quoted/escaped path to the object, e.g. /mybucket/myfile
      content_type: Optional content-type; Default value is
        delegate to Google Cloud Storage.
      gcs_headers: additional gs headers as a str->str dict, e.g
        {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.

    Raises:
      IOError: when the response to the initial POST has no location header.
    """
    assert self._maxrequestsize > self._blocksize
    assert self._maxrequestsize % self._blocksize == 0

    self._api = api
    self._path = path
    self.name = api_utils._unquote_filename(path)
    self.closed = False

    # Queue of written-but-unsent strings, and their total length.
    self._buffer = collections.deque()
    self._buffered = 0
    # Bytes already sent to the service / bytes passed to write() so far.
    self._written = 0
    self._offset = 0

    headers = {'x-goog-resumable': 'start'}
    if content_type:
      headers['content-type'] = content_type
    if gcs_headers:
      headers.update(gcs_headers)
    status, resp_headers, _ = self._api.post_object(path, headers=headers)
    errors.check_status(status, [201], path, headers, resp_headers)
    loc = resp_headers.get('location')
    if not loc:
      raise IOError('No location header found in 201 response')
    # Only the query string of the returned location is kept; later PUTs go
    # to the object path with that query appended.
    parsed = urlparse.urlparse(loc)
    self._path_with_token = '%s?%s' % (self._path, parsed.query)

  def __getstate__(self):
    """Store state as part of serialization/pickling.

    The contents of the write buffer are stored. Writes to the underlying
    storage are required to be on block boundaries (_blocksize) except for the
    last write. In the worst case the pickled version of this object may be
    slightly larger than the blocksize.

    Returns:
      A dictionary with the state of this object
    """
    # 'path' must be present: __setstate__ reads it.
    return {'api': self._api,
            'path': self._path,
            'path_token': self._path_with_token,
            'buffer': self._buffer,
            'buffered': self._buffered,
            'written': self._written,
            'offset': self._offset,
            'closed': self.closed}

  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Args:
      state: the dictionary from a __getstate__ call
    """
    self._api = state['api']
    self._path_with_token = state['path_token']
    self._buffer = state['buffer']
    self._buffered = state['buffered']
    self._written = state['written']
    self._offset = state['offset']
    self.closed = state['closed']
    self._path = state['path']
    self.name = api_utils._unquote_filename(self._path)

  def write(self, data):
    """Write some bytes.

    Args:
      data: data to write. str.

    Raises:
      IOError: When this buffer is closed.
      TypeError: if data is not of type str.
    """
    self._check_open()
    if not isinstance(data, str):
      raise TypeError('Expected str but got %s.' % type(data))
    if not data:
      return
    self._buffer.append(data)
    self._buffered += len(data)
    self._offset += len(data)
    # Data is only sent once at least one full block has accumulated.
    if self._buffered >= self._blocksize:
      self._flush()

  def flush(self):
    """Dummy API.

    This API is provided because the zipfile module uses it. It is a
    no-op because Google Storage *requires* that all writes except for
    the final one are multiples on 256K bytes aligned on 256K-byte
    boundaries.

    Raises:
      IOError: When this buffer is closed.
    """
    self._check_open()

  def tell(self):
    """Return the total number of bytes passed to write() so far.

    (There is no seek() method.)
    """
    return self._offset

  def close(self):
    """Flush the buffer and finalize the file.

    When this returns the new file is available for reading.
    """
    if not self.closed:
      self.closed = True
      self._flush(finish=True)
      self._buffer = None

  def __enter__(self):
    return self

  def __exit__(self, atype, value, traceback):
    self.close()
    return False

  def _flush(self, finish=False):
    """Internal API to flush.

    This is called only when the total amount of buffered data is at
    least self._blocksize, or to flush the final (incomplete) block of
    the file with finish=True.

    Args:
      finish: when True, send everything that is buffered and finalize the
        file; otherwise only whole blocks are sent and the rest stays queued.
    """
    flush_len = 0 if finish else self._blocksize

    while self._buffered >= flush_len:
      chunks = []
      buffered = 0

      # Collect queued writes until one request's worth is gathered.
      while self._buffer:
        buf = self._buffer.popleft()
        size = len(buf)
        self._buffered -= size
        chunks.append(buf)
        buffered += size
        if buffered >= self._maxrequestsize:
          break

      # Work out how many trailing bytes must go back on the queue: anything
      # over the request limit, or (unless finishing) the partial block.
      if buffered > self._maxrequestsize:
        excess = buffered - self._maxrequestsize
      elif finish:
        excess = 0
      else:
        excess = buffered % self._blocksize

      if excess:
        over = chunks.pop()
        size = len(over)
        assert size >= excess
        buffered -= size
        head, tail = over[:-excess], over[-excess:]
        self._buffer.appendleft(tail)
        self._buffered += len(tail)
        if head:
          chunks.append(head)
          buffered += len(head)

      data = ''.join(chunks)
      # '*' marks "total length not known yet"; the real length is only
      # supplied with the last piece of data.
      file_len = '*'
      if finish and not self._buffered:
        file_len = self._written + len(data)
      self._send_data(data, self._written, file_len)
      self._written += len(data)
      if file_len != '*':
        break

  def _send_data(self, data, start_offset, file_len):
    """Send the block to the storage service.

    This is a utility method that does not modify self.

    Args:
      data: data to send in str.
      start_offset: start offset of the data in relation to the file.
      file_len: an int if this is the last data to append to the file.
        Otherwise '*'.
    """
    headers = {}
    end_offset = start_offset + len(data) - 1

    if data:
      headers['content-range'] = ('bytes %d-%d/%s' %
                                  (start_offset, end_offset, file_len))
    else:
      headers['content-range'] = ('bytes */%s' % file_len)

    status, response_headers, _ = self._api.put_object(
        self._path_with_token, payload=data, headers=headers)
    # An upload that is still open answers 308 (as the status query in
    # _get_offset_from_gcs expects); a finalized one answers 200.
    if file_len == '*':
      expected = 308
    else:
      expected = 200
    errors.check_status(status, [expected], self._path, headers,
                        response_headers,
                        {'upload_path': self._path_with_token})

  def _get_offset_from_gcs(self):
    """Get the last offset that has been written to GCS.

    This is a utility method that does not modify self.

    Returns:
      an int of the last offset written to GCS by this upload, inclusive.
      -1 means nothing has been written.
    """
    headers = {'content-range': 'bytes */*'}
    status, response_headers, _ = self._api.put_object(
        self._path_with_token, headers=headers)
    errors.check_status(status, [308], self._path, headers,
                        response_headers,
                        {'upload_path': self._path_with_token})
    val = response_headers.get('range')
    if val is None:
      return -1
    # The last offset is whatever follows the final '-' in the header value.
    _, offset = val.rsplit('-', 1)
    return int(offset)

  def _force_close(self, file_length=None):
    """Close this buffer on file_length.

    Finalize this upload immediately on file_length.
    Contents that are still in memory will not be uploaded.

    This is a utility method that does not modify self.

    Args:
      file_length: file length. Must match what has been uploaded. If None,
        it will be queried from GCS.
    """
    if file_length is None:
      # The queried offset is inclusive, so the length is one more.
      file_length = self._get_offset_from_gcs() + 1
    self._send_data('', 0, file_length)

  def _check_open(self):
    """Raise IOError if this buffer has been closed."""
    if self.closed:
      raise IOError('Buffer is closed.')