Update broken references to image assets
[chromium-blink-merge.git] / mojo / services / network / tcp_connected_socket_impl.cc
blob44fba138af40e0e89e96164be8e54888846d3435
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
#include "mojo/services/network/tcp_connected_socket_impl.h"

#include <limits>

#include "base/message_loop/message_loop.h"
#include "mojo/services/network/net_adapters.h"
#include "net/base/net_errors.h"
11 namespace mojo {
13 TCPConnectedSocketImpl::TCPConnectedSocketImpl(
14 scoped_ptr<net::TCPSocket> socket,
15 ScopedDataPipeConsumerHandle send_stream,
16 ScopedDataPipeProducerHandle receive_stream,
17 InterfaceRequest<TCPConnectedSocket> request,
18 scoped_ptr<mojo::AppRefCount> app_refcount)
19 : socket_(socket.Pass()),
20 send_stream_(send_stream.Pass()),
21 receive_stream_(receive_stream.Pass()),
22 binding_(this, request.Pass()),
23 app_refcount_(app_refcount.Pass()),
24 weak_ptr_factory_(this) {
25 // Queue up async communication.
26 binding_.set_connection_error_handler([this]() { OnConnectionError(); });
27 ListenForReceivePeerClosed();
28 ListenForSendPeerClosed();
29 ReceiveMore();
30 SendMore();
33 TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
36 void TCPConnectedSocketImpl::OnConnectionError() {
37 binding_.Close();
38 DeleteIfNeeded();
41 void TCPConnectedSocketImpl::ReceiveMore() {
42 DCHECK(!pending_receive_.get());
44 uint32_t num_bytes;
45 MojoResult result = NetToMojoPendingBuffer::BeginWrite(
46 &receive_stream_, &pending_receive_, &num_bytes);
47 if (result == MOJO_RESULT_SHOULD_WAIT) {
48 // The pipe is full. We need to wait for it to have more space.
49 receive_handle_watcher_.Start(
50 receive_stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
51 MOJO_DEADLINE_INDEFINITE,
52 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
53 weak_ptr_factory_.GetWeakPtr()));
54 return;
57 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
58 // It's valid that the user of this class consumed the data they care about
59 // and closed their data pipe handles after writing data. This class should
60 // still write out all the data.
61 ShutdownReceive();
62 // TODO(johnmccutchan): Notify socket direction is closed along with
63 // net_result and mojo_result.
64 return;
67 if (result != MOJO_RESULT_OK) {
68 // The receive stream is in a bad state.
69 ShutdownReceive();
70 // TODO(johnmccutchan): Notify socket direction is closed along with
71 // net_result and mojo_result.
72 return;
75 // Mojo is ready for the receive.
76 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
77 scoped_refptr<net::IOBuffer> buf(
78 new NetToMojoIOBuffer(pending_receive_.get()));
79 int read_result =
80 socket_->Read(buf.get(), static_cast<int>(num_bytes),
81 base::Bind(&TCPConnectedSocketImpl::DidReceive,
82 weak_ptr_factory_.GetWeakPtr(), false));
83 if (read_result == net::ERR_IO_PENDING) {
84 // Pending I/O, wait for result in DidReceive().
85 } else if (read_result > 0) {
86 // Synchronous data ready.
87 DidReceive(true, read_result);
88 } else {
89 // read_result == 0 indicates EOF.
90 // read_result < 0 indicates error.
91 ShutdownReceive();
92 // TODO(johnmccutchan): Notify socket direction is closed along with
93 // net_result and mojo_result.
97 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
98 if (result != MOJO_RESULT_OK) {
99 ShutdownReceive();
100 // TODO(johnmccutchan): Notify socket direction is closed along with
101 // net_result and mojo_result.
102 return;
104 ListenForReceivePeerClosed();
105 ReceiveMore();
108 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
109 int result) {
110 if (!pending_receive_)
111 return;
113 if (result < 0) {
114 // Error.
115 ShutdownReceive();
116 // TODO(johnmccutchan): Notify socket direction is closed along with
117 // net_result and mojo_result.
118 return;
121 receive_stream_ = pending_receive_->Complete(result);
122 pending_receive_ = nullptr;
124 // Schedule more reading.
125 if (completed_synchronously) {
126 // Don't recursively call ReceiveMore if this is a sync read.
127 base::MessageLoop::current()->PostTask(
128 FROM_HERE, base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
129 weak_ptr_factory_.GetWeakPtr()));
130 } else {
131 ReceiveMore();
135 void TCPConnectedSocketImpl::ShutdownReceive() {
136 receive_handle_watcher_.Stop();
137 pending_receive_ = nullptr;
138 receive_stream_.reset();
139 DeleteIfNeeded();
142 void TCPConnectedSocketImpl::ListenForReceivePeerClosed() {
143 receive_handle_watcher_.Start(
144 receive_stream_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
145 MOJO_DEADLINE_INDEFINITE,
146 base::Bind(&TCPConnectedSocketImpl::OnReceiveDataPipeClosed,
147 weak_ptr_factory_.GetWeakPtr()));
150 void TCPConnectedSocketImpl::OnReceiveDataPipeClosed(MojoResult result) {
151 ShutdownReceive();
154 void TCPConnectedSocketImpl::SendMore() {
155 uint32_t num_bytes = 0;
156 MojoResult result = MojoToNetPendingBuffer::BeginRead(
157 &send_stream_, &pending_send_, &num_bytes);
158 if (result == MOJO_RESULT_SHOULD_WAIT) {
159 // Data not ready, wait for it.
160 send_handle_watcher_.Start(
161 send_stream_.get(), MOJO_HANDLE_SIGNAL_READABLE,
162 MOJO_DEADLINE_INDEFINITE,
163 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
164 weak_ptr_factory_.GetWeakPtr()));
165 return;
166 } else if (result != MOJO_RESULT_OK) {
167 ShutdownSend();
168 // TODO(johnmccutchan): Notify socket direction is closed along with
169 // net_result and mojo_result.
170 return;
173 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
174 // do partial writes.
175 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
176 int write_result =
177 socket_->Write(buf.get(), static_cast<int>(num_bytes),
178 base::Bind(&TCPConnectedSocketImpl::DidSend,
179 weak_ptr_factory_.GetWeakPtr(), false));
180 if (write_result == net::ERR_IO_PENDING) {
181 // Pending I/O, wait for result in DidSend().
182 } else if (write_result >= 0) {
183 // Synchronous data consumed.
184 DidSend(true, write_result);
185 } else {
186 // write_result < 0 indicates error.
187 ShutdownSend();
188 // TODO(johnmccutchan): Notify socket direction is closed along with
189 // net_result and mojo_result.
193 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
194 if (result != MOJO_RESULT_OK) {
195 ShutdownSend();
196 // TODO(johnmccutchan): Notify socket direction is closed along with
197 // net_result and mojo_result.
198 return;
200 ListenForSendPeerClosed();
201 SendMore();
204 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, int result) {
205 if (!pending_send_)
206 return;
208 if (result < 0) {
209 ShutdownSend();
210 // TODO(johnmccutchan): Notify socket direction is closed along with
211 // net_result and mojo_result.
212 return;
215 // Take back ownership of the stream and free the IOBuffer.
216 send_stream_ = pending_send_->Complete(result);
217 pending_send_ = nullptr;
219 // Schedule more writing.
220 if (completed_synchronously) {
221 // Don't recursively call SendMore if this is a sync read.
222 base::MessageLoop::current()->PostTask(
223 FROM_HERE, base::Bind(&TCPConnectedSocketImpl::SendMore,
224 weak_ptr_factory_.GetWeakPtr()));
225 } else {
226 SendMore();
230 void TCPConnectedSocketImpl::ShutdownSend() {
231 send_handle_watcher_.Stop();
232 pending_send_ = nullptr;
233 send_stream_.reset();
234 DeleteIfNeeded();
237 void TCPConnectedSocketImpl::ListenForSendPeerClosed() {
238 send_handle_watcher_.Start(
239 send_stream_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
240 MOJO_DEADLINE_INDEFINITE,
241 base::Bind(&TCPConnectedSocketImpl::OnSendDataPipeClosed,
242 weak_ptr_factory_.GetWeakPtr()));
245 void TCPConnectedSocketImpl::OnSendDataPipeClosed(MojoResult result) {
246 ShutdownSend();
249 void TCPConnectedSocketImpl::DeleteIfNeeded() {
250 bool has_send = pending_send_ || send_stream_.is_valid();
251 bool has_receive = pending_receive_ || receive_stream_.is_valid();
252 if (!binding_.is_bound() && !has_send && !has_receive)
253 delete this;
256 } // namespace mojo