Add ICU message format support
[chromium-blink-merge.git] / mojo / services / network / http_connection_impl.cc
blob965c27e06a01ac0787d7db9fc5ae0660f2862fb7
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/http_connection_impl.h"
7 #include <limits>
9 #include "base/bind.h"
10 #include "base/bind_helpers.h"
11 #include "base/callback.h"
12 #include "base/logging.h"
13 #include "base/stl_util.h"
14 #include "base/strings/string_util.h"
15 #include "mojo/message_pump/handle_watcher.h"
16 #include "mojo/services/network/http_server_impl.h"
17 #include "mojo/services/network/net_adapters.h"
18 #include "mojo/services/network/public/cpp/web_socket_read_queue.h"
19 #include "mojo/services/network/public/cpp/web_socket_write_queue.h"
20 #include "mojo/services/network/public/interfaces/web_socket.mojom.h"
21 #include "net/base/net_errors.h"
22 #include "net/http/http_request_headers.h"
23 #include "net/http/http_status_code.h"
24 #include "net/server/http_server.h"
25 #include "net/server/http_server_request_info.h"
26 #include "net/server/http_server_response_info.h"
27 #include "third_party/mojo/src/mojo/public/cpp/bindings/type_converter.h"
28 #include "third_party/mojo/src/mojo/public/cpp/system/data_pipe.h"
30 namespace mojo {
32 // SimpleDataPipeReader reads till end-of-file, stores the data in a string and
33 // notifies completion.
34 class HttpConnectionImpl::SimpleDataPipeReader {
35 public:
36 using CompletionCallback =
37 base::Callback<void(SimpleDataPipeReader*, scoped_ptr<std::string>)>;
39 SimpleDataPipeReader() {}
40 ~SimpleDataPipeReader() {}
42 void Start(ScopedDataPipeConsumerHandle consumer,
43 const CompletionCallback& completion_callback) {
44 DCHECK(consumer.is_valid() && !consumer_.is_valid());
45 consumer_ = consumer.Pass();
46 completion_callback_ = completion_callback;
47 buffer_.reset(new std::string);
48 ReadMore();
51 private:
52 void ReadMore() {
53 const void* buf;
54 uint32_t buf_size;
55 MojoResult rv = BeginReadDataRaw(consumer_.get(), &buf, &buf_size,
56 MOJO_READ_DATA_FLAG_NONE);
57 if (rv == MOJO_RESULT_OK) {
58 buffer_->append(static_cast<const char*>(buf), buf_size);
59 EndReadDataRaw(consumer_.get(), buf_size);
60 WaitToReadMore();
61 } else if (rv == MOJO_RESULT_SHOULD_WAIT) {
62 WaitToReadMore();
63 } else if (rv == MOJO_RESULT_FAILED_PRECONDITION) {
64 // We reached end-of-file.
65 completion_callback_.Run(this, buffer_.Pass());
66 // Note: This object may have been destroyed in the callback.
67 } else {
68 CHECK(false);
72 void WaitToReadMore() {
73 watcher_.Start(consumer_.get(), MOJO_HANDLE_SIGNAL_READABLE,
74 MOJO_DEADLINE_INDEFINITE,
75 base::Bind(&SimpleDataPipeReader::OnHandleReady,
76 base::Unretained(this)));
79 void OnHandleReady(MojoResult result) { ReadMore(); }
81 ScopedDataPipeConsumerHandle consumer_;
82 common::HandleWatcher watcher_;
83 CompletionCallback completion_callback_;
84 scoped_ptr<std::string> buffer_;
86 DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader);
89 class HttpConnectionImpl::WebSocketImpl : public WebSocket {
90 public:
91 // |connection| must outlive this object.
92 WebSocketImpl(HttpConnectionImpl* connection,
93 InterfaceRequest<WebSocket> request,
94 ScopedDataPipeConsumerHandle send_stream,
95 WebSocketClientPtr client)
96 : connection_(connection),
97 binding_(this, request.Pass()),
98 client_(client.Pass()),
99 send_stream_(send_stream.Pass()),
100 read_send_stream_(new WebSocketReadQueue(send_stream_.get())),
101 pending_send_count_(0) {
102 DCHECK(binding_.is_bound());
103 DCHECK(client_);
104 DCHECK(send_stream_.is_valid());
106 binding_.set_connection_error_handler([this]() { Close(); });
107 client_.set_connection_error_handler([this]() { Close(); });
109 DataPipe data_pipe;
110 receive_stream_ = data_pipe.producer_handle.Pass();
111 write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get()));
113 client_->DidConnect("", "", data_pipe.consumer_handle.Pass());
116 ~WebSocketImpl() override {}
118 void Close() {
119 DCHECK(!IsClosing());
121 binding_.Close();
122 client_.reset();
124 NotifyOwnerCloseIfAllDone();
127 void OnReceivedWebSocketMessage(const std::string& data) {
128 if (IsClosing())
129 return;
131 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
132 // WebSocket{Read,Write}Queue doesn't handle that correctly.
133 if (data.empty())
134 return;
136 uint32_t size = static_cast<uint32_t>(data.size());
137 write_receive_stream_->Write(
138 &data[0], size,
139 base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream,
140 base::Unretained(this), size));
143 private:
144 // WebSocket implementation.
145 void Connect(const String& url,
146 Array<String> protocols,
147 const String& origin,
148 ScopedDataPipeConsumerHandle send_stream,
149 WebSocketClientPtr client) override {
150 NOTREACHED();
153 void Send(bool fin, MessageType type, uint32_t num_bytes) override {
154 if (!fin || type != MESSAGE_TYPE_TEXT) {
155 NOTIMPLEMENTED();
156 Close();
159 // TODO(yzshen): It shouldn't be an issue to pass an empty message. However,
160 // WebSocket{Read,Write}Queue doesn't handle that correctly.
161 if (num_bytes == 0)
162 return;
164 pending_send_count_++;
165 read_send_stream_->Read(
166 num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream,
167 base::Unretained(this), num_bytes));
170 void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); }
172 void Close(uint16_t code, const String& reason) override {
173 Close();
176 void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) {
177 DCHECK_GT(pending_send_count_, 0u);
178 pending_send_count_--;
180 if (data) {
181 connection_->server_->server()->SendOverWebSocket(
182 connection_->connection_id_, std::string(data, num_bytes));
185 if (IsClosing())
186 NotifyOwnerCloseIfAllDone();
189 void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) {
190 if (IsClosing())
191 return;
193 if (buffer)
194 client_->DidReceiveData(true, MESSAGE_TYPE_TEXT, num_bytes);
197 // Checks whether Close() has been called.
198 bool IsClosing() const { return !binding_.is_bound(); }
200 void NotifyOwnerCloseIfAllDone() {
201 DCHECK(IsClosing());
203 if (pending_send_count_ == 0)
204 connection_->OnWebSocketClosed();
207 HttpConnectionImpl* const connection_;
209 Binding<WebSocket> binding_;
210 WebSocketClientPtr client_;
212 ScopedDataPipeConsumerHandle send_stream_;
213 scoped_ptr<WebSocketReadQueue> read_send_stream_;
214 size_t pending_send_count_;
216 ScopedDataPipeProducerHandle receive_stream_;
217 scoped_ptr<WebSocketWriteQueue> write_receive_stream_;
219 DISALLOW_COPY_AND_ASSIGN(WebSocketImpl);
222 template <>
223 struct TypeConverter<HttpRequestPtr, net::HttpServerRequestInfo> {
224 static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) {
225 HttpRequestPtr request(HttpRequest::New());
226 request->method = obj.method;
227 request->url = obj.path;
228 request->headers.resize(obj.headers.size());
229 size_t index = 0;
230 for (const auto& item : obj.headers) {
231 HttpHeaderPtr header(HttpHeader::New());
232 header->name = item.first;
233 header->value = item.second;
234 request->headers[index++] = header.Pass();
236 if (!obj.data.empty()) {
237 uint32_t num_bytes = static_cast<uint32_t>(obj.data.size());
238 MojoCreateDataPipeOptions options;
239 options.struct_size = sizeof(MojoCreateDataPipeOptions);
240 options.flags = MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE;
241 options.element_num_bytes = 1;
242 options.capacity_num_bytes = num_bytes;
243 DataPipe data_pipe(options);
244 request->body = data_pipe.consumer_handle.Pass();
245 MojoResult result =
246 WriteDataRaw(data_pipe.producer_handle.get(), obj.data.data(),
247 &num_bytes, MOJO_WRITE_DATA_FLAG_ALL_OR_NONE);
248 CHECK_EQ(MOJO_RESULT_OK, result);
250 return request.Pass();
254 HttpConnectionImpl::HttpConnectionImpl(int connection_id,
255 HttpServerImpl* server,
256 HttpConnectionDelegatePtr delegate,
257 HttpConnectionPtr* connection)
258 : connection_id_(connection_id),
259 server_(server),
260 delegate_(delegate.Pass()),
261 binding_(this, connection) {
262 DCHECK(delegate_);
263 binding_.set_connection_error_handler([this]() { Close(); });
264 delegate_.set_connection_error_handler([this]() { Close(); });
267 HttpConnectionImpl::~HttpConnectionImpl() {
268 STLDeleteElements(&response_body_readers_);
271 void HttpConnectionImpl::OnReceivedHttpRequest(
272 const net::HttpServerRequestInfo& info) {
273 if (IsClosing())
274 return;
276 delegate_->OnReceivedRequest(
277 HttpRequest::From(info), [this](HttpResponsePtr response) {
278 if (response->body.is_valid()) {
279 SimpleDataPipeReader* reader = new SimpleDataPipeReader;
280 response_body_readers_.insert(reader);
281 ScopedDataPipeConsumerHandle body = response->body.Pass();
282 reader->Start(
283 body.Pass(),
284 base::Bind(&HttpConnectionImpl::OnFinishedReadingResponseBody,
285 base::Unretained(this), base::Passed(&response)));
286 } else {
287 OnFinishedReadingResponseBody(response.Pass(), nullptr, nullptr);
292 void HttpConnectionImpl::OnReceivedWebSocketRequest(
293 const net::HttpServerRequestInfo& info) {
294 if (IsClosing())
295 return;
297 delegate_->OnReceivedWebSocketRequest(
298 HttpRequest::From(info),
299 [this, info](InterfaceRequest<WebSocket> web_socket,
300 ScopedDataPipeConsumerHandle send_stream,
301 WebSocketClientPtr web_socket_client) {
302 if (!web_socket.is_pending() || !send_stream.is_valid() ||
303 !web_socket_client) {
304 Close();
305 return;
308 web_socket_.reset(new WebSocketImpl(this, web_socket.Pass(),
309 send_stream.Pass(),
310 web_socket_client.Pass()));
311 server_->server()->AcceptWebSocket(connection_id_, info);
315 void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) {
316 if (IsClosing())
317 return;
319 web_socket_->OnReceivedWebSocketMessage(data);
322 void HttpConnectionImpl::SetSendBufferSize(
323 uint32_t size,
324 const SetSendBufferSizeCallback& callback) {
325 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
326 size = std::numeric_limits<int32_t>::max();
328 server_->server()->SetSendBufferSize(connection_id_,
329 static_cast<int32_t>(size));
330 callback.Run(MakeNetworkError(net::OK));
333 void HttpConnectionImpl::SetReceiveBufferSize(
334 uint32_t size,
335 const SetReceiveBufferSizeCallback& callback) {
336 if (size > static_cast<uint32_t>(std::numeric_limits<int32_t>::max()))
337 size = std::numeric_limits<int32_t>::max();
339 server_->server()->SetReceiveBufferSize(connection_id_,
340 static_cast<int32_t>(size));
341 callback.Run(MakeNetworkError(net::OK));
344 void HttpConnectionImpl::OnFinishedReadingResponseBody(
345 HttpResponsePtr response,
346 SimpleDataPipeReader* reader,
347 scoped_ptr<std::string> body) {
348 if (reader) {
349 delete reader;
350 response_body_readers_.erase(reader);
353 net::HttpServerResponseInfo info(
354 static_cast<net::HttpStatusCode>(response->status_code));
356 std::string content_type;
357 for (size_t i = 0; i < response->headers.size(); ++i) {
358 const HttpHeader& header = *(response->headers[i]);
360 if (body) {
361 // net::HttpServerResponseInfo::SetBody() automatically sets
362 // Content-Length and Content-Types, so skip the two here.
364 // TODO(yzshen): Consider adding to net::HttpServerResponseInfo a simple
365 // setter for body which doesn't fiddle with headers.
366 base::StringPiece name_piece(header.name.data(), header.name.size());
367 if (base::EqualsCaseInsensitiveASCII(
368 name_piece, net::HttpRequestHeaders::kContentLength)) {
369 continue;
370 } else if (base::EqualsCaseInsensitiveASCII(
371 name_piece, net::HttpRequestHeaders::kContentType)) {
372 content_type = header.value;
373 continue;
376 info.AddHeader(header.name, header.value);
379 if (body)
380 info.SetBody(*body, content_type);
382 server_->server()->SendResponse(connection_id_, info);
384 if (IsClosing())
385 NotifyOwnerCloseIfAllDone();
388 void HttpConnectionImpl::Close() {
389 DCHECK(!IsClosing());
391 binding_.Close();
392 delegate_.reset();
394 if (web_socket_)
395 web_socket_->Close();
397 NotifyOwnerCloseIfAllDone();
400 void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() {
401 DCHECK(IsClosing());
403 // Don't close the connection until all pending sends are done.
404 bool should_wait = !response_body_readers_.empty() || web_socket_;
405 if (!should_wait)
406 server_->server()->Close(connection_id_);
409 void HttpConnectionImpl::OnWebSocketClosed() {
410 web_socket_.reset();
412 if (IsClosing()) {
413 // The close operation is initiated by this object.
414 NotifyOwnerCloseIfAllDone();
415 } else {
416 // The close operation is initiated by |web_socket_|; start closing this
417 // object.
418 Close();
422 } // namespace mojo