Roll src/third_party/WebKit e0eac24:489c548 (svn 193311:193320)
[chromium-blink-merge.git] / google_apis / gcm / base / socket_stream.cc
blob44022422136ab192e55d6fa9cf2109ba060cec9c
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/base/socket_stream.h"
7 #include "base/bind.h"
8 #include "base/callback.h"
9 #include "base/profiler/scoped_tracker.h"
10 #include "net/base/io_buffer.h"
11 #include "net/socket/stream_socket.h"
13 namespace gcm {
15 namespace {
17 // TODO(zea): consider having dynamically-sized buffers if this becomes too
18 // expensive.
19 const uint32 kDefaultBufferSize = 8*1024;
21 } // namespace
23 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
24 : socket_(socket),
25 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
26 read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
27 kDefaultBufferSize)),
28 next_pos_(0),
29 last_error_(net::OK),
30 weak_ptr_factory_(this) {
31 DCHECK(socket->IsConnected());
34 SocketInputStream::~SocketInputStream() {
37 bool SocketInputStream::Next(const void** data, int* size) {
38 if (GetState() != EMPTY && GetState() != READY) {
39 NOTREACHED() << "Invalid input stream read attempt.";
40 return false;
43 if (GetState() == EMPTY) {
44 DVLOG(1) << "No unread data remaining, ending read.";
45 return false;
48 DCHECK_EQ(GetState(), READY)
49 << " Input stream must have pending data before reading.";
50 DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
51 *data = io_buffer_->data() + next_pos_;
52 *size = UnreadByteCount();
53 next_pos_ = read_buffer_->BytesConsumed();
54 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
55 return true;
58 void SocketInputStream::BackUp(int count) {
59 DCHECK(GetState() == READY || GetState() == EMPTY);
60 // TODO(zea): investigating crbug.com/409985
61 CHECK_GT(count, 0);
62 CHECK_LE(count, next_pos_);
64 next_pos_ -= count;
65 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
66 << "Current position now at " << next_pos_
67 << " of " << read_buffer_->BytesConsumed();
70 bool SocketInputStream::Skip(int count) {
71 NOTIMPLEMENTED();
72 return false;
75 int64 SocketInputStream::ByteCount() const {
76 DCHECK_NE(GetState(), CLOSED);
77 DCHECK_NE(GetState(), READING);
78 return next_pos_;
81 int SocketInputStream::UnreadByteCount() const {
82 DCHECK_NE(GetState(), CLOSED);
83 DCHECK_NE(GetState(), READING);
84 return read_buffer_->BytesConsumed() - next_pos_;
87 net::Error SocketInputStream::Refresh(const base::Closure& callback,
88 int byte_limit) {
89 DCHECK_NE(GetState(), CLOSED);
90 DCHECK_NE(GetState(), READING);
91 DCHECK_GT(byte_limit, 0);
93 if (byte_limit > read_buffer_->BytesRemaining()) {
94 LOG(ERROR) << "Out of buffer space, closing input stream.";
95 CloseStream(net::ERR_FILE_TOO_BIG, base::Closure());
96 return net::OK;
99 if (!socket_->IsConnected()) {
100 LOG(ERROR) << "Socket was disconnected, closing input stream";
101 CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
102 return net::OK;
105 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
106 int result =
107 socket_->Read(read_buffer_.get(),
108 byte_limit,
109 base::Bind(&SocketInputStream::RefreshCompletionCallback,
110 weak_ptr_factory_.GetWeakPtr(),
111 callback));
112 DVLOG(1) << "Read returned " << result;
113 if (result == net::ERR_IO_PENDING) {
114 last_error_ = net::ERR_IO_PENDING;
115 return net::ERR_IO_PENDING;
118 RefreshCompletionCallback(base::Closure(), result);
119 return net::OK;
122 void SocketInputStream::RebuildBuffer() {
123 DVLOG(1) << "Rebuilding input stream, consumed "
124 << next_pos_ << " bytes.";
125 DCHECK_NE(GetState(), READING);
126 DCHECK_NE(GetState(), CLOSED);
128 int unread_data_size = 0;
129 const void* unread_data_ptr = NULL;
130 Next(&unread_data_ptr, &unread_data_size);
131 ResetInternal();
133 if (unread_data_ptr != io_buffer_->data()) {
134 DVLOG(1) << "Have " << unread_data_size
135 << " unread bytes remaining, shifting.";
136 // Move any remaining unread data to the start of the buffer;
137 std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
138 } else {
139 DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
141 read_buffer_->DidConsume(unread_data_size);
142 // TODO(zea): investigating crbug.com/409985
143 CHECK_GE(UnreadByteCount(), 0);
146 net::Error SocketInputStream::last_error() const {
147 return last_error_;
150 SocketInputStream::State SocketInputStream::GetState() const {
151 if (last_error_ < net::ERR_IO_PENDING)
152 return CLOSED;
154 if (last_error_ == net::ERR_IO_PENDING)
155 return READING;
157 DCHECK_EQ(last_error_, net::OK);
158 if (read_buffer_->BytesConsumed() == next_pos_)
159 return EMPTY;
161 return READY;
164 void SocketInputStream::RefreshCompletionCallback(
165 const base::Closure& callback, int result) {
166 // TODO(pkasting): Remove ScopedTracker below once crbug.com/462788 is fixed.
167 tracked_objects::ScopedTracker tracking_profile(
168 FROM_HERE_WITH_EXPLICIT_FUNCTION(
169 "462788 SocketInputStream::RefreshCompletionCallback"));
171 // If an error occurred before the completion callback could complete, ignore
172 // the result.
173 if (GetState() == CLOSED)
174 return;
176 // Result == 0 implies EOF, which is treated as an error.
177 if (result == 0)
178 result = net::ERR_CONNECTION_CLOSED;
180 DCHECK_NE(result, net::ERR_IO_PENDING);
182 if (result < net::OK) {
183 DVLOG(1) << "Failed to refresh socket: " << result;
184 CloseStream(static_cast<net::Error>(result), callback);
185 return;
188 DCHECK_GT(result, 0);
189 last_error_ = net::OK;
190 read_buffer_->DidConsume(result);
191 // TODO(zea): investigating crbug.com/409985
192 CHECK_GT(UnreadByteCount(), 0);
194 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
195 << "Current position " << next_pos_
196 << " of " << read_buffer_->BytesConsumed() << ".";
198 if (!callback.is_null())
199 callback.Run();
202 void SocketInputStream::ResetInternal() {
203 read_buffer_->SetOffset(0);
204 next_pos_ = 0;
205 last_error_ = net::OK;
206 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
209 void SocketInputStream::CloseStream(net::Error error,
210 const base::Closure& callback) {
211 DCHECK_LT(error, net::ERR_IO_PENDING);
212 ResetInternal();
213 last_error_ = error;
214 LOG(ERROR) << "Closing stream with result " << error;
215 if (!callback.is_null())
216 callback.Run();
219 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
220 : socket_(socket),
221 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
222 write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
223 kDefaultBufferSize)),
224 next_pos_(0),
225 last_error_(net::OK),
226 weak_ptr_factory_(this) {
227 DCHECK(socket->IsConnected());
230 SocketOutputStream::~SocketOutputStream() {
233 bool SocketOutputStream::Next(void** data, int* size) {
234 DCHECK_NE(GetState(), CLOSED);
235 DCHECK_NE(GetState(), FLUSHING);
236 if (next_pos_ == write_buffer_->size())
237 return false;
239 *data = write_buffer_->data() + next_pos_;
240 *size = write_buffer_->size() - next_pos_;
241 next_pos_ = write_buffer_->size();
242 return true;
245 void SocketOutputStream::BackUp(int count) {
246 DCHECK_GE(count, 0);
247 if (count > next_pos_)
248 next_pos_ = 0;
249 next_pos_ -= count;
250 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
251 << next_pos_ << " bytes used.";
254 int64 SocketOutputStream::ByteCount() const {
255 DCHECK_NE(GetState(), CLOSED);
256 DCHECK_NE(GetState(), FLUSHING);
257 return next_pos_;
260 net::Error SocketOutputStream::Flush(const base::Closure& callback) {
261 DCHECK_EQ(GetState(), READY);
263 if (!socket_->IsConnected()) {
264 LOG(ERROR) << "Socket was disconnected, closing output stream";
265 last_error_ = net::ERR_CONNECTION_CLOSED;
266 return net::OK;
269 DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
270 int result =
271 socket_->Write(write_buffer_.get(),
272 next_pos_,
273 base::Bind(&SocketOutputStream::FlushCompletionCallback,
274 weak_ptr_factory_.GetWeakPtr(),
275 callback));
276 DVLOG(1) << "Write returned " << result;
277 if (result == net::ERR_IO_PENDING) {
278 last_error_ = net::ERR_IO_PENDING;
279 return net::ERR_IO_PENDING;
282 FlushCompletionCallback(base::Closure(), result);
283 return net::OK;
286 SocketOutputStream::State SocketOutputStream::GetState() const{
287 if (last_error_ < net::ERR_IO_PENDING)
288 return CLOSED;
290 if (last_error_ == net::ERR_IO_PENDING)
291 return FLUSHING;
293 DCHECK_EQ(last_error_, net::OK);
294 if (next_pos_ == 0)
295 return EMPTY;
297 return READY;
300 net::Error SocketOutputStream::last_error() const {
301 return last_error_;
304 void SocketOutputStream::FlushCompletionCallback(
305 const base::Closure& callback, int result) {
306 // If an error occurred before the completion callback could complete, ignore
307 // the result.
308 if (GetState() == CLOSED)
309 return;
311 // Result == 0 implies EOF, which is treated as an error.
312 if (result == 0)
313 result = net::ERR_CONNECTION_CLOSED;
315 DCHECK_NE(result, net::ERR_IO_PENDING);
317 if (result < net::OK) {
318 LOG(ERROR) << "Failed to flush socket.";
319 last_error_ = static_cast<net::Error>(result);
320 if (!callback.is_null())
321 callback.Run();
322 return;
325 DCHECK_GT(result, net::OK);
326 last_error_ = net::OK;
328 if (write_buffer_->BytesConsumed() + result < next_pos_) {
329 DVLOG(1) << "Partial flush complete. Retrying.";
330 // Only a partial write was completed. Flush again to finish the write.
331 write_buffer_->DidConsume(result);
332 Flush(callback);
333 return;
336 DVLOG(1) << "Socket flush complete.";
337 write_buffer_->SetOffset(0);
338 next_pos_ = 0;
339 if (!callback.is_null())
340 callback.Run();
343 } // namespace gcm