1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/base/socket_stream.h"
8 #include "base/callback.h"
9 #include "net/base/io_buffer.h"
10 #include "net/socket/stream_socket.h"
// Size passed to net::IOBuffer / net::DrainableIOBuffer when the input and
// output streams in this file allocate their buffers (8 * 1024; presumably a
// byte count).
// NOTE(review): the continuation of the TODO below is not visible in this
// excerpt.
16 // TODO(zea): consider having dynamically-sized buffers if this becomes too
18 const uint32 kDefaultBufferSize
= 8*1024;
// Constructs an input stream reading from |socket|.
// The visible initializers allocate a kDefaultBufferSize net::IOBuffer, wrap
// it in a net::DrainableIOBuffer (read_buffer_), and bind weak_ptr_factory_ to
// this object. The body DCHECKs that |socket| is already connected.
// NOTE(review): some initializer lines and the closing brace are not visible
// in this excerpt.
22 SocketInputStream::SocketInputStream(net::StreamSocket
* socket
)
24 io_buffer_(new net::IOBuffer(kDefaultBufferSize
)),
25 read_buffer_(new net::DrainableIOBuffer(io_buffer_
.get(),
29 weak_ptr_factory_(this) {
30 DCHECK(socket
->IsConnected());
// Destructor.
// NOTE(review): the body is not visible in this excerpt.
33 SocketInputStream::~SocketInputStream() {
// Hands the caller the unread portion of the input buffer.
// On the READY path, |*data| is set to io_buffer_->data() + next_pos_,
// |*size| to UnreadByteCount(), and next_pos_ is advanced to
// read_buffer_->BytesConsumed().
// NOTE(review): the return statements and closing braces are not visible in
// this excerpt; presumably the two early branches return false and the READY
// path returns true -- confirm against the full file.
36 bool SocketInputStream::Next(const void** data
, int* size
) {
// Any state other than EMPTY or READY is treated as a caller error.
37 if (GetState() != EMPTY
&& GetState() != READY
) {
38 NOTREACHED() << "Invalid input stream read attempt.";
// EMPTY: nothing unread is left, so the read ends here.
42 if (GetState() == EMPTY
) {
43 DVLOG(1) << "No unread data remaining, ending read.";
47 DCHECK_EQ(GetState(), READY
)
48 << " Input stream must have pending data before reading.";
49 DCHECK_LT(next_pos_
, read_buffer_
->BytesConsumed());
// Expose everything between next_pos_ and BytesConsumed() in one chunk.
50 *data
= io_buffer_
->data() + next_pos_
;
51 *size
= UnreadByteCount();
52 next_pos_
= read_buffer_
->BytesConsumed();
53 DVLOG(1) << "Consuming " << *size
<< " bytes in input buffer.";
// Backs the read position up by |count| bytes.
// DCHECKs that the stream is READY or EMPTY and that |count| does not exceed
// next_pos_, then logs the resulting position.
// NOTE(review): the statement that actually changes next_pos_ is not visible
// in this excerpt; presumably it subtracts |count| -- confirm.
57 void SocketInputStream::BackUp(int count
) {
58 DCHECK(GetState() == READY
|| GetState() == EMPTY
);
60 DCHECK_LE(count
, next_pos_
);
63 DVLOG(1) << "Backing up " << count
<< " bytes in input buffer. "
64 << "Current position now at " << next_pos_
65 << " of " << read_buffer_
->BytesConsumed();
// Skip |count| bytes of input.
// NOTE(review): the body is not visible in this excerpt, so the behavior and
// return value cannot be described from here.
68 bool SocketInputStream::Skip(int count
) {
// Reports a byte count for this stream. Must not be called while the stream
// is CLOSED or READING (both are DCHECKed).
// NOTE(review): the return expression is not visible in this excerpt.
73 int64
SocketInputStream::ByteCount() const {
74 DCHECK_NE(GetState(), CLOSED
);
75 DCHECK_NE(GetState(), READING
);
// Returns the number of buffered bytes not yet handed out by Next():
// read_buffer_->BytesConsumed() minus next_pos_.
// Must not be called while the stream is CLOSED or READING (DCHECKed).
79 size_t SocketInputStream::UnreadByteCount() const {
80 DCHECK_NE(GetState(), CLOSED
);
81 DCHECK_NE(GetState(), READING
);
82 return read_buffer_
->BytesConsumed() - next_pos_
;
// Starts a socket read to pull more data into read_buffer_.
// |callback| is bound into RefreshCompletionCallback for the asynchronous
// case. |byte_limit| (its declaration is not visible in this excerpt) must be
// positive and must fit in the space remaining in read_buffer_.
// Returns net::ERR_IO_PENDING when the read did not complete synchronously.
// NOTE(review): the second parameter line, the declaration of |result|, the
// remaining Read()/Bind() arguments and the other return statements are not
// visible here; presumably the error paths and the synchronous path return
// last_error_ -- confirm against the full file.
85 net::Error
SocketInputStream::Refresh(const base::Closure
& callback
,
87 DCHECK_NE(GetState(), CLOSED
);
88 DCHECK_NE(GetState(), READING
);
89 DCHECK_GT(byte_limit
, 0);
// A request larger than the remaining buffer space closes the stream.
91 if (byte_limit
> read_buffer_
->BytesRemaining()) {
92 LOG(ERROR
) << "Out of buffer space, closing input stream.";
93 CloseStream(net::ERR_FILE_TOO_BIG
, base::Closure());
// A disconnected socket also closes the stream; no callback is passed.
97 if (!socket_
->IsConnected()) {
98 LOG(ERROR
) << "Socket was disconnected, closing input stream";
99 CloseStream(net::ERR_CONNECTION_CLOSED
, base::Closure());
103 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit
<< " bytes.";
// The completion callback is bound through a weak pointer, so it is dropped
// if weak_ptr_factory_ invalidates its pointers before the read finishes.
105 socket_
->Read(read_buffer_
.get(),
107 base::Bind(&SocketInputStream::RefreshCompletionCallback
,
108 weak_ptr_factory_
.GetWeakPtr(),
110 DVLOG(1) << "Read returned " << result
;
// Asynchronous case: record the pending state and report it to the caller.
111 if (result
== net::ERR_IO_PENDING
) {
112 last_error_
= net::ERR_IO_PENDING
;
113 return net::ERR_IO_PENDING
;
// Synchronous case: process the result now, with an empty closure so no
// callback is run.
116 RefreshCompletionCallback(base::Closure(), result
);
// Compacts the input buffer so that any unread data sits at its start.
// Fetches the unread region via Next(), memmoves it to io_buffer_->data() if
// it is not already there, then marks those bytes as consumed in read_buffer_.
// Must not be called while the stream is READING or CLOSED (DCHECKed).
// NOTE(review): at least one statement between the Next() call and the
// pointer comparison is not visible in this excerpt (presumably a reset of
// the buffer offsets) -- confirm against the full file.
120 void SocketInputStream::RebuildBuffer() {
121 DVLOG(1) << "Rebuilding input stream, consumed "
122 << next_pos_
<< " bytes.";
123 DCHECK_NE(GetState(), READING
);
124 DCHECK_NE(GetState(), CLOSED
);
// Remain 0 / NULL unless Next() fills them in.
126 int unread_data_size
= 0;
127 const void* unread_data_ptr
= NULL
;
128 Next(&unread_data_ptr
, &unread_data_size
);
// Only shift when the unread data does not already start at the buffer head.
131 if (unread_data_ptr
!= io_buffer_
->data()) {
132 DVLOG(1) << "Have " << unread_data_size
133 << " unread bytes remaining, shifting.";
134 // Move any remaining unread data to the start of the buffer;
135 std::memmove(io_buffer_
->data(), unread_data_ptr
, unread_data_size
);
137 DVLOG(1) << "Have " << unread_data_size
<< " unread bytes remaining.";
139 read_buffer_
->DidConsume(unread_data_size
);
// Accessor for the stream's last error.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns last_error_.
142 net::Error
SocketInputStream::last_error() const {
// Derives the stream state from last_error_ and the buffer positions.
// Three cases are distinguished, in order: last_error_ below
// net::ERR_IO_PENDING, last_error_ equal to net::ERR_IO_PENDING, and
// otherwise last_error_ == net::OK (DCHECKed), where the result depends on
// whether read_buffer_->BytesConsumed() equals next_pos_.
// NOTE(review): the returned enum values are not visible in this excerpt;
// presumably CLOSED, READING, and EMPTY/READY respectively -- confirm.
146 SocketInputStream::State
SocketInputStream::GetState() const {
147 if (last_error_
< net::ERR_IO_PENDING
)
150 if (last_error_
== net::ERR_IO_PENDING
)
153 DCHECK_EQ(last_error_
, net::OK
);
154 if (read_buffer_
->BytesConsumed() == next_pos_
)
// Handles the outcome of a socket read started by Refresh().
// |result| is the value produced by the read; |callback| may be a null
// closure, which is checked with is_null() before use.
// A negative |result| closes the stream with that error. A positive |result|
// clears last_error_ to net::OK and marks |result| more bytes as consumed in
// read_buffer_.
// NOTE(review): several lines are not visible in this excerpt: the statement
// under the CLOSED check (presumably an early return), the condition guarding
// the EOF rewrite (presumably result == 0), the return after CloseStream(),
// and the statement under the final is_null() check (presumably running
// |callback|).
160 void SocketInputStream::RefreshCompletionCallback(
161 const base::Closure
& callback
, int result
) {
162 // If an error occurred before the completion callback could complete, ignore
164 if (GetState() == CLOSED
)
167 // Result == 0 implies EOF, which is treated as an error.
169 result
= net::ERR_CONNECTION_CLOSED
;
171 DCHECK_NE(result
, net::ERR_IO_PENDING
);
// Failure: close the stream, forwarding |callback| to CloseStream().
173 if (result
< net::OK
) {
174 DVLOG(1) << "Failed to refresh socket: " << result
;
175 CloseStream(static_cast<net::Error
>(result
), callback
);
// Success: |result| bytes were read; advance the drainable buffer past them.
179 DCHECK_GT(result
, 0);
180 last_error_
= net::OK
;
181 read_buffer_
->DidConsume(result
);
183 DVLOG(1) << "Refresh complete with " << result
<< " new bytes. "
184 << "Current position " << next_pos_
185 << " of " << read_buffer_
->BytesConsumed() << ".";
187 if (!callback
.is_null())
// Resets internal bookkeeping: rewinds read_buffer_ to offset 0, sets
// last_error_ to net::OK, and invalidates outstanding weak pointers so any
// pending read callback bound through them is dropped.
// NOTE(review): one statement between the SetOffset() call and the
// last_error_ assignment is not visible in this excerpt (presumably it resets
// next_pos_) -- confirm.
191 void SocketInputStream::ResetInternal() {
192 read_buffer_
->SetOffset(0);
194 last_error_
= net::OK
;
195 weak_ptr_factory_
.InvalidateWeakPtrs(); // Invalidate any callbacks.
// Closes the stream with |error|, which must be below net::ERR_IO_PENDING
// (DCHECKed), and logs the error. |callback| may be a null closure; it is
// checked with is_null() before use.
// NOTE(review): the statements between the DCHECK and the LOG, and the one
// under the is_null() check, are not visible in this excerpt; presumably they
// reset internal state, store |error| in last_error_, and run |callback| --
// confirm against the full file.
198 void SocketInputStream::CloseStream(net::Error error
,
199 const base::Closure
& callback
) {
200 DCHECK_LT(error
, net::ERR_IO_PENDING
);
203 LOG(ERROR
) << "Closing stream with result " << error
;
204 if (!callback
.is_null())
// Constructs an output stream writing to |socket|.
// The visible initializers allocate a kDefaultBufferSize net::IOBuffer, wrap
// it in a net::DrainableIOBuffer of the same size (write_buffer_), start
// last_error_ at net::OK, and bind weak_ptr_factory_ to this object. The body
// DCHECKs that |socket| is already connected.
// NOTE(review): some initializer lines and the closing brace are not visible
// in this excerpt.
208 SocketOutputStream::SocketOutputStream(net::StreamSocket
* socket
)
210 io_buffer_(new net::IOBuffer(kDefaultBufferSize
)),
211 write_buffer_(new net::DrainableIOBuffer(io_buffer_
.get(),
212 kDefaultBufferSize
)),
214 last_error_(net::OK
),
215 weak_ptr_factory_(this) {
216 DCHECK(socket
->IsConnected());
// Destructor.
// NOTE(review): the body is not visible in this excerpt.
219 SocketOutputStream::~SocketOutputStream() {
// Hands the caller the unused remainder of the write buffer.
// |*data| is set to write_buffer_->data() + next_pos_, |*size| to the number
// of bytes between next_pos_ and write_buffer_->size(), and next_pos_ is then
// advanced to write_buffer_->size().
// Must not be called while the stream is CLOSED or FLUSHING (DCHECKed).
// NOTE(review): the return statements are not visible in this excerpt;
// presumably the full-buffer branch returns false and the normal path
// returns true -- confirm.
222 bool SocketOutputStream::Next(void** data
, int* size
) {
223 DCHECK_NE(GetState(), CLOSED
);
224 DCHECK_NE(GetState(), FLUSHING
);
// next_pos_ at the end of the buffer means there is no space left to offer.
225 if (next_pos_
== write_buffer_
->size())
228 *data
= write_buffer_
->data() + next_pos_
;
229 *size
= write_buffer_
->size() - next_pos_
;
230 next_pos_
= write_buffer_
->size();
// Backs the write position up by |count| bytes and logs the resulting number
// of bytes used.
// NOTE(review): the statement under the `count > next_pos_` check and the
// statement that changes next_pos_ are not visible in this excerpt;
// presumably the request is clamped to next_pos_ and then subtracted --
// confirm.
234 void SocketOutputStream::BackUp(int count
) {
236 if (count
> next_pos_
)
239 DVLOG(1) << "Backing up " << count
<< " bytes in output buffer. "
240 << next_pos_
<< " bytes used.";
// Reports a byte count for this stream. Must not be called while the stream
// is CLOSED or FLUSHING (both are DCHECKed).
// NOTE(review): the return expression is not visible in this excerpt.
243 int64
SocketOutputStream::ByteCount() const {
244 DCHECK_NE(GetState(), CLOSED
);
245 DCHECK_NE(GetState(), FLUSHING
);
// Starts writing the buffered data to the socket.
// |callback| is bound into FlushCompletionCallback for the asynchronous case.
// Requires the stream to be READY (DCHECKed).
// Returns net::ERR_IO_PENDING when the write did not complete synchronously.
// NOTE(review): the declaration of |result|, the remaining Write()/Bind()
// arguments and the other return statements are not visible in this excerpt;
// presumably the disconnected and synchronous paths return last_error_ --
// confirm against the full file.
249 net::Error
SocketOutputStream::Flush(const base::Closure
& callback
) {
250 DCHECK_EQ(GetState(), READY
);
// A disconnected socket is recorded as net::ERR_CONNECTION_CLOSED.
252 if (!socket_
->IsConnected()) {
253 LOG(ERROR
) << "Socket was disconnected, closing output stream";
254 last_error_
= net::ERR_CONNECTION_CLOSED
;
258 DVLOG(1) << "Flushing " << next_pos_
<< " bytes into socket.";
// The completion callback is bound through a weak pointer, so it is dropped
// if weak_ptr_factory_ invalidates its pointers before the write finishes.
260 socket_
->Write(write_buffer_
.get(),
262 base::Bind(&SocketOutputStream::FlushCompletionCallback
,
263 weak_ptr_factory_
.GetWeakPtr(),
265 DVLOG(1) << "Write returned " << result
;
// Asynchronous case: record the pending state and report it to the caller.
266 if (result
== net::ERR_IO_PENDING
) {
267 last_error_
= net::ERR_IO_PENDING
;
268 return net::ERR_IO_PENDING
;
// Synchronous case: process the result now, with an empty closure so no
// callback is run.
271 FlushCompletionCallback(base::Closure(), result
);
// Derives the stream state from last_error_.
// Three cases are distinguished, in order: last_error_ below
// net::ERR_IO_PENDING, last_error_ equal to net::ERR_IO_PENDING, and
// otherwise last_error_ == net::OK (DCHECKed).
// NOTE(review): the returned enum values and anything after the DCHECK are
// not visible in this excerpt; presumably CLOSED and FLUSHING for the first
// two cases -- confirm.
275 SocketOutputStream::State
SocketOutputStream::GetState() const{
276 if (last_error_
< net::ERR_IO_PENDING
)
279 if (last_error_
== net::ERR_IO_PENDING
)
282 DCHECK_EQ(last_error_
, net::OK
);
// Accessor for the stream's last error.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns last_error_.
289 net::Error
SocketOutputStream::last_error() const {
// Handles the outcome of a socket write started by Flush().
// |result| is the value produced by the write; |callback| may be a null
// closure, which is checked with is_null() before use.
// A negative |result| is stored in last_error_. A positive |result| clears
// last_error_ to net::OK; if the bytes written so far are still short of
// next_pos_, the written bytes are marked consumed so the rest can be flushed,
// otherwise write_buffer_ is rewound to offset 0.
// NOTE(review): several lines are not visible in this excerpt: the statement
// under the CLOSED check (presumably an early return), the condition guarding
// the EOF rewrite (presumably result == 0), the statements under both
// is_null() checks (presumably running |callback|), the re-flush after a
// partial write, and any reset of next_pos_ once the flush completes.
293 void SocketOutputStream::FlushCompletionCallback(
294 const base::Closure
& callback
, int result
) {
295 // If an error occurred before the completion callback could complete, ignore
297 if (GetState() == CLOSED
)
300 // Result == 0 implies EOF, which is treated as an error.
302 result
= net::ERR_CONNECTION_CLOSED
;
304 DCHECK_NE(result
, net::ERR_IO_PENDING
);
// Failure: remember the error code.
306 if (result
< net::OK
) {
307 LOG(ERROR
) << "Failed to flush socket.";
308 last_error_
= static_cast<net::Error
>(result
);
309 if (!callback
.is_null())
314 DCHECK_GT(result
, net::OK
);
315 last_error_
= net::OK
;
// Bytes already drained plus this write still fall short of next_pos_.
317 if (write_buffer_
->BytesConsumed() + result
< next_pos_
) {
318 DVLOG(1) << "Partial flush complete. Retrying.";
319 // Only a partial write was completed. Flush again to finish the write.
320 write_buffer_
->DidConsume(result
);
// Everything was written: rewind the drainable buffer for reuse.
325 DVLOG(1) << "Socket flush complete.";
326 write_buffer_
->SetOffset(0);
328 if (!callback
.is_null())