Only grant permissions to new extensions from sync if they have the expected version
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / resumable_streaming_upload.py
bloba32787d3abe710bd057e9cc03852ee53a45cbd01
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper class for streaming resumable uploads."""
16 import collections
17 import os
19 from gslib.exception import CommandException
20 from gslib.util import GetJsonResumableChunkSize
class ResumableStreamingJsonUploadWrapper(object):
  """Wraps an input stream in a buffer for resumable uploads.

  This class takes a non-seekable input stream, buffers it, and exposes it
  as a stream with limited seek capabilities such that it can be used in a
  resumable JSON API upload.

  max_buffer_size bytes of buffering is supported.
  """

  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      max_buffer_size: Maximum size of internal buffer; should be >= the chunk
          size of the resumable upload API to ensure that at least one full
          chunk write can be replayed in the event of a server error.
      test_small_buffer: Skip check for buffer size vs. chunk size, for
          testing.

    Raises:
      CommandException: If max_buffer_size is smaller than the JSON resumable
          upload chunk size (and test_small_buffer is False).
    """
    self._orig_fp = stream

    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
      raise CommandException('Resumable streaming upload created with buffer '
                             'size %s, JSON resumable upload chunk size %s. '
                             'Buffer size must be >= JSON resumable upload '
                             'chunk size to ensure that uploads can be '
                             'resumed.' % (max_buffer_size,
                                           GetJsonResumableChunkSize()))

    self._max_buffer_size = max_buffer_size
    # Deque of byte strings holding the most recently read bytes (up to
    # max_buffer_size total) so that a limited backwards seek can be replayed.
    self._buffer = collections.deque()
    # Absolute stream offsets of the first buffered byte and of one past the
    # last buffered byte, respectively.
    self._buffer_start = 0
    self._buffer_end = 0
    # Current logical read position; may lag _buffer_end after a seek back.
    self._position = 0

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped stream.

    Args:
      size: The amount of bytes to read. If omitted or negative, the entire
          contents of the stream will be read and returned.

    Returns:
      Bytes from the wrapped stream.
    """
    read_all_bytes = size is None or size < 0
    if read_all_bytes:
      # Only used to bound the buffer-replay loop below; the actual read of
      # the remainder of the wrapped stream is unbounded.
      bytes_remaining = self._max_buffer_size
    else:
      bytes_remaining = size
    data = b''
    buffered_data = []
    if self._position < self._buffer_end:
      # There was a backwards seek, so read from the buffer first.

      # TODO: Performance test to validate if it is worth re-aligning
      # the buffers in this case.  Also, seeking through the buffer for
      # each read on a long catch-up is probably not performant, but we'd
      # need a more complex data structure than a deque to get around this.
      pos_in_buffer = self._buffer_start
      buffer_index = 0
      # First, find the start position in the buffer.
      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
        # When this loop exits, buffer_index will refer to a buffer that
        # has at least some overlap with self._position, and
        # pos_in_buffer will be <= self._position.
        pos_in_buffer += len(self._buffer[buffer_index])
        buffer_index += 1

      # Read until we've read enough or we're out of buffer.
      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
        buffer_len = len(self._buffer[buffer_index])
        # This describes how far into the current buffer self._position is.
        offset_from_position = self._position - pos_in_buffer
        bytes_available_this_buffer = buffer_len - offset_from_position
        read_size = min(bytes_available_this_buffer, bytes_remaining)
        buffered_data.append(
            self._buffer[buffer_index]
            [offset_from_position:offset_from_position + read_size])
        bytes_remaining -= read_size
        pos_in_buffer += buffer_len
        buffer_index += 1
        self._position += read_size

    # At this point we're guaranteed that if there are any bytes left to read,
    # then self._position == self._buffer_end, and we can read from the
    # wrapped stream if needed.
    if read_all_bytes:
      # TODO: The user is requesting reading until the end of an
      # arbitrary length stream, which is bad we'll need to return data
      # with no size limits; if the stream is sufficiently long, we could run
      # out of memory.  We could break this down into smaller reads and
      # buffer it as we go, but we're still left returning the data all at
      # once to the caller.  We could raise, but for now trust the caller to
      # be sane and have enough memory to hold the remaining stream contents.
      new_data = self._orig_fp.read(size)
      data_len = len(new_data)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      self._position += data_len
    elif bytes_remaining:
      new_data = self._orig_fp.read(bytes_remaining)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
      data_len = len(new_data)
      if data_len:
        self._position += data_len
        self._buffer.append(new_data)
        self._buffer_end += data_len
        oldest_data = None
        # Evict whole chunks from the front until the buffer fits.
        while self._buffer_end - self._buffer_start > self._max_buffer_size:
          oldest_data = self._buffer.popleft()
          self._buffer_start += len(oldest_data)
        if oldest_data:
          # Eviction works on whole chunks, so the last pop may have removed
          # too much; put back the tail of the evicted chunk so that exactly
          # max_buffer_size bytes remain buffered.
          refill_amount = self._max_buffer_size - (self._buffer_end -
                                                   self._buffer_start)
          if refill_amount:
            self._buffer.appendleft(oldest_data[-refill_amount:])
            self._buffer_start -= refill_amount
    else:
      data = b''.join(buffered_data) if buffered_data else b''

    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._position

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true since limited seek support exists."""
    return True

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks on the buffered stream.

    Args:
      offset: The offset to seek to; must be within the buffer bounds.
      whence: Must be os.SEEK_SET or os.SEEK_END.

    Raises:
      CommandException if an unsupported seek mode or position is used.
    """
    if whence == os.SEEK_SET:
      if offset < self._buffer_start or offset > self._buffer_end:
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      # Move to a position within the buffer.
      self._position = offset
    elif whence == os.SEEK_END:
      if offset > self._max_buffer_size:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only %s can be buffered.' %
                               (offset, self._max_buffer_size))
      # Read to the end and rely on buffering to handle the offset.
      while self.read(self._max_buffer_size):
        pass
      # Now we're at the end.
      self._position -= offset
    else:
      raise CommandException('Invalid seek mode on streaming upload. '
                             '(mode %s, offset %s)' % (whence, offset))

  def close(self):  # pylint: disable=invalid-name
    """Closes the wrapped stream and returns its result."""
    return self._orig_fp.close()