1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/http_connection_impl.h"
10 #include "base/bind_helpers.h"
11 #include "base/callback.h"
12 #include "base/logging.h"
13 #include "base/stl_util.h"
14 #include "base/strings/string_util.h"
15 #include "mojo/common/handle_watcher.h"
16 #include "mojo/services/network/http_server_impl.h"
17 #include "mojo/services/network/net_adapters.h"
18 #include "mojo/services/network/public/cpp/web_socket_read_queue.h"
19 #include "mojo/services/network/public/cpp/web_socket_write_queue.h"
20 #include "mojo/services/network/public/interfaces/web_socket.mojom.h"
21 #include "net/base/net_errors.h"
22 #include "net/http/http_request_headers.h"
23 #include "net/http/http_status_code.h"
24 #include "net/server/http_server.h"
25 #include "net/server/http_server_request_info.h"
26 #include "net/server/http_server_response_info.h"
27 #include "third_party/mojo/src/mojo/public/cpp/bindings/type_converter.h"
28 #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h"
32 // SimpleDataPipeReader reads till end-of-file, stores the data in a string and
33 // notifies completion.
34 class HttpConnectionImpl::SimpleDataPipeReader
{
36 using CompletionCallback
=
37 base::Callback
<void(SimpleDataPipeReader
*, scoped_ptr
<std::string
>)>;
39 SimpleDataPipeReader() {}
40 ~SimpleDataPipeReader() {}
42 void Start(ScopedDataPipeConsumerHandle consumer
,
43 const CompletionCallback
& completion_callback
) {
44 DCHECK(consumer
.is_valid() && !consumer_
.is_valid());
45 consumer_
= consumer
.Pass();
46 completion_callback_
= completion_callback
;
47 buffer_
.reset(new std::string
);
55 MojoResult rv
= BeginReadDataRaw(consumer_
.get(), &buf
, &buf_size
,
56 MOJO_READ_DATA_FLAG_NONE
);
57 if (rv
== MOJO_RESULT_OK
) {
58 buffer_
->append(static_cast<const char*>(buf
), buf_size
);
59 EndReadDataRaw(consumer_
.get(), buf_size
);
61 } else if (rv
== MOJO_RESULT_SHOULD_WAIT
) {
63 } else if (rv
== MOJO_RESULT_FAILED_PRECONDITION
) {
64 // We reached end-of-file.
65 completion_callback_
.Run(this, buffer_
.Pass());
66 // Note: This object may have been destroyed in the callback.
72 void WaitToReadMore() {
73 watcher_
.Start(consumer_
.get(), MOJO_HANDLE_SIGNAL_READABLE
,
74 MOJO_DEADLINE_INDEFINITE
,
75 base::Bind(&SimpleDataPipeReader::OnHandleReady
,
76 base::Unretained(this)));
79 void OnHandleReady(MojoResult result
) { ReadMore(); }
81 ScopedDataPipeConsumerHandle consumer_
;
82 common::HandleWatcher watcher_
;
83 CompletionCallback completion_callback_
;
84 scoped_ptr
<std::string
> buffer_
;
86 DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader
);
89 class HttpConnectionImpl::WebSocketImpl
: public WebSocket
,
92 // |connection| must outlive this object.
93 WebSocketImpl(HttpConnectionImpl
* connection
,
94 InterfaceRequest
<WebSocket
> request
,
95 ScopedDataPipeConsumerHandle send_stream
,
96 WebSocketClientPtr client
)
97 : connection_(connection
),
98 binding_(this, request
.Pass()),
99 client_(client
.Pass()),
100 send_stream_(send_stream
.Pass()),
101 read_send_stream_(new WebSocketReadQueue(send_stream_
.get())),
102 pending_send_count_(0) {
103 DCHECK(binding_
.is_bound());
105 DCHECK(send_stream_
.is_valid());
107 binding_
.set_error_handler(this);
108 client_
.set_error_handler(this);
111 receive_stream_
= data_pipe
.producer_handle
.Pass();
112 write_receive_stream_
.reset(new WebSocketWriteQueue(receive_stream_
.get()));
114 client_
->DidConnect("", "", data_pipe
.consumer_handle
.Pass());
117 ~WebSocketImpl() override
{}
120 DCHECK(!IsClosing());
125 NotifyOwnerCloseIfAllDone();
128 void OnReceivedWebSocketMessage(const std::string
& data
) {
132 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
133 // WebSocket{Read,Write}Queue doesn't handle that correctly.
137 uint32_t size
= static_cast<uint32_t>(data
.size());
138 write_receive_stream_
->Write(
140 base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream
,
141 base::Unretained(this), size
));
145 // WebSocket implementation.
146 void Connect(const String
& url
,
147 Array
<String
> protocols
,
148 const String
& origin
,
149 ScopedDataPipeConsumerHandle send_stream
,
150 WebSocketClientPtr client
) override
{
154 void Send(bool fin
, MessageType type
, uint32_t num_bytes
) override
{
155 if (!fin
|| type
!= MESSAGE_TYPE_TEXT
) {
160 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
161 // WebSocket{Read,Write}Queue doesn't handle that correctly.
165 pending_send_count_
++;
166 read_send_stream_
->Read(
167 num_bytes
, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream
,
168 base::Unretained(this), num_bytes
));
171 void FlowControl(int64_t quota
) override
{ NOTIMPLEMENTED(); }
173 void Close(uint16_t code
, const String
& reason
) override
{
177 // ErrorHandler implementation.
178 void OnConnectionError() override
{ Close(); }
180 void OnFinishedReadingSendStream(uint32_t num_bytes
, const char* data
) {
181 DCHECK_GT(pending_send_count_
, 0u);
182 pending_send_count_
--;
185 connection_
->server_
->server()->SendOverWebSocket(
186 connection_
->connection_id_
, std::string(data
, num_bytes
));
190 NotifyOwnerCloseIfAllDone();
193 void OnFinishedWritingReceiveStream(uint32_t num_bytes
, const char* buffer
) {
198 client_
->DidReceiveData(true, MESSAGE_TYPE_TEXT
, num_bytes
);
201 // Checks whether Close() has been called.
202 bool IsClosing() const { return !binding_
.is_bound(); }
204 void NotifyOwnerCloseIfAllDone() {
207 if (pending_send_count_
== 0)
208 connection_
->OnWebSocketClosed();
211 HttpConnectionImpl
* const connection_
;
213 Binding
<WebSocket
> binding_
;
214 WebSocketClientPtr client_
;
216 ScopedDataPipeConsumerHandle send_stream_
;
217 scoped_ptr
<WebSocketReadQueue
> read_send_stream_
;
218 size_t pending_send_count_
;
220 ScopedDataPipeProducerHandle receive_stream_
;
221 scoped_ptr
<WebSocketWriteQueue
> write_receive_stream_
;
223 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl
);
227 struct TypeConverter
<HttpRequestPtr
, net::HttpServerRequestInfo
> {
228 static HttpRequestPtr
Convert(const net::HttpServerRequestInfo
& obj
) {
229 HttpRequestPtr
request(HttpRequest::New());
230 request
->method
= obj
.method
;
231 request
->url
= obj
.path
;
232 request
->headers
.resize(obj
.headers
.size());
234 for (const auto& item
: obj
.headers
) {
235 HttpHeaderPtr
header(HttpHeader::New());
236 header
->name
= item
.first
;
237 header
->value
= item
.second
;
238 request
->headers
[index
++] = header
.Pass();
240 if (!obj
.data
.empty()) {
241 uint32_t num_bytes
= static_cast<uint32_t>(obj
.data
.size());
242 MojoCreateDataPipeOptions options
;
243 options
.struct_size
= sizeof(MojoCreateDataPipeOptions
);
244 options
.flags
= MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE
;
245 options
.element_num_bytes
= 1;
246 options
.capacity_num_bytes
= num_bytes
;
247 DataPipe
data_pipe(options
);
248 request
->body
= data_pipe
.consumer_handle
.Pass();
250 WriteDataRaw(data_pipe
.producer_handle
.get(), obj
.data
.data(),
251 &num_bytes
, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
);
252 CHECK_EQ(MOJO_RESULT_OK
, result
);
254 return request
.Pass();
258 HttpConnectionImpl::HttpConnectionImpl(int connection_id
,
259 HttpServerImpl
* server
,
260 HttpConnectionDelegatePtr delegate
,
261 HttpConnectionPtr
* connection
)
262 : connection_id_(connection_id
),
264 delegate_(delegate
.Pass()),
265 binding_(this, connection
) {
267 binding_
.set_error_handler(this);
268 delegate_
.set_error_handler(this);
271 HttpConnectionImpl::~HttpConnectionImpl() {
272 STLDeleteElements(&response_body_readers_
);
275 void HttpConnectionImpl::OnReceivedHttpRequest(
276 const net::HttpServerRequestInfo
& info
) {
280 delegate_
->OnReceivedRequest(
281 HttpRequest::From(info
), [this](HttpResponsePtr response
) {
282 if (response
->body
.is_valid()) {
283 SimpleDataPipeReader
* reader
= new SimpleDataPipeReader
;
284 response_body_readers_
.insert(reader
);
285 ScopedDataPipeConsumerHandle body
= response
->body
.Pass();
288 base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody
,
289 base::Unretained(this), base::Passed(&response
)));
291 OnFinishedReadingResponseBody(response
.Pass(), nullptr, nullptr);
296 void HttpConnectionImpl::OnReceivedWebSocketRequest(
297 const net::HttpServerRequestInfo
& info
) {
301 delegate_
->OnReceivedWebSocketRequest(
302 HttpRequest::From(info
),
303 [this, info
](InterfaceRequest
<WebSocket
> web_socket
,
304 ScopedDataPipeConsumerHandle send_stream
,
305 WebSocketClientPtr web_socket_client
) {
306 if (!web_socket
.is_pending() || !send_stream
.is_valid() ||
307 !web_socket_client
) {
312 web_socket_
.reset(new WebSocketImpl(this, web_socket
.Pass(),
314 web_socket_client
.Pass()));
315 server_
->server()->AcceptWebSocket(connection_id_
, info
);
319 void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string
& data
) {
323 web_socket_
->OnReceivedWebSocketMessage(data
);
326 void HttpConnectionImpl::SetSendBufferSize(
328 const SetSendBufferSizeCallback
& callback
) {
329 if (size
> static_cast<uint32_t>(std::numeric_limits
<int32_t>::max()))
330 size
= std::numeric_limits
<int32_t>::max();
332 server_
->server()->SetSendBufferSize(connection_id_
,
333 static_cast<int32_t>(size
));
334 callback
.Run(MakeNetworkError(net::OK
));
337 void HttpConnectionImpl::SetReceiveBufferSize(
339 const SetReceiveBufferSizeCallback
& callback
) {
340 if (size
> static_cast<uint32_t>(std::numeric_limits
<int32_t>::max()))
341 size
= std::numeric_limits
<int32_t>::max();
343 server_
->server()->SetReceiveBufferSize(connection_id_
,
344 static_cast<int32_t>(size
));
345 callback
.Run(MakeNetworkError(net::OK
));
348 void HttpConnectionImpl::OnConnectionError() {
349 // This method is called when the proxy side of |binding_| or the impl side of
350 // |delegate_| has closed the pipe. Although it is set as error handler for
351 // both |binding_| and |delegate_|, it will only be called at most once
352 // because when called it closes/resets |binding_| and |delegate_|.
356 void HttpConnectionImpl::OnFinishedReadingResponseBody(
357 HttpResponsePtr response
,
358 SimpleDataPipeReader
* reader
,
359 scoped_ptr
<std::string
> body
) {
362 response_body_readers_
.erase(reader
);
365 net::HttpServerResponseInfo
info(
366 static_cast<net::HttpStatusCode
>(response
->status_code
));
368 std::string content_type
;
369 for (size_t i
= 0; i
< response
->headers
.size(); ++i
) {
370 const HttpHeader
& header
= *(response
->headers
[i
]);
373 // net::HttpServerResponseInfo::SetBody() automatically sets
374 // Content-Length and Content-Types, so skip the two here.
376 // TODO(yzshen): Consider adding to net::HttpServerResponseInfo a simple
377 // setter for body which doesn't fiddle with headers.
378 if (base::strcasecmp(header
.name
.data(),
379 net::HttpRequestHeaders::kContentLength
) == 0) {
381 } else if (base::strcasecmp(header
.name
.data(),
382 net::HttpRequestHeaders::kContentType
) == 0) {
383 content_type
= header
.value
;
387 info
.AddHeader(header
.name
, header
.value
);
391 info
.SetBody(*body
, content_type
);
393 server_
->server()->SendResponse(connection_id_
, info
);
396 NotifyOwnerCloseIfAllDone();
399 void HttpConnectionImpl::Close() {
400 DCHECK(!IsClosing());
406 web_socket_
->Close();
408 NotifyOwnerCloseIfAllDone();
411 void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() {
414 // Don't close the connection until all pending sends are done.
415 bool should_wait
= !response_body_readers_
.empty() || web_socket_
;
417 server_
->server()->Close(connection_id_
);
420 void HttpConnectionImpl::OnWebSocketClosed() {
424 // The close operation is initiated by this object.
425 NotifyOwnerCloseIfAllDone();
427 // The close operation is initiated by |web_socket_|; start closing this