base: Change DCHECK_IS_ON to a macro DCHECK_IS_ON().
[chromium-blink-merge.git] / mojo / services / network / tcp_connected_socket_impl.cc
blobecd174c6e5bdbe2d71686549e96859f97d5ebfdd
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/tcp_connected_socket_impl.h"
7 #include "base/message_loop/message_loop.h"
8 #include "mojo/services/network/net_adapters.h"
9 #include "net/base/net_errors.h"
11 namespace mojo {
13 TCPConnectedSocketImpl::TCPConnectedSocketImpl(
14 scoped_ptr<net::TCPSocket> socket,
15 ScopedDataPipeConsumerHandle send_stream,
16 ScopedDataPipeProducerHandle receive_stream)
17 : socket_(socket.Pass()),
18 send_stream_(send_stream.Pass()),
19 receive_stream_(receive_stream.Pass()),
20 weak_ptr_factory_(this) {
21 // Queue up async communication.
22 ReceiveMore();
23 SendMore();
26 TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
29 void TCPConnectedSocketImpl::ReceiveMore() {
30 DCHECK(!pending_receive_.get());
32 uint32_t num_bytes;
33 MojoResult result = NetToMojoPendingBuffer::BeginWrite(
34 &receive_stream_, &pending_receive_, &num_bytes);
35 if (result == MOJO_RESULT_SHOULD_WAIT) {
36 // The pipe is full. We need to wait for it to have more space.
37 receive_handle_watcher_.Start(
38 receive_stream_.get(),
39 MOJO_HANDLE_SIGNAL_WRITABLE, MOJO_DEADLINE_INDEFINITE,
40 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
41 weak_ptr_factory_.GetWeakPtr()));
42 return;
43 } else if (result != MOJO_RESULT_OK) {
44 // The receive stream is in a bad state.
45 // TODO(darin): How should this be communicated to our client?
46 socket_->Close();
47 return;
50 // Mojo is ready for the receive.
51 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
52 scoped_refptr<net::IOBuffer> buf(
53 new NetToMojoIOBuffer(pending_receive_.get()));
54 int read_result = socket_->Read(
55 buf.get(), static_cast<int>(num_bytes),
56 base::Bind(&TCPConnectedSocketImpl::DidReceive, base::Unretained(this),
57 false));
58 if (read_result == net::ERR_IO_PENDING) {
59 // Pending I/O, wait for result in DidReceive().
60 } else if (read_result >= 0) {
61 // Synchronous data ready.
62 DidReceive(true, read_result);
63 } else {
64 // Some kind of error.
65 // TODO(brettw) notify caller of error.
66 socket_->Close();
70 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
71 // TODO(darin): Handle a bad |result| value.
72 ReceiveMore();
75 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
76 int result) {
77 if (result < 0) {
78 // Error.
79 pending_receive_ = NULL; // Closes the pipe (owned by the pending write).
80 // TODO(brettw) notify the caller of an error?
81 socket_->Close();
82 return;
85 receive_stream_ = pending_receive_->Complete(result);
86 pending_receive_ = NULL;
88 // Schedule more reading.
89 if (completed_synchronously) {
90 // Don't recursively call ReceiveMore if this is a sync read.
91 base::MessageLoop::current()->PostTask(
92 FROM_HERE,
93 base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
94 weak_ptr_factory_.GetWeakPtr()));
95 } else {
96 ReceiveMore();
100 void TCPConnectedSocketImpl::SendMore() {
101 uint32_t num_bytes = 0;
102 MojoResult result = MojoToNetPendingBuffer::BeginRead(
103 &send_stream_, &pending_send_, &num_bytes);
104 if (result == MOJO_RESULT_SHOULD_WAIT) {
105 // Data not ready, wait for it.
106 send_handle_watcher_.Start(
107 send_stream_.get(),
108 MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE,
109 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
110 weak_ptr_factory_.GetWeakPtr()));
111 return;
112 } else if (result != MOJO_RESULT_OK) {
113 // TODO(brettw) notify caller of error.
114 socket_->Close();
115 return;
118 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
119 // do partial writes.
120 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
121 int write_result = socket_->Write(
122 buf.get(), static_cast<int>(num_bytes),
123 base::Bind(&TCPConnectedSocketImpl::DidSend, base::Unretained(this),
124 false));
125 if (write_result == net::ERR_IO_PENDING) {
126 // Pending I/O, wait for result in DidSend().
127 } else if (write_result >= 0) {
128 // Synchronous data consumed.
129 DidSend(true, write_result);
133 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
134 // TODO(brettw): Handle a bad |result| value.
135 SendMore();
138 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously,
139 int result) {
140 if (result < 0) {
141 // TODO(brettw) report error.
142 pending_send_ = NULL;
143 socket_->Close();
144 return;
147 // Take back ownership of the stream and free the IOBuffer.
148 send_stream_ = pending_send_->Complete(result);
149 pending_send_ = NULL;
151 // Schedule more writing.
152 if (completed_synchronously) {
153 // Don't recursively call SendMore if this is a sync read.
154 base::MessageLoop::current()->PostTask(
155 FROM_HERE,
156 base::Bind(&TCPConnectedSocketImpl::SendMore,
157 weak_ptr_factory_.GetWeakPtr()));
158 } else {
159 SendMore();
163 } // namespace mojo