1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/http_connection_impl.h"
10 #include "base/bind_helpers.h"
11 #include "base/callback.h"
12 #include "base/logging.h"
13 #include "base/stl_util.h"
14 #include "base/strings/string_util.h"
15 #include "mojo/message_pump/handle_watcher.h"
16 #include "mojo/services/network/http_server_impl.h"
17 #include "mojo/services/network/net_adapters.h"
18 #include "mojo/services/network/public/cpp/web_socket_read_queue.h"
19 #include "mojo/services/network/public/cpp/web_socket_write_queue.h"
20 #include "mojo/services/network/public/interfaces/web_socket.mojom.h"
21 #include "net/base/net_errors.h"
22 #include "net/http/http_request_headers.h"
23 #include "net/http/http_status_code.h"
24 #include "net/server/http_server.h"
25 #include "net/server/http_server_request_info.h"
26 #include "net/server/http_server_response_info.h"
27 #include "third_party/mojo/src/mojo/public/cpp/bindings/type_converter.h"
28 #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h"
32 // SimpleDataPipeReader reads till end-of-file, stores the data in a string and
33 // notifies completion.
34 class HttpConnectionImpl::SimpleDataPipeReader
{
36 using CompletionCallback
=
37 base::Callback
<void(SimpleDataPipeReader
*, scoped_ptr
<std::string
>)>;
39 SimpleDataPipeReader() {}
40 ~SimpleDataPipeReader() {}
42 void Start(ScopedDataPipeConsumerHandle consumer
,
43 const CompletionCallback
& completion_callback
) {
44 DCHECK(consumer
.is_valid() && !consumer_
.is_valid());
45 consumer_
= consumer
.Pass();
46 completion_callback_
= completion_callback
;
47 buffer_
.reset(new std::string
);
55 MojoResult rv
= BeginReadDataRaw(consumer_
.get(), &buf
, &buf_size
,
56 MOJO_READ_DATA_FLAG_NONE
);
57 if (rv
== MOJO_RESULT_OK
) {
58 buffer_
->append(static_cast<const char*>(buf
), buf_size
);
59 EndReadDataRaw(consumer_
.get(), buf_size
);
61 } else if (rv
== MOJO_RESULT_SHOULD_WAIT
) {
63 } else if (rv
== MOJO_RESULT_FAILED_PRECONDITION
) {
64 // We reached end-of-file.
65 completion_callback_
.Run(this, buffer_
.Pass());
66 // Note: This object may have been destroyed in the callback.
72 void WaitToReadMore() {
73 watcher_
.Start(consumer_
.get(), MOJO_HANDLE_SIGNAL_READABLE
,
74 MOJO_DEADLINE_INDEFINITE
,
75 base::Bind(&SimpleDataPipeReader::OnHandleReady
,
76 base::Unretained(this)));
79 void OnHandleReady(MojoResult result
) { ReadMore(); }
81 ScopedDataPipeConsumerHandle consumer_
;
82 common::HandleWatcher watcher_
;
83 CompletionCallback completion_callback_
;
84 scoped_ptr
<std::string
> buffer_
;
86 DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader
);
89 class HttpConnectionImpl::WebSocketImpl
: public WebSocket
{
91 // |connection| must outlive this object.
92 WebSocketImpl(HttpConnectionImpl
* connection
,
93 InterfaceRequest
<WebSocket
> request
,
94 ScopedDataPipeConsumerHandle send_stream
,
95 WebSocketClientPtr client
)
96 : connection_(connection
),
97 binding_(this, request
.Pass()),
98 client_(client
.Pass()),
99 send_stream_(send_stream
.Pass()),
100 read_send_stream_(new WebSocketReadQueue(send_stream_
.get())),
101 pending_send_count_(0) {
102 DCHECK(binding_
.is_bound());
104 DCHECK(send_stream_
.is_valid());
106 binding_
.set_connection_error_handler([this]() { Close(); });
107 client_
.set_connection_error_handler([this]() { Close(); });
110 receive_stream_
= data_pipe
.producer_handle
.Pass();
111 write_receive_stream_
.reset(new WebSocketWriteQueue(receive_stream_
.get()));
113 client_
->DidConnect("", "", data_pipe
.consumer_handle
.Pass());
116 ~WebSocketImpl() override
{}
119 DCHECK(!IsClosing());
124 NotifyOwnerCloseIfAllDone();
127 void OnReceivedWebSocketMessage(const std::string
& data
) {
131 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
132 // WebSocket{Read,Write}Queue doesn't handle that correctly.
136 uint32_t size
= static_cast<uint32_t>(data
.size());
137 write_receive_stream_
->Write(
139 base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream
,
140 base::Unretained(this), size
));
144 // WebSocket implementation.
145 void Connect(const String
& url
,
146 Array
<String
> protocols
,
147 const String
& origin
,
148 ScopedDataPipeConsumerHandle send_stream
,
149 WebSocketClientPtr client
) override
{
153 void Send(bool fin
, MessageType type
, uint32_t num_bytes
) override
{
154 if (!fin
|| type
!= MESSAGE_TYPE_TEXT
) {
159 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
160 // WebSocket{Read,Write}Queue doesn't handle that correctly.
164 pending_send_count_
++;
165 read_send_stream_
->Read(
166 num_bytes
, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream
,
167 base::Unretained(this), num_bytes
));
170 void FlowControl(int64_t quota
) override
{ NOTIMPLEMENTED(); }
172 void Close(uint16_t code
, const String
& reason
) override
{
176 void OnFinishedReadingSendStream(uint32_t num_bytes
, const char* data
) {
177 DCHECK_GT(pending_send_count_
, 0u);
178 pending_send_count_
--;
181 connection_
->server_
->server()->SendOverWebSocket(
182 connection_
->connection_id_
, std::string(data
, num_bytes
));
186 NotifyOwnerCloseIfAllDone();
189 void OnFinishedWritingReceiveStream(uint32_t num_bytes
, const char* buffer
) {
194 client_
->DidReceiveData(true, MESSAGE_TYPE_TEXT
, num_bytes
);
197 // Checks whether Close() has been called.
198 bool IsClosing() const { return !binding_
.is_bound(); }
200 void NotifyOwnerCloseIfAllDone() {
203 if (pending_send_count_
== 0)
204 connection_
->OnWebSocketClosed();
207 HttpConnectionImpl
* const connection_
;
209 Binding
<WebSocket
> binding_
;
210 WebSocketClientPtr client_
;
212 ScopedDataPipeConsumerHandle send_stream_
;
213 scoped_ptr
<WebSocketReadQueue
> read_send_stream_
;
214 size_t pending_send_count_
;
216 ScopedDataPipeProducerHandle receive_stream_
;
217 scoped_ptr
<WebSocketWriteQueue
> write_receive_stream_
;
219 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl
);
223 struct TypeConverter
<HttpRequestPtr
, net::HttpServerRequestInfo
> {
224 static HttpRequestPtr
Convert(const net::HttpServerRequestInfo
& obj
) {
225 HttpRequestPtr
request(HttpRequest::New());
226 request
->method
= obj
.method
;
227 request
->url
= obj
.path
;
228 request
->headers
.resize(obj
.headers
.size());
230 for (const auto& item
: obj
.headers
) {
231 HttpHeaderPtr
header(HttpHeader::New());
232 header
->name
= item
.first
;
233 header
->value
= item
.second
;
234 request
->headers
[index
++] = header
.Pass();
236 if (!obj
.data
.empty()) {
237 uint32_t num_bytes
= static_cast<uint32_t>(obj
.data
.size());
238 MojoCreateDataPipeOptions options
;
239 options
.struct_size
= sizeof(MojoCreateDataPipeOptions
);
240 options
.flags
= MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE
;
241 options
.element_num_bytes
= 1;
242 options
.capacity_num_bytes
= num_bytes
;
243 DataPipe
data_pipe(options
);
244 request
->body
= data_pipe
.consumer_handle
.Pass();
246 WriteDataRaw(data_pipe
.producer_handle
.get(), obj
.data
.data(),
247 &num_bytes
, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
);
248 CHECK_EQ(MOJO_RESULT_OK
, result
);
250 return request
.Pass();
254 HttpConnectionImpl::HttpConnectionImpl(int connection_id
,
255 HttpServerImpl
* server
,
256 HttpConnectionDelegatePtr delegate
,
257 HttpConnectionPtr
* connection
)
258 : connection_id_(connection_id
),
260 delegate_(delegate
.Pass()),
261 binding_(this, connection
) {
263 binding_
.set_connection_error_handler([this]() { Close(); });
264 delegate_
.set_connection_error_handler([this]() { Close(); });
267 HttpConnectionImpl::~HttpConnectionImpl() {
268 STLDeleteElements(&response_body_readers_
);
271 void HttpConnectionImpl::OnReceivedHttpRequest(
272 const net::HttpServerRequestInfo
& info
) {
276 delegate_
->OnReceivedRequest(
277 HttpRequest::From(info
), [this](HttpResponsePtr response
) {
278 if (response
->body
.is_valid()) {
279 SimpleDataPipeReader
* reader
= new SimpleDataPipeReader
;
280 response_body_readers_
.insert(reader
);
281 ScopedDataPipeConsumerHandle body
= response
->body
.Pass();
284 base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody
,
285 base::Unretained(this), base::Passed(&response
)));
287 OnFinishedReadingResponseBody(response
.Pass(), nullptr, nullptr);
292 void HttpConnectionImpl::OnReceivedWebSocketRequest(
293 const net::HttpServerRequestInfo
& info
) {
297 delegate_
->OnReceivedWebSocketRequest(
298 HttpRequest::From(info
),
299 [this, info
](InterfaceRequest
<WebSocket
> web_socket
,
300 ScopedDataPipeConsumerHandle send_stream
,
301 WebSocketClientPtr web_socket_client
) {
302 if (!web_socket
.is_pending() || !send_stream
.is_valid() ||
303 !web_socket_client
) {
308 web_socket_
.reset(new WebSocketImpl(this, web_socket
.Pass(),
310 web_socket_client
.Pass()));
311 server_
->server()->AcceptWebSocket(connection_id_
, info
);
315 void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string
& data
) {
319 web_socket_
->OnReceivedWebSocketMessage(data
);
322 void HttpConnectionImpl::SetSendBufferSize(
324 const SetSendBufferSizeCallback
& callback
) {
325 if (size
> static_cast<uint32_t>(std::numeric_limits
<int32_t>::max()))
326 size
= std::numeric_limits
<int32_t>::max();
328 server_
->server()->SetSendBufferSize(connection_id_
,
329 static_cast<int32_t>(size
));
330 callback
.Run(MakeNetworkError(net::OK
));
333 void HttpConnectionImpl::SetReceiveBufferSize(
335 const SetReceiveBufferSizeCallback
& callback
) {
336 if (size
> static_cast<uint32_t>(std::numeric_limits
<int32_t>::max()))
337 size
= std::numeric_limits
<int32_t>::max();
339 server_
->server()->SetReceiveBufferSize(connection_id_
,
340 static_cast<int32_t>(size
));
341 callback
.Run(MakeNetworkError(net::OK
));
344 void HttpConnectionImpl::OnFinishedReadingResponseBody(
345 HttpResponsePtr response
,
346 SimpleDataPipeReader
* reader
,
347 scoped_ptr
<std::string
> body
) {
350 response_body_readers_
.erase(reader
);
353 net::HttpServerResponseInfo
info(
354 static_cast<net::HttpStatusCode
>(response
->status_code
));
356 std::string content_type
;
357 for (size_t i
= 0; i
< response
->headers
.size(); ++i
) {
358 const HttpHeader
& header
= *(response
->headers
[i
]);
361 // net::HttpServerResponseInfo::SetBody() automatically sets
362 // Content-Length and Content-Types, so skip the two here.
364 // TODO(yzshen): Consider adding to net::HttpServerResponseInfo a simple
365 // setter for body which doesn't fiddle with headers.
366 base::StringPiece
name_piece(header
.name
.data(), header
.name
.size());
367 if (base::EqualsCaseInsensitiveASCII(
368 name_piece
, net::HttpRequestHeaders::kContentLength
)) {
370 } else if (base::EqualsCaseInsensitiveASCII(
371 name_piece
, net::HttpRequestHeaders::kContentType
)) {
372 content_type
= header
.value
;
376 info
.AddHeader(header
.name
, header
.value
);
380 info
.SetBody(*body
, content_type
);
382 server_
->server()->SendResponse(connection_id_
, info
);
385 NotifyOwnerCloseIfAllDone();
388 void HttpConnectionImpl::Close() {
389 DCHECK(!IsClosing());
395 web_socket_
->Close();
397 NotifyOwnerCloseIfAllDone();
400 void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() {
403 // Don't close the connection until all pending sends are done.
404 bool should_wait
= !response_body_readers_
.empty() || web_socket_
;
406 server_
->server()->Close(connection_id_
);
409 void HttpConnectionImpl::OnWebSocketClosed() {
413 // The close operation is initiated by this object.
414 NotifyOwnerCloseIfAllDone();
416 // The close operation is initiated by |web_socket_|; start closing this