1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/tcp_connected_socket_impl.h"
7 #include "base/message_loop/message_loop.h"
8 #include "mojo/services/network/net_adapters.h"
9 #include "net/base/net_errors.h"
// Constructs the implementation from an already-created TCP socket, the two
// data pipe handles used for the send and receive directions, the interface
// request that binding_ is bound to, and an application refcount.
13 TCPConnectedSocketImpl::TCPConnectedSocketImpl(
14 scoped_ptr
<net::TCPSocket
> socket
,
15 ScopedDataPipeConsumerHandle send_stream
,
16 ScopedDataPipeProducerHandle receive_stream
,
17 InterfaceRequest
<TCPConnectedSocket
> request
,
18 scoped_ptr
<mojo::AppRefCount
> app_refcount
)
// Each argument is handed to its member with Pass(); weak_ptr_factory_ is
// constructed on |this|.
19 : socket_(socket
.Pass()),
20 send_stream_(send_stream
.Pass()),
21 receive_stream_(receive_stream
.Pass()),
22 binding_(this, request
.Pass()),
23 app_refcount_(app_refcount
.Pass()),
24 weak_ptr_factory_(this) {
25 // Queue up async communication.
// Connection errors reported by binding_ are forwarded to
// OnConnectionError().
26 binding_
.set_connection_error_handler([this]() { OnConnectionError(); });
// Start watching both data pipes for MOJO_HANDLE_SIGNAL_PEER_CLOSED.
27 ListenForReceivePeerClosed();
28 ListenForSendPeerClosed();
// Destructor.
// NOTE(review): the body is not visible in this excerpt, so nothing is
// stated here about what it releases.
33 TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
// Called by the lambda installed as binding_'s connection error handler in
// the constructor.
// NOTE(review): the body is not visible in this excerpt.
36 void TCPConnectedSocketImpl::OnConnectionError() {
// Starts one receive cycle: asks NetToMojoPendingBuffer::BeginWrite() for a
// region of receive_stream_ to write into, then issues socket_->Read() into
// that region. Must not be called while a receive is already pending (see
// the DCHECK on pending_receive_).
41 void TCPConnectedSocketImpl::ReceiveMore() {
42 DCHECK(!pending_receive_
.get());
// BeginWrite() is given the stream, the pending-buffer slot and num_bytes
// as out-parameters.
45 MojoResult result
= NetToMojoPendingBuffer::BeginWrite(
46 &receive_stream_
, &pending_receive_
, &num_bytes
);
47 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
48 // The pipe is full. We need to wait for it to have more space.
// OnReceiveStreamReady() is bound through a weak pointer and is run by the
// watcher once receive_stream_ signals MOJO_HANDLE_SIGNAL_WRITABLE.
49 receive_handle_watcher_
.Start(
50 receive_stream_
.get(), MOJO_HANDLE_SIGNAL_WRITABLE
,
51 MOJO_DEADLINE_INDEFINITE
,
52 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady
,
53 weak_ptr_factory_
.GetWeakPtr()));
57 if (result
== MOJO_RESULT_FAILED_PRECONDITION
) {
58 // It's valid that the user of this class consumed the data they care about
59 // and closed their data pipe handles after writing data. This class should
60 // still write out all the data.
62 // TODO(johnmccutchan): Notify socket direction is closed along with
63 // net_result and mojo_result.
67 if (result
!= MOJO_RESULT_OK
) {
68 // The receive stream is in a bad state.
70 // TODO(johnmccutchan): Notify socket direction is closed along with
71 // net_result and mojo_result.
75 // Mojo is ready for the receive.
// num_bytes is narrowed to int for socket_->Read() below, so it must be
// smaller than std::numeric_limits<int>::max().
76 CHECK_GT(static_cast<uint32_t>(std::numeric_limits
<int>::max()), num_bytes
);
// Wrap the pending Mojo buffer in a net::IOBuffer for the socket read.
77 scoped_refptr
<net::IOBuffer
> buf(
78 new NetToMojoIOBuffer(pending_receive_
.get()));
// The completion callback is DidReceive() bound through a weak pointer with
// completed_synchronously = false.
80 socket_
->Read(buf
.get(), static_cast<int>(num_bytes
),
81 base::Bind(&TCPConnectedSocketImpl::DidReceive
,
82 weak_ptr_factory_
.GetWeakPtr(), false));
83 if (read_result
== net::ERR_IO_PENDING
) {
84 // Pending I/O, wait for result in DidReceive().
85 } else if (read_result
> 0) {
86 // Synchronous data ready.
// Called directly here, so completed_synchronously is passed as true.
87 DidReceive(true, read_result
);
89 // read_result == 0 indicates EOF.
90 // read_result < 0 indicates error.
92 // TODO(johnmccutchan): Notify socket direction is closed along with
93 // net_result and mojo_result.
// Watcher callback registered by ReceiveMore() for
// MOJO_HANDLE_SIGNAL_WRITABLE on receive_stream_. |result| is the
// MojoResult delivered by the watcher; any value other than MOJO_RESULT_OK
// takes the error branch below.
97 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result
) {
98 if (result
!= MOJO_RESULT_OK
) {
100 // TODO(johnmccutchan): Notify socket direction is closed along with
101 // net_result and mojo_result.
// receive_handle_watcher_ was used for the writable wait; start it again
// for the peer-closed signal.
104 ListenForReceivePeerClosed();
// Completion handler for the socket read started in ReceiveMore().
// |completed_synchronously| is true when ReceiveMore() calls this directly
// and false when it runs as the callback bound to socket_->Read().
108 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously
,
110 if (!pending_receive_
)
116 // TODO(johnmccutchan): Notify socket direction is closed along with
117 // net_result and mojo_result.
// Complete() returns the stream handle, which is stored back into
// receive_stream_; the pending buffer reference is then dropped.
121 receive_stream_
= pending_receive_
->Complete(result
);
122 pending_receive_
= nullptr;
124 // Schedule more reading.
125 if (completed_synchronously
) {
126 // Don't recursively call ReceiveMore if this is a sync read.
// ReceiveMore() is posted to the current message loop, bound through a weak
// pointer, instead of being called from inside this frame.
127 base::MessageLoop::current()->PostTask(
128 FROM_HERE
, base::Bind(&TCPConnectedSocketImpl::ReceiveMore
,
129 weak_ptr_factory_
.GetWeakPtr()));
// Tears down the receive direction: stops receive_handle_watcher_, drops
// the pending receive buffer reference and resets the receive_stream_
// handle.
135 void TCPConnectedSocketImpl::ShutdownReceive() {
136 receive_handle_watcher_
.Stop();
137 pending_receive_
= nullptr;
138 receive_stream_
.reset();
// Starts receive_handle_watcher_ on receive_stream_ for
// MOJO_HANDLE_SIGNAL_PEER_CLOSED with an indefinite deadline. The watcher
// callback is OnReceiveDataPipeClosed(), bound through a weak pointer.
142 void TCPConnectedSocketImpl::ListenForReceivePeerClosed() {
143 receive_handle_watcher_
.Start(
144 receive_stream_
.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED
,
145 MOJO_DEADLINE_INDEFINITE
,
146 base::Bind(&TCPConnectedSocketImpl::OnReceiveDataPipeClosed
,
147 weak_ptr_factory_
.GetWeakPtr()));
// Watcher callback bound in ListenForReceivePeerClosed() for
// MOJO_HANDLE_SIGNAL_PEER_CLOSED on receive_stream_. |result| is the
// MojoResult delivered by the watcher.
// NOTE(review): the body is not visible in this excerpt.
150 void TCPConnectedSocketImpl::OnReceiveDataPipeClosed(MojoResult result
) {
// Starts one send cycle: asks MojoToNetPendingBuffer::BeginRead() for data
// available on send_stream_, then hands that data to socket_->Write().
154 void TCPConnectedSocketImpl::SendMore() {
155 uint32_t num_bytes
= 0;
// BeginRead() is given the stream, the pending-buffer slot and num_bytes
// as out-parameters.
156 MojoResult result
= MojoToNetPendingBuffer::BeginRead(
157 &send_stream_
, &pending_send_
, &num_bytes
);
158 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
159 // Data not ready, wait for it.
// OnSendStreamReady() is bound through a weak pointer and is run by the
// watcher once send_stream_ signals MOJO_HANDLE_SIGNAL_READABLE.
160 send_handle_watcher_
.Start(
161 send_stream_
.get(), MOJO_HANDLE_SIGNAL_READABLE
,
162 MOJO_DEADLINE_INDEFINITE
,
163 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady
,
164 weak_ptr_factory_
.GetWeakPtr()));
166 } else if (result
!= MOJO_RESULT_OK
) {
168 // TODO(johnmccutchan): Notify socket direction is closed along with
169 // net_result and mojo_result.
173 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
174 // do partial writes.
175 scoped_refptr
<net::IOBuffer
> buf(new MojoToNetIOBuffer(pending_send_
.get()));
// The completion callback is DidSend() bound through a weak pointer with
// completed_synchronously = false.
// NOTE(review): num_bytes is narrowed from uint32_t to int here and, unlike
// the CHECK_GT in ReceiveMore(), no range check is visible in this excerpt
// -- confirm whether one is needed.
177 socket_
->Write(buf
.get(), static_cast<int>(num_bytes
),
178 base::Bind(&TCPConnectedSocketImpl::DidSend
,
179 weak_ptr_factory_
.GetWeakPtr(), false));
180 if (write_result
== net::ERR_IO_PENDING
) {
181 // Pending I/O, wait for result in DidSend().
182 } else if (write_result
>= 0) {
183 // Synchronous data consumed.
// Called directly here, so completed_synchronously is passed as true.
184 DidSend(true, write_result
);
186 // write_result < 0 indicates error.
188 // TODO(johnmccutchan): Notify socket direction is closed along with
189 // net_result and mojo_result.
// Watcher callback registered by SendMore() for MOJO_HANDLE_SIGNAL_READABLE
// on send_stream_. |result| is the MojoResult delivered by the watcher; any
// value other than MOJO_RESULT_OK takes the error branch below.
193 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result
) {
194 if (result
!= MOJO_RESULT_OK
) {
196 // TODO(johnmccutchan): Notify socket direction is closed along with
197 // net_result and mojo_result.
// send_handle_watcher_ was used for the readable wait; start it again for
// the peer-closed signal.
200 ListenForSendPeerClosed();
// Completion handler for the socket write started in SendMore().
// |completed_synchronously| is true when SendMore() calls this directly and
// false when it runs as the callback bound to socket_->Write(). |result| is
// the value produced by the write and is forwarded to Complete().
204 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously
, int result
) {
210 // TODO(johnmccutchan): Notify socket direction is closed along with
211 // net_result and mojo_result.
215 // Take back ownership of the stream and free the IOBuffer.
216 send_stream_
= pending_send_
->Complete(result
);
217 pending_send_
= nullptr;
219 // Schedule more writing.
220 if (completed_synchronously
) {
221 // Don't recursively call SendMore if this is a sync write.
// SendMore() is posted to the current message loop, bound through a weak
// pointer, instead of being called from inside this frame.
222 base::MessageLoop::current()->PostTask(
223 FROM_HERE
, base::Bind(&TCPConnectedSocketImpl::SendMore
,
224 weak_ptr_factory_
.GetWeakPtr()));
// Tears down the send direction: stops send_handle_watcher_, drops the
// pending send buffer reference and resets the send_stream_ handle.
230 void TCPConnectedSocketImpl::ShutdownSend() {
231 send_handle_watcher_
.Stop();
232 pending_send_
= nullptr;
233 send_stream_
.reset();
// Starts send_handle_watcher_ on send_stream_ for
// MOJO_HANDLE_SIGNAL_PEER_CLOSED with an indefinite deadline. The watcher
// callback is OnSendDataPipeClosed(), bound through a weak pointer.
237 void TCPConnectedSocketImpl::ListenForSendPeerClosed() {
238 send_handle_watcher_
.Start(
239 send_stream_
.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED
,
240 MOJO_DEADLINE_INDEFINITE
,
241 base::Bind(&TCPConnectedSocketImpl::OnSendDataPipeClosed
,
242 weak_ptr_factory_
.GetWeakPtr()));
// Watcher callback bound in ListenForSendPeerClosed() for
// MOJO_HANDLE_SIGNAL_PEER_CLOSED on send_stream_. |result| is the
// MojoResult delivered by the watcher.
// NOTE(review): the body is not visible in this excerpt.
245 void TCPConnectedSocketImpl::OnSendDataPipeClosed(MojoResult result
) {
249 void TCPConnectedSocketImpl::DeleteIfNeeded() {
250 bool has_send
= pending_send_
|| send_stream_
.is_valid();
251 bool has_receive
= pending_receive_
|| receive_stream_
.is_valid();
252 if (!binding_
.is_bound() && !has_send
&& !has_receive
)