Updating XTBs based on .GRDs from branch master
[chromium-blink-merge.git] / google_apis / gcm / base / socket_stream.cc
blob48c7da89b40f71e4fdd9917821f542fe3108a654
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/base/socket_stream.h"
7 #include "base/bind.h"
8 #include "base/callback.h"
9 #include "net/base/io_buffer.h"
10 #include "net/socket/stream_socket.h"
12 namespace gcm {
14 namespace {
16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
17 // expensive.
18 const size_t kDefaultBufferSize = 8*1024;
20 } // namespace
22 SocketInputStream::SocketInputStream(net::StreamSocket* socket)
23 : socket_(socket),
24 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
25 read_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
26 kDefaultBufferSize)),
27 next_pos_(0),
28 last_error_(net::OK),
29 weak_ptr_factory_(this) {
30 DCHECK(socket->IsConnected());
33 SocketInputStream::~SocketInputStream() {
36 bool SocketInputStream::Next(const void** data, int* size) {
37 if (GetState() != EMPTY && GetState() != READY) {
38 NOTREACHED() << "Invalid input stream read attempt.";
39 return false;
42 if (GetState() == EMPTY) {
43 DVLOG(1) << "No unread data remaining, ending read.";
44 return false;
47 DCHECK_EQ(GetState(), READY)
48 << " Input stream must have pending data before reading.";
49 DCHECK_LT(next_pos_, read_buffer_->BytesConsumed());
50 *data = io_buffer_->data() + next_pos_;
51 *size = UnreadByteCount();
52 next_pos_ = read_buffer_->BytesConsumed();
53 DVLOG(1) << "Consuming " << *size << " bytes in input buffer.";
54 return true;
57 void SocketInputStream::BackUp(int count) {
58 DCHECK(GetState() == READY || GetState() == EMPTY);
59 // TODO(zea): investigating crbug.com/409985
60 CHECK_GT(count, 0);
61 CHECK_LE(count, next_pos_);
63 next_pos_ -= count;
64 DVLOG(1) << "Backing up " << count << " bytes in input buffer. "
65 << "Current position now at " << next_pos_
66 << " of " << read_buffer_->BytesConsumed();
69 bool SocketInputStream::Skip(int count) {
70 NOTIMPLEMENTED();
71 return false;
74 int64 SocketInputStream::ByteCount() const {
75 DCHECK_NE(GetState(), CLOSED);
76 DCHECK_NE(GetState(), READING);
77 return next_pos_;
80 int SocketInputStream::UnreadByteCount() const {
81 DCHECK_NE(GetState(), CLOSED);
82 DCHECK_NE(GetState(), READING);
83 return read_buffer_->BytesConsumed() - next_pos_;
86 net::Error SocketInputStream::Refresh(const base::Closure& callback,
87 int byte_limit) {
88 DCHECK_NE(GetState(), CLOSED);
89 DCHECK_NE(GetState(), READING);
90 DCHECK_GT(byte_limit, 0);
92 if (byte_limit > read_buffer_->BytesRemaining()) {
93 LOG(ERROR) << "Out of buffer space, closing input stream.";
94 CloseStream(net::ERR_FILE_TOO_BIG, base::Closure());
95 return net::OK;
98 if (!socket_->IsConnected()) {
99 LOG(ERROR) << "Socket was disconnected, closing input stream";
100 CloseStream(net::ERR_CONNECTION_CLOSED, base::Closure());
101 return net::OK;
104 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit << " bytes.";
105 int result =
106 socket_->Read(read_buffer_.get(),
107 byte_limit,
108 base::Bind(&SocketInputStream::RefreshCompletionCallback,
109 weak_ptr_factory_.GetWeakPtr(),
110 callback));
111 DVLOG(1) << "Read returned " << result;
112 if (result == net::ERR_IO_PENDING) {
113 last_error_ = net::ERR_IO_PENDING;
114 return net::ERR_IO_PENDING;
117 RefreshCompletionCallback(base::Closure(), result);
118 return net::OK;
121 void SocketInputStream::RebuildBuffer() {
122 DVLOG(1) << "Rebuilding input stream, consumed "
123 << next_pos_ << " bytes.";
124 DCHECK_NE(GetState(), READING);
125 DCHECK_NE(GetState(), CLOSED);
127 int unread_data_size = 0;
128 const void* unread_data_ptr = NULL;
129 Next(&unread_data_ptr, &unread_data_size);
130 ResetInternal();
132 if (unread_data_ptr != io_buffer_->data()) {
133 DVLOG(1) << "Have " << unread_data_size
134 << " unread bytes remaining, shifting.";
135 // Move any remaining unread data to the start of the buffer;
136 std::memmove(io_buffer_->data(), unread_data_ptr, unread_data_size);
137 } else {
138 DVLOG(1) << "Have " << unread_data_size << " unread bytes remaining.";
140 read_buffer_->DidConsume(unread_data_size);
141 // TODO(zea): investigating crbug.com/409985
142 CHECK_GE(UnreadByteCount(), 0);
145 net::Error SocketInputStream::last_error() const {
146 return last_error_;
149 SocketInputStream::State SocketInputStream::GetState() const {
150 if (last_error_ < net::ERR_IO_PENDING)
151 return CLOSED;
153 if (last_error_ == net::ERR_IO_PENDING)
154 return READING;
156 DCHECK_EQ(last_error_, net::OK);
157 if (read_buffer_->BytesConsumed() == next_pos_)
158 return EMPTY;
160 return READY;
163 void SocketInputStream::RefreshCompletionCallback(
164 const base::Closure& callback, int result) {
165 // If an error occurred before the completion callback could complete, ignore
166 // the result.
167 if (GetState() == CLOSED)
168 return;
170 // Result == 0 implies EOF, which is treated as an error.
171 if (result == 0)
172 result = net::ERR_CONNECTION_CLOSED;
174 DCHECK_NE(result, net::ERR_IO_PENDING);
176 if (result < net::OK) {
177 DVLOG(1) << "Failed to refresh socket: " << result;
178 CloseStream(static_cast<net::Error>(result), callback);
179 return;
182 DCHECK_GT(result, 0);
183 last_error_ = net::OK;
184 read_buffer_->DidConsume(result);
185 // TODO(zea): investigating crbug.com/409985
186 CHECK_GT(UnreadByteCount(), 0);
188 DVLOG(1) << "Refresh complete with " << result << " new bytes. "
189 << "Current position " << next_pos_
190 << " of " << read_buffer_->BytesConsumed() << ".";
192 if (!callback.is_null())
193 callback.Run();
196 void SocketInputStream::ResetInternal() {
197 read_buffer_->SetOffset(0);
198 next_pos_ = 0;
199 last_error_ = net::OK;
200 weak_ptr_factory_.InvalidateWeakPtrs(); // Invalidate any callbacks.
203 void SocketInputStream::CloseStream(net::Error error,
204 const base::Closure& callback) {
205 DCHECK_LT(error, net::ERR_IO_PENDING);
206 ResetInternal();
207 last_error_ = error;
208 LOG(ERROR) << "Closing stream with result " << error;
209 if (!callback.is_null())
210 callback.Run();
213 SocketOutputStream::SocketOutputStream(net::StreamSocket* socket)
214 : socket_(socket),
215 io_buffer_(new net::IOBuffer(kDefaultBufferSize)),
216 write_buffer_(new net::DrainableIOBuffer(io_buffer_.get(),
217 kDefaultBufferSize)),
218 next_pos_(0),
219 last_error_(net::OK),
220 weak_ptr_factory_(this) {
221 DCHECK(socket->IsConnected());
224 SocketOutputStream::~SocketOutputStream() {
227 bool SocketOutputStream::Next(void** data, int* size) {
228 DCHECK_NE(GetState(), CLOSED);
229 DCHECK_NE(GetState(), FLUSHING);
230 if (next_pos_ == write_buffer_->size())
231 return false;
233 *data = write_buffer_->data() + next_pos_;
234 *size = write_buffer_->size() - next_pos_;
235 next_pos_ = write_buffer_->size();
236 return true;
239 void SocketOutputStream::BackUp(int count) {
240 DCHECK_GE(count, 0);
241 if (count > next_pos_)
242 next_pos_ = 0;
243 next_pos_ -= count;
244 DVLOG(1) << "Backing up " << count << " bytes in output buffer. "
245 << next_pos_ << " bytes used.";
248 int64 SocketOutputStream::ByteCount() const {
249 DCHECK_NE(GetState(), CLOSED);
250 DCHECK_NE(GetState(), FLUSHING);
251 return next_pos_;
254 net::Error SocketOutputStream::Flush(const base::Closure& callback) {
255 DCHECK_EQ(GetState(), READY);
257 if (!socket_->IsConnected()) {
258 LOG(ERROR) << "Socket was disconnected, closing output stream";
259 last_error_ = net::ERR_CONNECTION_CLOSED;
260 return net::OK;
263 DVLOG(1) << "Flushing " << next_pos_ << " bytes into socket.";
264 int result =
265 socket_->Write(write_buffer_.get(),
266 next_pos_,
267 base::Bind(&SocketOutputStream::FlushCompletionCallback,
268 weak_ptr_factory_.GetWeakPtr(),
269 callback));
270 DVLOG(1) << "Write returned " << result;
271 if (result == net::ERR_IO_PENDING) {
272 last_error_ = net::ERR_IO_PENDING;
273 return net::ERR_IO_PENDING;
276 FlushCompletionCallback(base::Closure(), result);
277 return net::OK;
280 SocketOutputStream::State SocketOutputStream::GetState() const{
281 if (last_error_ < net::ERR_IO_PENDING)
282 return CLOSED;
284 if (last_error_ == net::ERR_IO_PENDING)
285 return FLUSHING;
287 DCHECK_EQ(last_error_, net::OK);
288 if (next_pos_ == 0)
289 return EMPTY;
291 return READY;
294 net::Error SocketOutputStream::last_error() const {
295 return last_error_;
298 void SocketOutputStream::FlushCompletionCallback(
299 const base::Closure& callback, int result) {
300 // If an error occurred before the completion callback could complete, ignore
301 // the result.
302 if (GetState() == CLOSED)
303 return;
305 // Result == 0 implies EOF, which is treated as an error.
306 if (result == 0)
307 result = net::ERR_CONNECTION_CLOSED;
309 DCHECK_NE(result, net::ERR_IO_PENDING);
311 if (result < net::OK) {
312 LOG(ERROR) << "Failed to flush socket.";
313 last_error_ = static_cast<net::Error>(result);
314 if (!callback.is_null())
315 callback.Run();
316 return;
319 DCHECK_GT(result, net::OK);
320 last_error_ = net::OK;
322 if (write_buffer_->BytesConsumed() + result < next_pos_) {
323 DVLOG(1) << "Partial flush complete. Retrying.";
324 // Only a partial write was completed. Flush again to finish the write.
325 write_buffer_->DidConsume(result);
326 Flush(callback);
327 return;
330 DVLOG(1) << "Socket flush complete.";
331 write_buffer_->SetOffset(0);
332 next_pos_ = 0;
333 if (!callback.is_null())
334 callback.Run();
337 } // namespace gcm