1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/tcp_connected_socket_impl.h"
7 #include "base/message_loop/message_loop.h"
8 #include "mojo/services/network/net_adapters.h"
9 #include "net/base/net_errors.h"
// Constructs the socket wrapper, moving the TCP socket and both data pipe
// handles into the corresponding members via Pass().
//   socket:         the net::TCPSocket that ReceiveMore() reads from and
//                   SendMore() writes to.
//   send_stream:    data pipe consumer handle; SendMore() reads from it.
//   receive_stream: data pipe producer handle; ReceiveMore() writes to it.
// |weak_ptr_factory_| supplies the weak pointers that are bound into the
// handle-watcher and posted-task callbacks elsewhere in this file.
// NOTE(review): the statements following the "Queue up async communication"
// comment are not shown here; presumably they start ReceiveMore() and
// SendMore() -- confirm.
13 TCPConnectedSocketImpl::TCPConnectedSocketImpl(
14 scoped_ptr
<net::TCPSocket
> socket
,
15 ScopedDataPipeConsumerHandle send_stream
,
16 ScopedDataPipeProducerHandle receive_stream
)
17 : socket_(socket
.Pass()),
18 send_stream_(send_stream
.Pass()),
19 receive_stream_(receive_stream
.Pass()),
20 weak_ptr_factory_(this) {
21 // Queue up async communication.
// Destructor.
// NOTE(review): the body is not shown here; presumably empty -- confirm.
26 TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
// Starts one receive cycle. Obtains a pending write buffer on
// |receive_stream_| through NetToMojoPendingBuffer::BeginWrite() and asks
// |socket_| to read into a NetToMojoIOBuffer wrapping that pending buffer.
// Completion is delivered to DidReceive(), either directly from here (when
// the read returns a positive byte count) or through the bound callback.
// NOTE(review): the statements that handle each failure branch are not
// shown here; presumably they call ShutdownReceive() and return -- confirm.
29 void TCPConnectedSocketImpl::ReceiveMore() {
// A previous receive must have been completed before starting another.
30 DCHECK(!pending_receive_
.get());
// On success |pending_receive_| is set and |num_bytes| holds the size that is
// passed to Read() below.
33 MojoResult result
= NetToMojoPendingBuffer::BeginWrite(
34 &receive_stream_
, &pending_receive_
, &num_bytes
);
35 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
36 // The pipe is full. We need to wait for it to have more space.
// OnReceiveStreamReady() is bound with a weak pointer and is notified when
// the handle becomes writable or the peer closes.
37 receive_handle_watcher_
.Start(
38 receive_stream_
.get(),
39 MOJO_HANDLE_SIGNAL_WRITABLE
| MOJO_HANDLE_SIGNAL_PEER_CLOSED
,
40 MOJO_DEADLINE_INDEFINITE
,
41 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady
,
42 weak_ptr_factory_
.GetWeakPtr()));
46 if (result
== MOJO_RESULT_FAILED_PRECONDITION
) {
47 // It's valid that the user of this class consumed the data they care about
48 // and closed their data pipe handles after writing data. This class should
49 // still write out all the data.
51 // TODO(johnmccutchan): Notify socket direction is closed along with
52 // net_result and mojo_result.
56 if (result
!= MOJO_RESULT_OK
) {
57 // The receive stream is in a bad state.
59 // TODO(johnmccutchan): Notify socket direction is closed along with
60 // net_result and mojo_result.
64 // Mojo is ready for the receive.
// |num_bytes| is narrowed to int for Read() below, so it must be smaller
// than the largest int value.
65 CHECK_GT(static_cast<uint32_t>(std::numeric_limits
<int>::max()), num_bytes
);
66 scoped_refptr
<net::IOBuffer
> buf(
67 new NetToMojoIOBuffer(pending_receive_
.get()));
68 int read_result
= socket_
->Read(
69 buf
.get(), static_cast<int>(num_bytes
),
// NOTE(review): this callback binds an unretained |this|, unlike the watcher
// callback above which uses a weak pointer; presumably safe because |socket_|
// is owned by this object -- confirm.
70 base::Bind(&TCPConnectedSocketImpl::DidReceive
, base::Unretained(this),
72 if (read_result
== net::ERR_IO_PENDING
) {
73 // Pending I/O, wait for result in DidReceive().
74 } else if (read_result
> 0) {
75 // Synchronous data ready.
76 DidReceive(true, read_result
);
78 // read_result == 0 indicates EOF.
79 // read_result < 0 indicates error.
81 // TODO(johnmccutchan): Notify socket direction is closed along with
82 // net_result and mojo_result.
// Callback that ReceiveMore() registers with |receive_handle_watcher_| when
// the receive pipe has no space. |result| is the outcome of that wait; any
// value other than MOJO_RESULT_OK takes the failure branch below.
// NOTE(review): the success path is not shown here; presumably it calls
// ReceiveMore() again -- confirm.
86 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result
) {
87 if (result
!= MOJO_RESULT_OK
) {
89 // TODO(johnmccutchan): Notify socket direction is closed along with
90 // net_result and mojo_result.
// Completion handler for the socket read issued by ReceiveMore().
// |completed_synchronously| is true when ReceiveMore() calls this directly
// with the read result instead of going through the socket callback.
// Completes the pending buffer with |result|, stores the handle returned by
// Complete() back into |receive_stream_|, and schedules the next read.
// NOTE(review): the rest of the parameter list and the error check that
// precedes the first TODO are not shown here.
96 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously
,
101 // TODO(johnmccutchan): Notify socket direction is closed along with
102 // net_result and mojo_result.
// Take the stream back from the pending buffer and drop the buffer reference.
106 receive_stream_
= pending_receive_
->Complete(result
);
107 pending_receive_
= nullptr;
109 // Schedule more reading.
110 if (completed_synchronously
) {
111 // Don't recursively call ReceiveMore if this is a sync read.
// The task is bound to a weak pointer from |weak_ptr_factory_|.
112 base::MessageLoop::current()->PostTask(
114 base::Bind(&TCPConnectedSocketImpl::ReceiveMore
,
115 weak_ptr_factory_
.GetWeakPtr()));
// Tears down the receive direction: drops any pending receive buffer and
// resets |receive_stream_|.
121 void TCPConnectedSocketImpl::ShutdownReceive() {
122 pending_receive_
= nullptr;
123 receive_stream_
.reset();
// Starts one send cycle. Obtains a pending read buffer on |send_stream_|
// through MojoToNetPendingBuffer::BeginRead() and asks |socket_| to write
// from a MojoToNetIOBuffer wrapping that pending buffer. Completion is
// delivered to DidSend(), either directly from here (when the write returns
// a non-negative byte count) or through the bound callback.
// NOTE(review): the statements that handle each failure branch are not
// shown here; presumably they call ShutdownSend() and return -- confirm.
126 void TCPConnectedSocketImpl::SendMore() {
127 uint32_t num_bytes
= 0;
128 MojoResult result
= MojoToNetPendingBuffer::BeginRead(
129 &send_stream_
, &pending_send_
, &num_bytes
);
130 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
131 // Data not ready, wait for it.
// OnSendStreamReady() is bound with a weak pointer and is notified when the
// handle becomes readable or the peer closes.
132 send_handle_watcher_
.Start(
134 MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_PEER_CLOSED
,
135 MOJO_DEADLINE_INDEFINITE
,
136 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady
,
137 weak_ptr_factory_
.GetWeakPtr()));
139 } else if (result
!= MOJO_RESULT_OK
) {
141 // TODO(johnmccutchan): Notify socket direction is closed along with
142 // net_result and mojo_result.
146 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
147 // do partial writes.
// NOTE(review): unlike ReceiveMore(), |num_bytes| is narrowed to int here
// without a preceding range CHECK -- confirm whether one is needed.
148 scoped_refptr
<net::IOBuffer
> buf(new MojoToNetIOBuffer(pending_send_
.get()));
149 int write_result
= socket_
->Write(
150 buf
.get(), static_cast<int>(num_bytes
),
// NOTE(review): this callback binds an unretained |this|, unlike the watcher
// callback above which uses a weak pointer; presumably safe because |socket_|
// is owned by this object -- confirm.
151 base::Bind(&TCPConnectedSocketImpl::DidSend
, base::Unretained(this),
153 if (write_result
== net::ERR_IO_PENDING
) {
154 // Pending I/O, wait for result in DidSend().
155 } else if (write_result
>= 0) {
156 // Synchronous data consumed.
157 DidSend(true, write_result
);
159 // write_result < 0 indicates error.
161 // TODO(johnmccutchan): Notify socket direction is closed along with
162 // net_result and mojo_result.
// Callback that SendMore() registers with |send_handle_watcher_| when no data
// is ready on the send pipe. |result| is the outcome of that wait; any value
// other than MOJO_RESULT_OK takes the failure branch below.
// NOTE(review): the success path is not shown here; presumably it calls
// SendMore() again -- confirm.
166 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result
) {
167 if (result
!= MOJO_RESULT_OK
) {
169 // TODO(johnmccutchan): Notify socket direction is closed along with
170 // net_result and mojo_result.
// Completion handler for the socket write issued by SendMore().
// |completed_synchronously| is true when SendMore() calls this directly with
// the write result instead of going through the socket callback.
// Completes the pending buffer with |result|, stores the handle returned by
// Complete() back into |send_stream_|, and schedules the next write.
// NOTE(review): the rest of the parameter list and the error check that
// precedes the first TODO are not shown here.
176 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously
,
180 // TODO(johnmccutchan): Notify socket direction is closed along with
181 // net_result and mojo_result.
185 // Take back ownership of the stream and free the IOBuffer.
186 send_stream_
= pending_send_
->Complete(result
);
187 pending_send_
= nullptr;
189 // Schedule more writing.
190 if (completed_synchronously
) {
191 // Don't recursively call SendMore if this is a sync write.
// The task is bound to a weak pointer from |weak_ptr_factory_|.
192 base::MessageLoop::current()->PostTask(
194 base::Bind(&TCPConnectedSocketImpl::SendMore
,
195 weak_ptr_factory_
.GetWeakPtr()));
// Tears down the send direction: drops any pending send buffer and resets
// |send_stream_|.
201 void TCPConnectedSocketImpl::ShutdownSend() {
202 pending_send_
= nullptr;
203 send_stream_
.reset();