Roll src/third_party/WebKit d9c6159:8139f33 (svn 201974:201975)
[chromium-blink-merge.git] / net / socket / websocket_transport_client_socket_pool.cc
blob4a7da5ca12f85139a3d839d8909c461313c53883
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/socket/websocket_transport_client_socket_pool.h"
7 #include <algorithm>
9 #include "base/compiler_specific.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/numerics/safe_conversions.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/strings/string_util.h"
15 #include "base/thread_task_runner_handle.h"
16 #include "base/time/time.h"
17 #include "base/values.h"
18 #include "net/base/net_errors.h"
19 #include "net/log/net_log.h"
20 #include "net/socket/client_socket_handle.h"
21 #include "net/socket/client_socket_pool_base.h"
22 #include "net/socket/websocket_endpoint_lock_manager.h"
23 #include "net/socket/websocket_transport_connect_sub_job.h"
25 namespace net {
27 namespace {
29 using base::TimeDelta;
31 // TODO(ricea): For now, we implement a global timeout for compatability with
32 // TransportConnectJob. Since WebSocketTransportConnectJob controls the address
33 // selection process more tightly, it could do something smarter here.
34 const int kTransportConnectJobTimeoutInSeconds = 240; // 4 minutes.
36 } // namespace
38 WebSocketTransportConnectJob::WebSocketTransportConnectJob(
39 const std::string& group_name,
40 RequestPriority priority,
41 const scoped_refptr<TransportSocketParams>& params,
42 TimeDelta timeout_duration,
43 const CompletionCallback& callback,
44 ClientSocketFactory* client_socket_factory,
45 HostResolver* host_resolver,
46 ClientSocketHandle* handle,
47 Delegate* delegate,
48 NetLog* pool_net_log,
49 const BoundNetLog& request_net_log)
50 : ConnectJob(group_name,
51 timeout_duration,
52 priority,
53 delegate,
54 BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB)),
55 helper_(params, client_socket_factory, host_resolver, &connect_timing_),
56 race_result_(TransportConnectJobHelper::CONNECTION_LATENCY_UNKNOWN),
57 handle_(handle),
58 callback_(callback),
59 request_net_log_(request_net_log),
60 had_ipv4_(false),
61 had_ipv6_(false) {
62 helper_.SetOnIOComplete(this);
65 WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
67 LoadState WebSocketTransportConnectJob::GetLoadState() const {
68 LoadState load_state = LOAD_STATE_RESOLVING_HOST;
69 if (ipv6_job_)
70 load_state = ipv6_job_->GetLoadState();
71 // This method should return LOAD_STATE_CONNECTING in preference to
72 // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET when possible because "waiting for
73 // available socket" implies that nothing is happening.
74 if (ipv4_job_ && load_state != LOAD_STATE_CONNECTING)
75 load_state = ipv4_job_->GetLoadState();
76 return load_state;
79 int WebSocketTransportConnectJob::DoResolveHost() {
80 return helper_.DoResolveHost(priority(), net_log());
83 int WebSocketTransportConnectJob::DoResolveHostComplete(int result) {
84 return helper_.DoResolveHostComplete(result, net_log());
87 int WebSocketTransportConnectJob::DoTransportConnect() {
88 AddressList ipv4_addresses;
89 AddressList ipv6_addresses;
90 int result = ERR_UNEXPECTED;
91 helper_.set_next_state(
92 TransportConnectJobHelper::STATE_TRANSPORT_CONNECT_COMPLETE);
94 for (AddressList::const_iterator it = helper_.addresses().begin();
95 it != helper_.addresses().end();
96 ++it) {
97 switch (it->GetFamily()) {
98 case ADDRESS_FAMILY_IPV4:
99 ipv4_addresses.push_back(*it);
100 break;
102 case ADDRESS_FAMILY_IPV6:
103 ipv6_addresses.push_back(*it);
104 break;
106 default:
107 DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it->GetFamily();
108 break;
112 if (!ipv4_addresses.empty()) {
113 had_ipv4_ = true;
114 ipv4_job_.reset(new WebSocketTransportConnectSubJob(
115 ipv4_addresses, this, SUB_JOB_IPV4));
118 if (!ipv6_addresses.empty()) {
119 had_ipv6_ = true;
120 ipv6_job_.reset(new WebSocketTransportConnectSubJob(
121 ipv6_addresses, this, SUB_JOB_IPV6));
122 result = ipv6_job_->Start();
123 switch (result) {
124 case OK:
125 SetSocket(ipv6_job_->PassSocket());
126 race_result_ =
127 had_ipv4_
128 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
129 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
130 return result;
132 case ERR_IO_PENDING:
133 if (ipv4_job_) {
134 // This use of base::Unretained is safe because |fallback_timer_| is
135 // owned by this object.
136 fallback_timer_.Start(
137 FROM_HERE,
138 TimeDelta::FromMilliseconds(
139 TransportConnectJobHelper::kIPv6FallbackTimerInMs),
140 base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync,
141 base::Unretained(this)));
143 return result;
145 default:
146 ipv6_job_.reset();
150 DCHECK(!ipv6_job_);
151 if (ipv4_job_) {
152 result = ipv4_job_->Start();
153 if (result == OK) {
154 SetSocket(ipv4_job_->PassSocket());
155 race_result_ =
156 had_ipv6_
157 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
158 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
162 return result;
165 int WebSocketTransportConnectJob::DoTransportConnectComplete(int result) {
166 if (result == OK)
167 helper_.HistogramDuration(race_result_);
168 return result;
171 void WebSocketTransportConnectJob::OnSubJobComplete(
172 int result,
173 WebSocketTransportConnectSubJob* job) {
174 if (result == OK) {
175 switch (job->type()) {
176 case SUB_JOB_IPV4:
177 race_result_ =
178 had_ipv6_
179 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
180 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE;
181 break;
183 case SUB_JOB_IPV6:
184 race_result_ =
185 had_ipv4_
186 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
187 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO;
188 break;
190 SetSocket(job->PassSocket());
192 // Make sure all connections are cancelled even if this object fails to be
193 // deleted.
194 ipv4_job_.reset();
195 ipv6_job_.reset();
196 } else {
197 switch (job->type()) {
198 case SUB_JOB_IPV4:
199 ipv4_job_.reset();
200 break;
202 case SUB_JOB_IPV6:
203 ipv6_job_.reset();
204 if (ipv4_job_ && !ipv4_job_->started()) {
205 fallback_timer_.Stop();
206 result = ipv4_job_->Start();
207 if (result != ERR_IO_PENDING) {
208 OnSubJobComplete(result, ipv4_job_.get());
209 return;
212 break;
214 if (ipv4_job_ || ipv6_job_)
215 return;
217 helper_.OnIOComplete(this, result);
220 void WebSocketTransportConnectJob::StartIPv4JobAsync() {
221 DCHECK(ipv4_job_);
222 int result = ipv4_job_->Start();
223 if (result != ERR_IO_PENDING)
224 OnSubJobComplete(result, ipv4_job_.get());
227 int WebSocketTransportConnectJob::ConnectInternal() {
228 return helper_.DoConnectInternal(this);
231 WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
232 int max_sockets,
233 int max_sockets_per_group,
234 HostResolver* host_resolver,
235 ClientSocketFactory* client_socket_factory,
236 NetLog* net_log)
237 : TransportClientSocketPool(max_sockets,
238 max_sockets_per_group,
239 host_resolver,
240 client_socket_factory,
241 net_log),
242 connect_job_delegate_(this),
243 pool_net_log_(net_log),
244 client_socket_factory_(client_socket_factory),
245 host_resolver_(host_resolver),
246 max_sockets_(max_sockets),
247 handed_out_socket_count_(0),
248 flushing_(false),
249 weak_factory_(this) {}
251 WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
252 // Clean up any pending connect jobs.
253 FlushWithError(ERR_ABORTED);
254 DCHECK(pending_connects_.empty());
255 DCHECK_EQ(0, handed_out_socket_count_);
256 DCHECK(stalled_request_queue_.empty());
257 DCHECK(stalled_request_map_.empty());
260 // static
261 void WebSocketTransportClientSocketPool::UnlockEndpoint(
262 ClientSocketHandle* handle) {
263 DCHECK(handle->is_initialized());
264 DCHECK(handle->socket());
265 IPEndPoint address;
266 if (handle->socket()->GetPeerAddress(&address) == OK)
267 WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(address);
270 int WebSocketTransportClientSocketPool::RequestSocket(
271 const std::string& group_name,
272 const void* params,
273 RequestPriority priority,
274 ClientSocketHandle* handle,
275 const CompletionCallback& callback,
276 const BoundNetLog& request_net_log) {
277 DCHECK(params);
278 const scoped_refptr<TransportSocketParams>& casted_params =
279 *static_cast<const scoped_refptr<TransportSocketParams>*>(params);
281 NetLogTcpClientSocketPoolRequestedSocket(request_net_log, &casted_params);
283 CHECK(!callback.is_null());
284 CHECK(handle);
286 request_net_log.BeginEvent(NetLog::TYPE_SOCKET_POOL);
288 if (ReachedMaxSocketsLimit() && !casted_params->ignore_limits()) {
289 request_net_log.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS);
290 // TODO(ricea): Use emplace_back when C++11 becomes allowed.
291 StalledRequest request(
292 casted_params, priority, handle, callback, request_net_log);
293 stalled_request_queue_.push_back(request);
294 StalledRequestQueue::iterator iterator = stalled_request_queue_.end();
295 --iterator;
296 DCHECK_EQ(handle, iterator->handle);
297 // Because StalledRequestQueue is a std::list, its iterators are guaranteed
298 // to remain valid as long as the elements are not removed. As long as
299 // stalled_request_queue_ and stalled_request_map_ are updated in sync, it
300 // is safe to dereference an iterator in stalled_request_map_ to find the
301 // corresponding list element.
302 stalled_request_map_.insert(
303 StalledRequestMap::value_type(handle, iterator));
304 return ERR_IO_PENDING;
307 scoped_ptr<WebSocketTransportConnectJob> connect_job(
308 new WebSocketTransportConnectJob(group_name,
309 priority,
310 casted_params,
311 ConnectionTimeout(),
312 callback,
313 client_socket_factory_,
314 host_resolver_,
315 handle,
316 &connect_job_delegate_,
317 pool_net_log_,
318 request_net_log));
320 int rv = connect_job->Connect();
321 // Regardless of the outcome of |connect_job|, it will always be bound to
322 // |handle|, since this pool uses early-binding. So the binding is logged
323 // here, without waiting for the result.
324 request_net_log.AddEvent(
325 NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB,
326 connect_job->net_log().source().ToEventParametersCallback());
327 if (rv == OK) {
328 HandOutSocket(connect_job->PassSocket(),
329 connect_job->connect_timing(),
330 handle,
331 request_net_log);
332 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
333 } else if (rv == ERR_IO_PENDING) {
334 // TODO(ricea): Implement backup job timer?
335 AddJob(handle, connect_job.Pass());
336 } else {
337 scoped_ptr<StreamSocket> error_socket;
338 connect_job->GetAdditionalErrorState(handle);
339 error_socket = connect_job->PassSocket();
340 if (error_socket) {
341 HandOutSocket(error_socket.Pass(),
342 connect_job->connect_timing(),
343 handle,
344 request_net_log);
348 if (rv != ERR_IO_PENDING) {
349 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, rv);
352 return rv;
355 void WebSocketTransportClientSocketPool::RequestSockets(
356 const std::string& group_name,
357 const void* params,
358 int num_sockets,
359 const BoundNetLog& net_log) {
360 NOTIMPLEMENTED();
363 void WebSocketTransportClientSocketPool::CancelRequest(
364 const std::string& group_name,
365 ClientSocketHandle* handle) {
366 DCHECK(!handle->is_initialized());
367 if (DeleteStalledRequest(handle))
368 return;
369 scoped_ptr<StreamSocket> socket = handle->PassSocket();
370 if (socket)
371 ReleaseSocket(handle->group_name(), socket.Pass(), handle->id());
372 if (!DeleteJob(handle))
373 pending_callbacks_.erase(handle);
374 if (!ReachedMaxSocketsLimit() && !stalled_request_queue_.empty())
375 ActivateStalledRequest();
378 void WebSocketTransportClientSocketPool::ReleaseSocket(
379 const std::string& group_name,
380 scoped_ptr<StreamSocket> socket,
381 int id) {
382 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
383 CHECK_GT(handed_out_socket_count_, 0);
384 --handed_out_socket_count_;
385 if (!ReachedMaxSocketsLimit() && !stalled_request_queue_.empty())
386 ActivateStalledRequest();
389 void WebSocketTransportClientSocketPool::FlushWithError(int error) {
390 // Sockets which are in LOAD_STATE_CONNECTING are in danger of unlocking
391 // sockets waiting for the endpoint lock. If they connected synchronously,
392 // then OnConnectJobComplete(). The |flushing_| flag tells this object to
393 // ignore spurious calls to OnConnectJobComplete(). It is safe to ignore those
394 // calls because this method will delete the jobs and call their callbacks
395 // anyway.
396 flushing_ = true;
397 for (PendingConnectsMap::iterator it = pending_connects_.begin();
398 it != pending_connects_.end();
399 ++it) {
400 InvokeUserCallbackLater(
401 it->second->handle(), it->second->callback(), error);
402 delete it->second, it->second = NULL;
404 pending_connects_.clear();
405 for (StalledRequestQueue::iterator it = stalled_request_queue_.begin();
406 it != stalled_request_queue_.end();
407 ++it) {
408 InvokeUserCallbackLater(it->handle, it->callback, error);
410 stalled_request_map_.clear();
411 stalled_request_queue_.clear();
412 flushing_ = false;
415 void WebSocketTransportClientSocketPool::CloseIdleSockets() {
416 // We have no idle sockets.
419 int WebSocketTransportClientSocketPool::IdleSocketCount() const {
420 return 0;
423 int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
424 const std::string& group_name) const {
425 return 0;
428 LoadState WebSocketTransportClientSocketPool::GetLoadState(
429 const std::string& group_name,
430 const ClientSocketHandle* handle) const {
431 if (stalled_request_map_.find(handle) != stalled_request_map_.end())
432 return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET;
433 if (pending_callbacks_.count(handle))
434 return LOAD_STATE_CONNECTING;
435 return LookupConnectJob(handle)->GetLoadState();
438 scoped_ptr<base::DictionaryValue>
439 WebSocketTransportClientSocketPool::GetInfoAsValue(
440 const std::string& name,
441 const std::string& type,
442 bool include_nested_pools) const {
443 scoped_ptr<base::DictionaryValue> dict(new base::DictionaryValue());
444 dict->SetString("name", name);
445 dict->SetString("type", type);
446 dict->SetInteger("handed_out_socket_count", handed_out_socket_count_);
447 dict->SetInteger("connecting_socket_count", pending_connects_.size());
448 dict->SetInteger("idle_socket_count", 0);
449 dict->SetInteger("max_socket_count", max_sockets_);
450 dict->SetInteger("max_sockets_per_group", max_sockets_);
451 dict->SetInteger("pool_generation_number", 0);
452 return dict.Pass();
455 TimeDelta WebSocketTransportClientSocketPool::ConnectionTimeout() const {
456 return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds);
459 bool WebSocketTransportClientSocketPool::IsStalled() const {
460 return !stalled_request_queue_.empty();
463 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
464 int result,
465 WebSocketTransportConnectJob* job) {
466 DCHECK_NE(ERR_IO_PENDING, result);
468 scoped_ptr<StreamSocket> socket = job->PassSocket();
470 // See comment in FlushWithError.
471 if (flushing_) {
472 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket.get());
473 return;
476 BoundNetLog request_net_log = job->request_net_log();
477 CompletionCallback callback = job->callback();
478 LoadTimingInfo::ConnectTiming connect_timing = job->connect_timing();
480 ClientSocketHandle* const handle = job->handle();
481 bool handed_out_socket = false;
483 if (result == OK) {
484 DCHECK(socket.get());
485 handed_out_socket = true;
486 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
487 request_net_log.EndEvent(NetLog::TYPE_SOCKET_POOL);
488 } else {
489 // If we got a socket, it must contain error information so pass that
490 // up so that the caller can retrieve it.
491 job->GetAdditionalErrorState(handle);
492 if (socket.get()) {
493 handed_out_socket = true;
494 HandOutSocket(socket.Pass(), connect_timing, handle, request_net_log);
496 request_net_log.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL, result);
498 bool delete_succeeded = DeleteJob(handle);
499 DCHECK(delete_succeeded);
500 if (!handed_out_socket && !stalled_request_queue_.empty() &&
501 !ReachedMaxSocketsLimit())
502 ActivateStalledRequest();
503 InvokeUserCallbackLater(handle, callback, result);
506 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
507 ClientSocketHandle* handle,
508 const CompletionCallback& callback,
509 int rv) {
510 DCHECK(!pending_callbacks_.count(handle));
511 pending_callbacks_.insert(handle);
512 base::ThreadTaskRunnerHandle::Get()->PostTask(
513 FROM_HERE,
514 base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback,
515 weak_factory_.GetWeakPtr(), handle, callback, rv));
518 void WebSocketTransportClientSocketPool::InvokeUserCallback(
519 ClientSocketHandle* handle,
520 const CompletionCallback& callback,
521 int rv) {
522 if (pending_callbacks_.erase(handle))
523 callback.Run(rv);
526 bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
527 return handed_out_socket_count_ >= max_sockets_ ||
528 base::checked_cast<int>(pending_connects_.size()) >=
529 max_sockets_ - handed_out_socket_count_;
532 void WebSocketTransportClientSocketPool::HandOutSocket(
533 scoped_ptr<StreamSocket> socket,
534 const LoadTimingInfo::ConnectTiming& connect_timing,
535 ClientSocketHandle* handle,
536 const BoundNetLog& net_log) {
537 DCHECK(socket);
538 handle->SetSocket(socket.Pass());
539 DCHECK_EQ(ClientSocketHandle::UNUSED, handle->reuse_type());
540 DCHECK_EQ(0, handle->idle_time().InMicroseconds());
541 handle->set_pool_id(0);
542 handle->set_connect_timing(connect_timing);
544 net_log.AddEvent(
545 NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET,
546 handle->socket()->NetLog().source().ToEventParametersCallback());
548 ++handed_out_socket_count_;
551 void WebSocketTransportClientSocketPool::AddJob(
552 ClientSocketHandle* handle,
553 scoped_ptr<WebSocketTransportConnectJob> connect_job) {
554 bool inserted =
555 pending_connects_.insert(PendingConnectsMap::value_type(
556 handle, connect_job.release())).second;
557 DCHECK(inserted);
560 bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle* handle) {
561 PendingConnectsMap::iterator it = pending_connects_.find(handle);
562 if (it == pending_connects_.end())
563 return false;
564 // Deleting a ConnectJob which holds an endpoint lock can lead to a different
565 // ConnectJob proceeding to connect. If the connect proceeds synchronously
566 // (usually because of a failure) then it can trigger that job to be
567 // deleted. |it| remains valid because std::map guarantees that erase() does
568 // not invalid iterators to other entries.
569 delete it->second, it->second = NULL;
570 DCHECK(pending_connects_.find(handle) == it);
571 pending_connects_.erase(it);
572 return true;
575 const WebSocketTransportConnectJob*
576 WebSocketTransportClientSocketPool::LookupConnectJob(
577 const ClientSocketHandle* handle) const {
578 PendingConnectsMap::const_iterator it = pending_connects_.find(handle);
579 CHECK(it != pending_connects_.end());
580 return it->second;
583 void WebSocketTransportClientSocketPool::ActivateStalledRequest() {
584 DCHECK(!stalled_request_queue_.empty());
585 DCHECK(!ReachedMaxSocketsLimit());
586 // Usually we will only be able to activate one stalled request at a time,
587 // however if all the connects fail synchronously for some reason, we may be
588 // able to clear the whole queue at once.
589 while (!stalled_request_queue_.empty() && !ReachedMaxSocketsLimit()) {
590 StalledRequest request(stalled_request_queue_.front());
591 stalled_request_queue_.pop_front();
592 stalled_request_map_.erase(request.handle);
593 int rv = RequestSocket("ignored",
594 &request.params,
595 request.priority,
596 request.handle,
597 request.callback,
598 request.net_log);
599 // ActivateStalledRequest() never returns synchronously, so it is never
600 // called re-entrantly.
601 if (rv != ERR_IO_PENDING)
602 InvokeUserCallbackLater(request.handle, request.callback, rv);
606 bool WebSocketTransportClientSocketPool::DeleteStalledRequest(
607 ClientSocketHandle* handle) {
608 StalledRequestMap::iterator it = stalled_request_map_.find(handle);
609 if (it == stalled_request_map_.end())
610 return false;
611 stalled_request_queue_.erase(it->second);
612 stalled_request_map_.erase(it);
613 return true;
616 WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
617 WebSocketTransportClientSocketPool* owner)
618 : owner_(owner) {}
620 WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
622 void
623 WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
624 int result,
625 ConnectJob* job) {
626 owner_->OnConnectJobComplete(result,
627 static_cast<WebSocketTransportConnectJob*>(job));
630 WebSocketTransportClientSocketPool::StalledRequest::StalledRequest(
631 const scoped_refptr<TransportSocketParams>& params,
632 RequestPriority priority,
633 ClientSocketHandle* handle,
634 const CompletionCallback& callback,
635 const BoundNetLog& net_log)
636 : params(params),
637 priority(priority),
638 handle(handle),
639 callback(callback),
640 net_log(net_log) {}
642 WebSocketTransportClientSocketPool::StalledRequest::~StalledRequest() {}
644 } // namespace net