1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "google_apis/gcm/base/socket_stream.h"
8 #include "base/callback.h"
9 #include "base/profiler/scoped_tracker.h"
10 #include "net/base/io_buffer.h"
11 #include "net/socket/stream_socket.h"
17 // TODO(zea): consider having dynamically-sized buffers if this becomes too
// NOTE(review): the remainder of the TODO sentence above is not present in
// this excerpt.
//
// Fixed buffer size (8 * 1024) passed as the size argument when the input and
// output streams in this file construct their net::IOBuffer /
// net::DrainableIOBuffer instances.
19 const uint32 kDefaultBufferSize
= 8*1024;
// Constructs an input stream for |socket|.
// The visible initializers allocate io_buffer_ as a net::IOBuffer of
// kDefaultBufferSize, create read_buffer_ as a net::DrainableIOBuffer over
// io_buffer_, and bind weak_ptr_factory_ to this object.
// NOTE(review): parts of the initializer list (including the remaining
// DrainableIOBuffer arguments) and the closing brace are missing from this
// excerpt.
23 SocketInputStream::SocketInputStream(net::StreamSocket
* socket
)
25 io_buffer_(new net::IOBuffer(kDefaultBufferSize
)),
26 read_buffer_(new net::DrainableIOBuffer(io_buffer_
.get(),
30 weak_ptr_factory_(this) {
// The socket handed in is expected to be connected already.
31 DCHECK(socket
->IsConnected());
// Destructor.
// NOTE(review): the body and closing brace are not visible in this excerpt.
34 SocketInputStream::~SocketInputStream() {
// Exposes the unread part of the input buffer to the caller: |*data| is set to
// io_buffer_->data() + next_pos_, |*size| to UnreadByteCount(), and next_pos_
// is then moved up to read_buffer_->BytesConsumed().
// NOTE(review): the return statements and closing braces of this function are
// missing from this excerpt, so the returned values are not shown here.
37 bool SocketInputStream::Next(const void** data
, int* size
) {
// A read attempt in any state other than EMPTY or READY is flagged as
// unreachable.
38 if (GetState() != EMPTY
&& GetState() != READY
) {
39 NOTREACHED() << "Invalid input stream read attempt.";
// EMPTY state: nothing is left to hand out.
43 if (GetState() == EMPTY
) {
44 DVLOG(1) << "No unread data remaining, ending read.";
48 DCHECK_EQ(GetState(), READY
)
49 << " Input stream must have pending data before reading.";
// The read position must lie strictly before the amount consumed into
// read_buffer_.
50 DCHECK_LT(next_pos_
, read_buffer_
->BytesConsumed());
51 *data
= io_buffer_
->data() + next_pos_
;
52 *size
= UnreadByteCount();
// Everything currently in the buffer is now considered handed out.
53 next_pos_
= read_buffer_
->BytesConsumed();
54 DVLOG(1) << "Consuming " << *size
<< " bytes in input buffer.";
// Backs the read position up by |count| bytes (per the log message below).
// Only valid in the READY or EMPTY states, and |count| may not exceed
// next_pos_.
// NOTE(review): only the precondition checks and the logging are visible; the
// statement that actually adjusts next_pos_ and the closing brace are missing
// from this excerpt.
58 void SocketInputStream::BackUp(int count
) {
59 DCHECK(GetState() == READY
|| GetState() == EMPTY
);
60 // TODO(zea): investigating crbug.com/409985
62 CHECK_LE(count
, next_pos_
);
65 DVLOG(1) << "Backing up " << count
<< " bytes in input buffer. "
66 << "Current position now at " << next_pos_
67 << " of " << read_buffer_
->BytesConsumed();
// Skip() entry point taking a byte |count|.
// NOTE(review): the body is not visible in this excerpt, so its behavior and
// return value cannot be documented from here.
70 bool SocketInputStream::Skip(int count
) {
// Byte-count accessor. The visible part only asserts that the stream is
// neither CLOSED nor READING when it is called.
// NOTE(review): the return statement is missing from this excerpt.
75 int64
SocketInputStream::ByteCount() const {
76 DCHECK_NE(GetState(), CLOSED
);
77 DCHECK_NE(GetState(), READING
);
// Returns how many bytes are in the buffer but have not yet been handed out:
// read_buffer_->BytesConsumed() minus next_pos_.
// Must not be called while the stream is CLOSED or READING.
81 int SocketInputStream::UnreadByteCount() const {
82 DCHECK_NE(GetState(), CLOSED
);
83 DCHECK_NE(GetState(), READING
);
84 return read_buffer_
->BytesConsumed() - next_pos_
;
// Starts a read from socket_ into read_buffer_.
// Visible flow:
//   * the stream must be neither CLOSED nor READING, and |byte_limit| > 0;
//   * if |byte_limit| exceeds read_buffer_->BytesRemaining(), the stream is
//     closed with net::ERR_FILE_TOO_BIG;
//   * if the socket is no longer connected, the stream is closed with
//     net::ERR_CONNECTION_CLOSED;
//   * otherwise socket_->Read() is issued with RefreshCompletionCallback bound
//     through a weak pointer. A net::ERR_IO_PENDING result is stored in
//     last_error_ and returned; otherwise RefreshCompletionCallback() is
//     invoked directly with an empty closure and the result.
// NOTE(review): the tail of the parameter list (where |byte_limit| is
// declared), several return statements, some Read()/Bind() arguments, the
// declaration of |result| and the closing braces are missing from this
// excerpt.
87 net::Error
SocketInputStream::Refresh(const base::Closure
& callback
,
89 DCHECK_NE(GetState(), CLOSED
);
90 DCHECK_NE(GetState(), READING
);
91 DCHECK_GT(byte_limit
, 0);
// The request does not fit in what is left of the buffer.
93 if (byte_limit
> read_buffer_
->BytesRemaining()) {
94 LOG(ERROR
) << "Out of buffer space, closing input stream.";
95 CloseStream(net::ERR_FILE_TOO_BIG
, base::Closure());
// Reading from a disconnected socket is treated as a closed connection.
99 if (!socket_
->IsConnected()) {
100 LOG(ERROR
) << "Socket was disconnected, closing input stream";
101 CloseStream(net::ERR_CONNECTION_CLOSED
, base::Closure());
105 DVLOG(1) << "Refreshing input stream, limit of " << byte_limit
<< " bytes.";
// The completion callback is bound to a weak pointer obtained from
// weak_ptr_factory_.
107 socket_
->Read(read_buffer_
.get(),
109 base::Bind(&SocketInputStream::RefreshCompletionCallback
,
110 weak_ptr_factory_
.GetWeakPtr(),
112 DVLOG(1) << "Read returned " << result
;
// Asynchronous case: remember that a read is pending and report it.
113 if (result
== net::ERR_IO_PENDING
) {
114 last_error_
= net::ERR_IO_PENDING
;
115 return net::ERR_IO_PENDING
;
// Otherwise the completion handler is called directly with an empty closure.
118 RefreshCompletionCallback(base::Closure(), result
);
// Moves any data not yet handed out via Next() to the start of io_buffer_.
// Visible steps: fetch the unread region through Next(); if it does not already
// start at io_buffer_->data(), memmove it to the front; then call
// read_buffer_->DidConsume(unread_data_size) and CHECK that UnreadByteCount()
// is non-negative.
// Must not be called while the stream is READING or CLOSED.
// NOTE(review): some lines (between the Next() call and the memmove branch,
// the else keyword, and the closing braces) are missing from this excerpt.
122 void SocketInputStream::RebuildBuffer() {
123 DVLOG(1) << "Rebuilding input stream, consumed "
124 << next_pos_
<< " bytes.";
125 DCHECK_NE(GetState(), READING
);
126 DCHECK_NE(GetState(), CLOSED
);
// Capture the location and length of the unread data via Next().
128 int unread_data_size
= 0;
129 const void* unread_data_ptr
= NULL
;
130 Next(&unread_data_ptr
, &unread_data_size
);
// A shift is only needed when the unread data is not already at the front.
133 if (unread_data_ptr
!= io_buffer_
->data()) {
134 DVLOG(1) << "Have " << unread_data_size
135 << " unread bytes remaining, shifting.";
136 // Move any remaining unread data to the start of the buffer;
137 std::memmove(io_buffer_
->data(), unread_data_ptr
, unread_data_size
);
139 DVLOG(1) << "Have " << unread_data_size
<< " unread bytes remaining.";
141 read_buffer_
->DidConsume(unread_data_size
);
142 // TODO(zea): investigating crbug.com/409985
143 CHECK_GE(UnreadByteCount(), 0);
// Accessor returning a net::Error for the input stream.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// last_error_ -- confirm against the full file.
146 net::Error
SocketInputStream::last_error() const {
// Derives the stream state from last_error_ and the buffer positions.
// The visible conditions are checked in this order:
//   1. last_error_ <  net::ERR_IO_PENDING
//   2. last_error_ == net::ERR_IO_PENDING
//   3. otherwise last_error_ must be net::OK, and the result depends on
//      whether read_buffer_->BytesConsumed() equals next_pos_.
// NOTE(review): none of the return statements are visible in this excerpt.
// Presumably the branches map to CLOSED, READING, and EMPTY/READY
// respectively -- confirm against the full file.
150 SocketInputStream::State
SocketInputStream::GetState() const {
151 if (last_error_
< net::ERR_IO_PENDING
)
154 if (last_error_
== net::ERR_IO_PENDING
)
157 DCHECK_EQ(last_error_
, net::OK
);
158 if (read_buffer_
->BytesConsumed() == next_pos_
)
// Completion handler for the socket read issued by Refresh().
// |callback| is an optional closure (checked with is_null() below); |result|
// is the outcome of the read.
// Visible flow:
//   * a CLOSED stream is special-cased first (see the comment below);
//   * per the EOF comment, |result| is rewritten to
//     net::ERR_CONNECTION_CLOSED;
//   * a negative |result| closes the stream with that error, passing
//     |callback| on to CloseStream();
//   * a positive |result| sets last_error_ to net::OK and calls
//     read_buffer_->DidConsume(result) to record the new bytes.
// NOTE(review): the EOF condition line, return statements, the statement
// guarded by the final is_null() check and the closing braces are missing from
// this excerpt.
164 void SocketInputStream::RefreshCompletionCallback(
165 const base::Closure
& callback
, int result
) {
166 // TODO(vadimt): Remove ScopedTracker below once crbug.com/418183 is fixed.
167 tracked_objects::ScopedTracker
tracking_profile(
168 FROM_HERE_WITH_EXPLICIT_FUNCTION(
169 "418183 DoReadCallback => SocketInputStream::RefreshCompletionC..."));
171 // If an error occurred before the completion callback could complete, ignore
173 if (GetState() == CLOSED
)
176 // Result == 0 implies EOF, which is treated as an error.
178 result
= net::ERR_CONNECTION_CLOSED
;
// A pending result must never reach the completion handler.
180 DCHECK_NE(result
, net::ERR_IO_PENDING
);
// Failure path: close the stream with the error and hand over |callback|.
182 if (result
< net::OK
) {
183 DVLOG(1) << "Failed to refresh socket: " << result
;
184 CloseStream(static_cast<net::Error
>(result
), callback
);
// Success path: |result| new bytes are recorded in read_buffer_.
188 DCHECK_GT(result
, 0);
189 last_error_
= net::OK
;
190 read_buffer_
->DidConsume(result
);
191 // TODO(zea): investigating crbug.com/409985
192 CHECK_GT(UnreadByteCount(), 0);
194 DVLOG(1) << "Refresh complete with " << result
<< " new bytes. "
195 << "Current position " << next_pos_
196 << " of " << read_buffer_
->BytesConsumed() << ".";
198 if (!callback
.is_null())
// Resets the stream's internal bookkeeping: read_buffer_'s offset goes back to
// 0, last_error_ becomes net::OK, and all weak pointers handed out by
// weak_ptr_factory_ are invalidated.
// NOTE(review): at least one line between the SetOffset() call and the
// last_error_ assignment, plus the closing brace, is missing from this
// excerpt.
202 void SocketInputStream::ResetInternal() {
203 read_buffer_
->SetOffset(0);
205 last_error_
= net::OK
;
206 weak_ptr_factory_
.InvalidateWeakPtrs(); // Invalidate any callbacks.
// Closes the input stream with |error|, which must be less than
// net::ERR_IO_PENDING, and logs the error. |callback| is an optional closure
// (checked with is_null() below).
// NOTE(review): the lines between the DCHECK and the log statement, the
// statement guarded by the is_null() check, and the closing brace are missing
// from this excerpt.
209 void SocketInputStream::CloseStream(net::Error error
,
210 const base::Closure
& callback
) {
211 DCHECK_LT(error
, net::ERR_IO_PENDING
);
214 LOG(ERROR
) << "Closing stream with result " << error
;
215 if (!callback
.is_null())
// Constructs an output stream for |socket|.
// The visible initializers allocate io_buffer_ as a net::IOBuffer of
// kDefaultBufferSize, create write_buffer_ as a net::DrainableIOBuffer over
// io_buffer_ with size kDefaultBufferSize, set last_error_ to net::OK, and
// bind weak_ptr_factory_ to this object.
// NOTE(review): parts of the initializer list and the closing brace are
// missing from this excerpt.
219 SocketOutputStream::SocketOutputStream(net::StreamSocket
* socket
)
221 io_buffer_(new net::IOBuffer(kDefaultBufferSize
)),
222 write_buffer_(new net::DrainableIOBuffer(io_buffer_
.get(),
223 kDefaultBufferSize
)),
225 last_error_(net::OK
),
226 weak_ptr_factory_(this) {
// The socket handed in is expected to be connected already.
227 DCHECK(socket
->IsConnected());
// Destructor.
// NOTE(review): the body and closing brace are not visible in this excerpt.
230 SocketOutputStream::~SocketOutputStream() {
// Hands the caller the not-yet-claimed tail of the output buffer: |*data| is
// set to write_buffer_->data() + next_pos_, |*size| to
// write_buffer_->size() - next_pos_, and next_pos_ is then set to
// write_buffer_->size().
// Must not be called while the stream is CLOSED or FLUSHING.
// NOTE(review): the statement guarded by the "buffer full" check, the return
// statements and the closing brace are missing from this excerpt.
233 bool SocketOutputStream::Next(void** data
, int* size
) {
234 DCHECK_NE(GetState(), CLOSED
);
235 DCHECK_NE(GetState(), FLUSHING
);
// next_pos_ at the end of the buffer means there is no space left to offer.
236 if (next_pos_
== write_buffer_
->size())
239 *data
= write_buffer_
->data() + next_pos_
;
240 *size
= write_buffer_
->size() - next_pos_
;
241 next_pos_
= write_buffer_
->size();
// Backs the write position up by |count| bytes (per the log message below).
// A |count| larger than next_pos_ is special-cased.
// NOTE(review): the statement guarded by the count > next_pos_ check, the
// statement that adjusts next_pos_, and the closing brace are missing from
// this excerpt.
245 void SocketOutputStream::BackUp(int count
) {
247 if (count
> next_pos_
)
250 DVLOG(1) << "Backing up " << count
<< " bytes in output buffer. "
251 << next_pos_
<< " bytes used.";
// Byte-count accessor. The visible part only asserts that the stream is
// neither CLOSED nor FLUSHING when it is called.
// NOTE(review): the return statement is missing from this excerpt.
254 int64
SocketOutputStream::ByteCount() const {
255 DCHECK_NE(GetState(), CLOSED
);
256 DCHECK_NE(GetState(), FLUSHING
);
// Writes the buffered data to socket_.
// Visible flow:
//   * the stream must be in the READY state;
//   * if the socket is no longer connected, last_error_ is set to
//     net::ERR_CONNECTION_CLOSED;
//   * otherwise socket_->Write() is issued on write_buffer_ with
//     FlushCompletionCallback bound through a weak pointer. A
//     net::ERR_IO_PENDING result is stored in last_error_ and returned;
//     otherwise FlushCompletionCallback() is invoked directly with an empty
//     closure and the result.
// NOTE(review): several return statements, some Write()/Bind() arguments, the
// declaration of |result| and the closing braces are missing from this
// excerpt.
260 net::Error
SocketOutputStream::Flush(const base::Closure
& callback
) {
261 DCHECK_EQ(GetState(), READY
);
// Flushing to a disconnected socket is recorded as a closed connection.
263 if (!socket_
->IsConnected()) {
264 LOG(ERROR
) << "Socket was disconnected, closing output stream";
265 last_error_
= net::ERR_CONNECTION_CLOSED
;
269 DVLOG(1) << "Flushing " << next_pos_
<< " bytes into socket.";
// The completion callback is bound to a weak pointer obtained from
// weak_ptr_factory_.
271 socket_
->Write(write_buffer_
.get(),
273 base::Bind(&SocketOutputStream::FlushCompletionCallback
,
274 weak_ptr_factory_
.GetWeakPtr(),
276 DVLOG(1) << "Write returned " << result
;
// Asynchronous case: remember that a write is pending and report it.
277 if (result
== net::ERR_IO_PENDING
) {
278 last_error_
= net::ERR_IO_PENDING
;
279 return net::ERR_IO_PENDING
;
// Otherwise the completion handler is called directly with an empty closure.
282 FlushCompletionCallback(base::Closure(), result
);
// Derives the output stream state from last_error_.
// The visible conditions are checked in this order:
//   1. last_error_ <  net::ERR_IO_PENDING
//   2. last_error_ == net::ERR_IO_PENDING
//   3. otherwise last_error_ must be net::OK.
// NOTE(review): none of the return statements are visible in this excerpt.
// Presumably the first two branches map to CLOSED and FLUSHING -- confirm
// against the full file.
286 SocketOutputStream::State
SocketOutputStream::GetState() const{
287 if (last_error_
< net::ERR_IO_PENDING
)
290 if (last_error_
== net::ERR_IO_PENDING
)
293 DCHECK_EQ(last_error_
, net::OK
);
// Accessor returning a net::Error for the output stream.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// last_error_ -- confirm against the full file.
300 net::Error
SocketOutputStream::last_error() const {
// Completion handler for the socket write issued by Flush().
// |callback| is an optional closure (checked with is_null() below); |result|
// is the outcome of the write.
// Visible flow:
//   * a CLOSED stream is special-cased first (see the comment below);
//   * per the EOF comment, |result| is rewritten to
//     net::ERR_CONNECTION_CLOSED;
//   * a negative |result| is logged and stored in last_error_;
//   * a positive |result| sets last_error_ to net::OK. If
//     write_buffer_->BytesConsumed() + result is still short of next_pos_,
//     only part of the data was written: the written bytes are marked with
//     DidConsume(result) and the flush is retried. Otherwise the flush is
//     complete and write_buffer_'s offset is reset to 0.
// NOTE(review): the EOF condition line, return statements, the retry call, the
// statements guarded by the is_null() checks and the closing braces are
// missing from this excerpt; the function is also cut off at the end of the
// visible source.
304 void SocketOutputStream::FlushCompletionCallback(
305 const base::Closure
& callback
, int result
) {
306 // If an error occurred before the completion callback could complete, ignore
308 if (GetState() == CLOSED
)
311 // Result == 0 implies EOF, which is treated as an error.
313 result
= net::ERR_CONNECTION_CLOSED
;
// A pending result must never reach the completion handler.
315 DCHECK_NE(result
, net::ERR_IO_PENDING
);
// Failure path: record the error in last_error_.
317 if (result
< net::OK
) {
318 LOG(ERROR
) << "Failed to flush socket.";
319 last_error_
= static_cast<net::Error
>(result
);
320 if (!callback
.is_null())
// Success path: at least one byte was written.
325 DCHECK_GT(result
, net::OK
);
326 last_error_
= net::OK
;
328 if (write_buffer_
->BytesConsumed() + result
< next_pos_
) {
329 DVLOG(1) << "Partial flush complete. Retrying.";
330 // Only a partial write was completed. Flush again to finish the write.
331 write_buffer_
->DidConsume(result
);
336 DVLOG(1) << "Socket flush complete.";
337 write_buffer_
->SetOffset(0);
339 if (!callback
.is_null())