1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/base/socket_stream.h"
8 #include "base/callback.h"
9 #include "net/base/io_buffer.h"
10 #include "net/socket/stream_socket.h"
16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
18 const size_t kDefaultBufferSize
= 8*1024;
22 SocketInputStream::SocketInputStream(net::StreamSocket
* socket
)
24 io_buffer_(new net::IOBuffer(kDefaultBufferSize
)),
25 read_buffer_(new net::DrainableIOBuffer(io_buffer_
.get(),
29 weak_ptr_factory_(this) {
30 DCHECK(socket
->IsConnected());
33 SocketInputStream::~SocketInputStream() {
36 bool SocketInputStream::Next(const void** data
, int* size
) {
37 if (GetState() != EMPTY
&& GetState() != READY
) {
38 NOTREACHED() << "Invalid input stream read attempt.";
42 if (GetState() == EMPTY
) {
43 DVLOG(1) << "No unread data remaining, ending read.";
47 DCHECK_EQ(GetState(), READY
)
48 << " Input stream must have pending data before reading.";
49 DCHECK_LT(next_pos_
, read_buffer_
->BytesConsumed());
50 *data
= io_buffer_
->data() + next_pos_
;
51 *size
= UnreadByteCount();
52 next_pos_
= read_buffer_
->BytesConsumed();
53 DVLOG(1) << "Consuming " << *size
<< " bytes in input buffer.";
57 void SocketInputStream::BackUp(int count
) {
58 DCHECK(GetState() == READY
|| GetState() == EMPTY
);
59 // TODO(zea): investigating crbug.com/409985
61 CHECK_LE(count
, next_pos_
);
64 DVLOG(1) << "Backing up " << count
<< " bytes in input buffer. "
65 << "Current position now at " << next_pos_
66 << " of " << read_buffer_
->BytesConsumed();
69 bool SocketInputStream::Skip(int count
) {
74 int64
SocketInputStream::ByteCount() const {
75 DCHECK_NE(GetState(), CLOSED
);
76 DCHECK_NE(GetState(), READING
);
80 int SocketInputStream::UnreadByteCount() const {
81 DCHECK_NE(GetState(), CLOSED
);
82 DCHECK_NE(GetState(), READING
);
83 return read_buffer_
->BytesConsumed() - next_pos_
;
86 net::Error
SocketInputStream::Refresh(const base::Closure
& callback
,
88 DCHECK_NE(GetState(), CLOSED
);
89 DCHECK_NE(GetState(), READING
);
90 DCHECK_GT(byte_limit
, 0);
92 if (byte_limit
> read_buffer_
->BytesRemaining()) {
93 LOG(ERROR
) << "Out of buffer space, closing input stream.";
94 CloseStream(net::ERR_FILE_TOO_BIG
, base::Closure());
98 if (!socket_
->IsConnected()) {
99 LOG(ERROR
) << "Socket was disconnected, closing input stream";
100 CloseStream(net::ERR_CONNECTION_CLOSED
, base::Closure());
104 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit
<< " bytes.";
106 socket_
->Read(read_buffer_
.get(),
108 base::Bind(&SocketInputStream::RefreshCompletionCallback
,
109 weak_ptr_factory_
.GetWeakPtr(),
111 DVLOG(1) << "Read returned " << result
;
112 if (result
== net::ERR_IO_PENDING
) {
113 last_error_
= net::ERR_IO_PENDING
;
114 return net::ERR_IO_PENDING
;
117 RefreshCompletionCallback(base::Closure(), result
);
121 void SocketInputStream::RebuildBuffer() {
122 DVLOG(1) << "Rebuilding input stream, consumed "
123 << next_pos_
<< " bytes.";
124 DCHECK_NE(GetState(), READING
);
125 DCHECK_NE(GetState(), CLOSED
);
127 int unread_data_size
= 0;
128 const void* unread_data_ptr
= NULL
;
129 Next(&unread_data_ptr
, &unread_data_size
);
132 if (unread_data_ptr
!= io_buffer_
->data()) {
133 DVLOG(1) << "Have " << unread_data_size
134 << " unread bytes remaining, shifting.";
135 // Move any remaining unread data to the start of the buffer;
136 std::memmove(io_buffer_
->data(), unread_data_ptr
, unread_data_size
);
138 DVLOG(1) << "Have " << unread_data_size
<< " unread bytes remaining.";
140 read_buffer_
->DidConsume(unread_data_size
);
141 // TODO(zea): investigating crbug.com/409985
142 CHECK_GE(UnreadByteCount(), 0);
145 net::Error
SocketInputStream::last_error() const {
149 SocketInputStream::State
SocketInputStream::GetState() const {
150 if (last_error_
< net::ERR_IO_PENDING
)
153 if (last_error_
== net::ERR_IO_PENDING
)
156 DCHECK_EQ(last_error_
, net::OK
);
157 if (read_buffer_
->BytesConsumed() == next_pos_
)
163 void SocketInputStream::RefreshCompletionCallback(
164 const base::Closure
& callback
, int result
) {
165 // If an error occurred before the completion callback could complete, ignore
167 if (GetState() == CLOSED
)
170 // Result == 0 implies EOF, which is treated as an error.
172 result
= net::ERR_CONNECTION_CLOSED
;
174 DCHECK_NE(result
, net::ERR_IO_PENDING
);
176 if (result
< net::OK
) {
177 DVLOG(1) << "Failed to refresh socket: " << result
;
178 CloseStream(static_cast<net::Error
>(result
), callback
);
182 DCHECK_GT(result
, 0);
183 last_error_
= net::OK
;
184 read_buffer_
->DidConsume(result
);
185 // TODO(zea): investigating crbug.com/409985
186 CHECK_GT(UnreadByteCount(), 0);
188 DVLOG(1) << "Refresh complete with " << result
<< " new bytes. "
189 << "Current position " << next_pos_
190 << " of " << read_buffer_
->BytesConsumed() << ".";
192 if (!callback
.is_null())
196 void SocketInputStream::ResetInternal() {
197 read_buffer_
->SetOffset(0);
199 last_error_
= net::OK
;
200 weak_ptr_factory_
.InvalidateWeakPtrs(); // Invalidate any callbacks.
203 void SocketInputStream::CloseStream(net::Error error
,
204 const base::Closure
& callback
) {
205 DCHECK_LT(error
, net::ERR_IO_PENDING
);
208 LOG(ERROR
) << "Closing stream with result " << error
;
209 if (!callback
.is_null())
213 SocketOutputStream::SocketOutputStream(net::StreamSocket
* socket
)
215 io_buffer_(new net::IOBuffer(kDefaultBufferSize
)),
216 write_buffer_(new net::DrainableIOBuffer(io_buffer_
.get(),
217 kDefaultBufferSize
)),
219 last_error_(net::OK
),
220 weak_ptr_factory_(this) {
221 DCHECK(socket
->IsConnected());
224 SocketOutputStream::~SocketOutputStream() {
227 bool SocketOutputStream::Next(void** data
, int* size
) {
228 DCHECK_NE(GetState(), CLOSED
);
229 DCHECK_NE(GetState(), FLUSHING
);
230 if (next_pos_
== write_buffer_
->size())
233 *data
= write_buffer_
->data() + next_pos_
;
234 *size
= write_buffer_
->size() - next_pos_
;
235 next_pos_
= write_buffer_
->size();
239 void SocketOutputStream::BackUp(int count
) {
241 if (count
> next_pos_
)
244 DVLOG(1) << "Backing up " << count
<< " bytes in output buffer. "
245 << next_pos_
<< " bytes used.";
248 int64
SocketOutputStream::ByteCount() const {
249 DCHECK_NE(GetState(), CLOSED
);
250 DCHECK_NE(GetState(), FLUSHING
);
254 net::Error
SocketOutputStream::Flush(const base::Closure
& callback
) {
255 DCHECK_EQ(GetState(), READY
);
257 if (!socket_
->IsConnected()) {
258 LOG(ERROR
) << "Socket was disconnected, closing output stream";
259 last_error_
= net::ERR_CONNECTION_CLOSED
;
263 DVLOG(1) << "Flushing " << next_pos_
<< " bytes into socket.";
265 socket_
->Write(write_buffer_
.get(),
267 base::Bind(&SocketOutputStream::FlushCompletionCallback
,
268 weak_ptr_factory_
.GetWeakPtr(),
270 DVLOG(1) << "Write returned " << result
;
271 if (result
== net::ERR_IO_PENDING
) {
272 last_error_
= net::ERR_IO_PENDING
;
273 return net::ERR_IO_PENDING
;
276 FlushCompletionCallback(base::Closure(), result
);
280 SocketOutputStream::State
SocketOutputStream::GetState() const{
281 if (last_error_
< net::ERR_IO_PENDING
)
284 if (last_error_
== net::ERR_IO_PENDING
)
287 DCHECK_EQ(last_error_
, net::OK
);
294 net::Error
SocketOutputStream::last_error() const {
298 void SocketOutputStream::FlushCompletionCallback(
299 const base::Closure
& callback
, int result
) {
300 // If an error occurred before the completion callback could complete, ignore
302 if (GetState() == CLOSED
)
305 // Result == 0 implies EOF, which is treated as an error.
307 result
= net::ERR_CONNECTION_CLOSED
;
309 DCHECK_NE(result
, net::ERR_IO_PENDING
);
311 if (result
< net::OK
) {
312 LOG(ERROR
) << "Failed to flush socket.";
313 last_error_
= static_cast<net::Error
>(result
);
314 if (!callback
.is_null())
319 DCHECK_GT(result
, net::OK
);
320 last_error_
= net::OK
;
322 if (write_buffer_
->BytesConsumed() + result
< next_pos_
) {
323 DVLOG(1) << "Partial flush complete. Retrying.";
324 // Only a partial write was completed. Flush again to finish the write.
325 write_buffer_
->DidConsume(result
);
330 DVLOG(1) << "Socket flush complete.";
331 write_buffer_
->SetOffset(0);
333 if (!callback
.is_null())