1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/socket/websocket_transport_client_socket_pool.h"
9 #include "base/compiler_specific.h"
10 #include "base/logging.h"
11 #include "base/numerics/safe_conversions.h"
12 #include "base/profiler/scoped_tracker.h"
13 #include "base/strings/string_util.h"
14 #include "base/time/time.h"
15 #include "base/values.h"
16 #include "net/base/net_errors.h"
17 #include "net/base/net_log.h"
18 #include "net/socket/client_socket_handle.h"
19 #include "net/socket/client_socket_pool_base.h"
20 #include "net/socket/websocket_endpoint_lock_manager.h"
21 #include "net/socket/websocket_transport_connect_sub_job.h"
27 using base::TimeDelta
;
29 // TODO(ricea): For now, we implement a global timeout for compatibility with
30 // TransportConnectJob. Since WebSocketTransportConnectJob controls the address
31 // selection process more tightly, it could do something smarter here.
// Timeout expressed in seconds; ConnectionTimeout() in this file converts it
// to a TimeDelta.
32 const int kTransportConnectJobTimeoutInSeconds
= 240; // 4 minutes.
// Constructs a connect job. From the visible initializers: the ConnectJob base
// receives |group_name| and a net log created with
// BoundNetLog::Make(pool_net_log, NetLog::SOURCE_CONNECT_JOB); |helper_|
// receives |params|, |client_socket_factory|, |host_resolver| and a pointer to
// |connect_timing_|; |race_result_| starts as CONNECTION_LATENCY_UNKNOWN;
// |request_net_log_| stores |request_net_log|; and the job registers itself
// with the helper through SetOnIOComplete().
// NOTE(review): the embedded numbering skips 45-46, 49-51, 55-56, 58-59 and
// 61-62, so part of the parameter list, the initializer list and the braces
// are not visible in this listing - confirm against the full file.
36 WebSocketTransportConnectJob::WebSocketTransportConnectJob(
37 const std::string
& group_name
,
38 RequestPriority priority
,
39 const scoped_refptr
<TransportSocketParams
>& params
,
40 TimeDelta timeout_duration
,
41 const CompletionCallback
& callback
,
42 ClientSocketFactory
* client_socket_factory
,
43 HostResolver
* host_resolver
,
44 ClientSocketHandle
* handle
,
47 const BoundNetLog
& request_net_log
)
48 : ConnectJob(group_name
,
52 BoundNetLog::Make(pool_net_log
, NetLog::SOURCE_CONNECT_JOB
)),
53 helper_(params
, client_socket_factory
, host_resolver
, &connect_timing_
),
54 race_result_(TransportConnectJobHelper::CONNECTION_LATENCY_UNKNOWN
),
57 request_net_log_(request_net_log
),
60 helper_
.SetOnIOComplete(this);
// Destructor; the body is intentionally empty.
63 WebSocketTransportConnectJob::~WebSocketTransportConnectJob() {}
// Reports this job's load state. The value starts as
// LOAD_STATE_RESOLVING_HOST, is replaced by the IPv6 sub-job's state, and is
// replaced again by the IPv4 sub-job's state when an IPv4 sub-job exists and
// the state so far is not LOAD_STATE_CONNECTING.
// NOTE(review): embedded lines 67 and 74-75 are not visible; the read of
// |ipv6_job_| is presumably guarded by a null check like the |ipv4_job_| one,
// and the return statement is presumably on 74 - confirm.
65 LoadState
WebSocketTransportConnectJob::GetLoadState() const {
66 LoadState load_state
= LOAD_STATE_RESOLVING_HOST
;
68 load_state
= ipv6_job_
->GetLoadState();
69 // This method should return LOAD_STATE_CONNECTING in preference to
70 // LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET when possible because "waiting for
71 // available socket" implies that nothing is happening.
72 if (ipv4_job_
&& load_state
!= LOAD_STATE_CONNECTING
)
73 load_state
= ipv4_job_
->GetLoadState();
// Delegates the host-resolution step to |helper_|, passing this job's
// priority and net log, and returns the helper's result unchanged.
77 int WebSocketTransportConnectJob::DoResolveHost() {
78 return helper_
.DoResolveHost(priority(), net_log());
// Forwards the host-resolution |result| and this job's net log to |helper_|
// and returns whatever the helper returns.
81 int WebSocketTransportConnectJob::DoResolveHostComplete(int result
) {
82 return helper_
.DoResolveHostComplete(result
, net_log());
// Starts the transport connection phase. From the visible code:
//  - the helper's next state is set to STATE_TRANSPORT_CONNECT_COMPLETE;
//  - the helper's resolved addresses are sorted by family into
//    |ipv4_addresses| and |ipv6_addresses| (other families are logged with
//    DVLOG);
//  - a WebSocketTransportConnectSubJob is created for each non-empty list;
//  - the IPv6 sub-job is started, and |fallback_timer_| is armed with
//    kIPv6FallbackTimerInMs to run StartIPv4JobAsync();
//  - the IPv4 sub-job can also be started directly here.
// |result| starts as ERR_UNEXPECTED and is overwritten by the Start() calls.
// NOTE(review): many embedded lines are missing from this listing (91, 94,
// 98-99, 102-104, 106-109, 111, 114-115, 117, 121-122, 124-125, 128-131, 135,
// 140-149, 151, 153-154, 157-160), including the `break`s of the switch, the
// conditions that select between the branches, the left-hand side of the
// ternaries and the return - confirm against the full file before relying on
// the control flow described above.
85 int WebSocketTransportConnectJob::DoTransportConnect() {
86 AddressList ipv4_addresses
;
87 AddressList ipv6_addresses
;
88 int result
= ERR_UNEXPECTED
;
89 helper_
.set_next_state(
90 TransportConnectJobHelper::STATE_TRANSPORT_CONNECT_COMPLETE
);
92 for (AddressList::const_iterator it
= helper_
.addresses().begin();
93 it
!= helper_
.addresses().end();
95 switch (it
->GetFamily()) {
96 case ADDRESS_FAMILY_IPV4
:
97 ipv4_addresses
.push_back(*it
);
100 case ADDRESS_FAMILY_IPV6
:
101 ipv6_addresses
.push_back(*it
);
105 DVLOG(1) << "Unexpected ADDRESS_FAMILY: " << it
->GetFamily();
110 if (!ipv4_addresses
.empty()) {
112 ipv4_job_
.reset(new WebSocketTransportConnectSubJob(
113 ipv4_addresses
, this, SUB_JOB_IPV4
));
116 if (!ipv6_addresses
.empty()) {
118 ipv6_job_
.reset(new WebSocketTransportConnectSubJob(
119 ipv6_addresses
, this, SUB_JOB_IPV6
));
120 result
= ipv6_job_
->Start();
123 SetSocket(ipv6_job_
->PassSocket());
126 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
127 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO
;
132 // This use of base::Unretained is safe because |fallback_timer_| is
133 // owned by this object.
134 fallback_timer_
.Start(
136 TimeDelta::FromMilliseconds(
137 TransportConnectJobHelper::kIPv6FallbackTimerInMs
),
138 base::Bind(&WebSocketTransportConnectJob::StartIPv4JobAsync
,
139 base::Unretained(this)));
150 result
= ipv4_job_
->Start();
152 SetSocket(ipv4_job_
->PassSocket());
155 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
156 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE
;
// Completion step of the transport connect: records a duration histogram for
// |race_result_| through |helper_|.
// NOTE(review): embedded lines 164 and 166-167 are not visible; presumably
// the histogram call is conditional on |result| and the function returns
// |result| - confirm.
163 int WebSocketTransportConnectJob::DoTransportConnectComplete(int result
) {
165 helper_
.HistogramDuration(race_result_
);
// Invoked when a sub-job finishes (both DoTransportConnect()'s timer path and
// this function itself call it as OnSubJobComplete(result, job)).
// From the visible code:
//  - one switch on job->type() selects a CONNECTION_LATENCY_* value and the
//    socket is taken from |job| with SetSocket(job->PassSocket());
//  - a second switch on job->type() can start an IPv4 sub-job that has not
//    been started yet, stopping |fallback_timer_| first and recursing if that
//    start completes synchronously;
//  - the final result is delivered through helper_.OnIOComplete().
// NOTE(review): the |result| parameter (embedded line 170), the case labels,
// the branch conditions and several statements (172, 174-176, 179-183,
// 186-187, 189, 191-194, 196-201, 207-211, 213-214, 216) are missing from this
// listing; the comment at 190 is also cut off mid-sentence. Confirm the exact
// flow against the full file.
169 void WebSocketTransportConnectJob::OnSubJobComplete(
171 WebSocketTransportConnectSubJob
* job
) {
173 switch (job
->type()) {
177 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_WINS_RACE
178 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV4_NO_RACE
;
184 ? TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_RACEABLE
185 : TransportConnectJobHelper::CONNECTION_LATENCY_IPV6_SOLO
;
188 SetSocket(job
->PassSocket());
190 // Make sure all connections are cancelled even if this object fails to be
195 switch (job
->type()) {
202 if (ipv4_job_
&& !ipv4_job_
->started()) {
203 fallback_timer_
.Stop();
204 result
= ipv4_job_
->Start();
205 if (result
!= ERR_IO_PENDING
) {
206 OnSubJobComplete(result
, ipv4_job_
.get());
212 if (ipv4_job_
|| ipv6_job_
)
215 helper_
.OnIOComplete(this, result
);
// Starts the IPv4 sub-job. DoTransportConnect() binds this method as the
// |fallback_timer_| callback. If Start() finishes synchronously (any result
// other than ERR_IO_PENDING), the result is forwarded to OnSubJobComplete().
// NOTE(review): embedded line 219 is not visible in this listing.
218 void WebSocketTransportConnectJob::StartIPv4JobAsync() {
220 int result
= ipv4_job_
->Start();
221 if (result
!= ERR_IO_PENDING
)
222 OnSubJobComplete(result
, ipv4_job_
.get());
// Entry point for the connect: hands control to helper_.DoConnectInternal()
// with this job as the argument and returns its result.
225 int WebSocketTransportConnectJob::ConnectInternal() {
226 return helper_
.DoConnectInternal(this);
// Constructs the pool. The visible initializers forward |max_sockets|,
// |max_sockets_per_group| and |client_socket_factory| to the
// TransportClientSocketPool base, bind |connect_job_delegate_| and
// |weak_factory_| to this object, store |histograms|, |net_log|,
// |client_socket_factory|, |host_resolver| and |max_sockets|, and start
// |handed_out_socket_count_| at 0.
// NOTE(review): embedded lines 230, 235, 238-239, 241 and 249 are not
// visible, so the declarations of |max_sockets| and |net_log|, the remaining
// base-class arguments and one member initializer cannot be seen here.
229 WebSocketTransportClientSocketPool::WebSocketTransportClientSocketPool(
231 int max_sockets_per_group
,
232 ClientSocketPoolHistograms
* histograms
,
233 HostResolver
* host_resolver
,
234 ClientSocketFactory
* client_socket_factory
,
236 : TransportClientSocketPool(max_sockets
,
237 max_sockets_per_group
,
240 client_socket_factory
,
242 connect_job_delegate_(this),
243 histograms_(histograms
),
244 pool_net_log_(net_log
),
245 client_socket_factory_(client_socket_factory
),
246 host_resolver_(host_resolver
),
247 max_sockets_(max_sockets
),
248 handed_out_socket_count_(0),
250 weak_factory_(this) {}
// Destructor. Aborts all outstanding work with FlushWithError(ERR_ABORTED)
// and then DCHECKs that no pending connects, handed-out sockets or stalled
// requests remain.
252 WebSocketTransportClientSocketPool::~WebSocketTransportClientSocketPool() {
253 // Clean up any pending connect jobs.
254 FlushWithError(ERR_ABORTED
);
255 DCHECK(pending_connects_
.empty());
256 DCHECK_EQ(0, handed_out_socket_count_
);
257 DCHECK(stalled_request_queue_
.empty());
258 DCHECK(stalled_request_map_
.empty());
// Releases the endpoint lock for the peer address of |handle|'s socket.
// |handle| must be initialized and own a socket (both are DCHECKed). Nothing
// is unlocked when GetPeerAddress() does not return OK.
// NOTE(review): the declaration of |address| (embedded line 266) is not
// visible in this listing.
262 void WebSocketTransportClientSocketPool::UnlockEndpoint(
263 ClientSocketHandle
* handle
) {
264 DCHECK(handle
->is_initialized());
265 DCHECK(handle
->socket());
267 if (handle
->socket()->GetPeerAddress(&address
) == OK
)
268 WebSocketEndpointLockManager::GetInstance()->UnlockEndpoint(address
);
// Handles a request for a socket on behalf of |handle|.
//  - |params| is reinterpreted as a scoped_refptr<TransportSocketParams>.
//  - |callback| must be non-null (CHECKed).
//  - If ReachedMaxSocketsLimit() is true and the params do not ignore limits,
//    the request is copied into |stalled_request_queue_|, indexed in
//    |stalled_request_map_| by |handle|, and ERR_IO_PENDING is returned.
//  - Otherwise a WebSocketTransportConnectJob is created and Connect() is
//    called. When the result is ERR_IO_PENDING the job is kept via AddJob();
//    the other visible branches hand a socket to |handle| through
//    HandOutSocket().
//  - A TYPE_SOCKET_POOL net-log event is begun here and ended when the result
//    is not ERR_IO_PENDING.
// NOTE(review): many embedded lines are missing (273, 278, 281, 283, 285-286,
// 288, 296, 306-307, 310-313, 315-316, 318-320, 328, 331-332, 337, 341,
// 344-348, 351-353), including the declaration of |params|, most
// connect-job constructor arguments, the condition of the first result branch
// and the final return. Confirm against the full file.
271 int WebSocketTransportClientSocketPool::RequestSocket(
272 const std::string
& group_name
,
274 RequestPriority priority
,
275 ClientSocketHandle
* handle
,
276 const CompletionCallback
& callback
,
277 const BoundNetLog
& request_net_log
) {
279 const scoped_refptr
<TransportSocketParams
>& casted_params
=
280 *static_cast<const scoped_refptr
<TransportSocketParams
>*>(params
);
282 NetLogTcpClientSocketPoolRequestedSocket(request_net_log
, &casted_params
);
284 CHECK(!callback
.is_null());
287 request_net_log
.BeginEvent(NetLog::TYPE_SOCKET_POOL
);
289 if (ReachedMaxSocketsLimit() && !casted_params
->ignore_limits()) {
290 request_net_log
.AddEvent(NetLog::TYPE_SOCKET_POOL_STALLED_MAX_SOCKETS
);
291 // TODO(ricea): Use emplace_back when C++11 becomes allowed.
292 StalledRequest
request(
293 casted_params
, priority
, handle
, callback
, request_net_log
);
294 stalled_request_queue_
.push_back(request
);
// NOTE(review): |iterator| is initialised to end() and dereferenced in the
// DCHECK below; embedded line 296 (not visible) presumably steps it back to
// the element that was just pushed - confirm.
295 StalledRequestQueue::iterator iterator
= stalled_request_queue_
.end();
297 DCHECK_EQ(handle
, iterator
->handle
);
298 // Because StalledRequestQueue is a std::list, its iterators are guaranteed
299 // to remain valid as long as the elements are not removed. As long as
300 // stalled_request_queue_ and stalled_request_map_ are updated in sync, it
301 // is safe to dereference an iterator in stalled_request_map_ to find the
302 // corresponding list element.
303 stalled_request_map_
.insert(
304 StalledRequestMap::value_type(handle
, iterator
));
305 return ERR_IO_PENDING
;
308 scoped_ptr
<WebSocketTransportConnectJob
> connect_job(
309 new WebSocketTransportConnectJob(group_name
,
314 client_socket_factory_
,
317 &connect_job_delegate_
,
321 int rv
= connect_job
->Connect();
322 // Regardless of the outcome of |connect_job|, it will always be bound to
323 // |handle|, since this pool uses early-binding. So the binding is logged
324 // here, without waiting for the result.
325 request_net_log
.AddEvent(
326 NetLog::TYPE_SOCKET_POOL_BOUND_TO_CONNECT_JOB
,
327 connect_job
->net_log().source().ToEventParametersCallback());
329 HandOutSocket(connect_job
->PassSocket(),
330 connect_job
->connect_timing(),
333 request_net_log
.EndEvent(NetLog::TYPE_SOCKET_POOL
);
334 } else if (rv
== ERR_IO_PENDING
) {
335 // TODO(ricea): Implement backup job timer?
// Ownership of the job moves to the pool until it completes.
336 AddJob(handle
, connect_job
.Pass());
338 scoped_ptr
<StreamSocket
> error_socket
;
339 connect_job
->GetAdditionalErrorState(handle
);
340 error_socket
= connect_job
->PassSocket();
342 HandOutSocket(error_socket
.Pass(),
343 connect_job
->connect_timing(),
349 if (rv
!= ERR_IO_PENDING
) {
350 request_net_log
.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL
, rv
);
// Signature of RequestSockets().
// NOTE(review): embedded lines 358-359 (remaining parameters) and 361-362
// (the body) are not visible in this listing, so its behaviour cannot be
// described from here.
356 void WebSocketTransportClientSocketPool::RequestSockets(
357 const std::string
& group_name
,
360 const BoundNetLog
& net_log
) {
// Cancels the outstanding request for |handle|, which must not be
// initialized (DCHECKed). From the visible code:
//  - a stalled request for |handle| is removed via DeleteStalledRequest();
//  - any socket held by |handle| is taken and passed to ReleaseSocket();
//  - if no connect job was deleted for |handle|, the handle is removed from
//    |pending_callbacks_|;
//  - if the pool is below its limit and requests are stalled, they are
//    activated.
// NOTE(review): embedded lines 369 and 371 are not visible; presumably 369
// returns early after a stalled request is deleted and 371 guards the
// ReleaseSocket() call on |socket| being non-null - confirm.
364 void WebSocketTransportClientSocketPool::CancelRequest(
365 const std::string
& group_name
,
366 ClientSocketHandle
* handle
) {
367 DCHECK(!handle
->is_initialized());
368 if (DeleteStalledRequest(handle
))
370 scoped_ptr
<StreamSocket
> socket
= handle
->PassSocket();
372 ReleaseSocket(handle
->group_name(), socket
.Pass(), handle
->id());
373 if (!DeleteJob(handle
))
374 pending_callbacks_
.erase(handle
);
375 if (!ReachedMaxSocketsLimit() && !stalled_request_queue_
.empty())
376 ActivateStalledRequest();
// Returns a handed-out socket to the pool: unlocks |socket| in the
// WebSocketEndpointLockManager, decrements |handed_out_socket_count_| (which
// must be positive, CHECKed), and activates stalled requests when capacity is
// available.
// NOTE(review): embedded line 382 (the last parameter and the opening brace)
// is not visible in this listing.
379 void WebSocketTransportClientSocketPool::ReleaseSocket(
380 const std::string
& group_name
,
381 scoped_ptr
<StreamSocket
> socket
,
383 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket
.get());
384 CHECK_GT(handed_out_socket_count_
, 0);
385 --handed_out_socket_count_
;
386 if (!ReachedMaxSocketsLimit() && !stalled_request_queue_
.empty())
387 ActivateStalledRequest();
// Fails every outstanding request with |error|. For each entry in
// |pending_connects_| the user callback is scheduled with
// InvokeUserCallbackLater() and the job is deleted; the map is then cleared.
// Each stalled request's callback is scheduled the same way, after which
// |stalled_request_map_| and |stalled_request_queue_| are cleared.
// NOTE(review): embedded lines 396-397, 400, 404, 408, 410 and 413-414 are
// not visible, including the end of the comment below, the loop increments
// and any handling of the |flushing_| flag the comment refers to.
390 void WebSocketTransportClientSocketPool::FlushWithError(int error
) {
391 // Sockets which are in LOAD_STATE_CONNECTING are in danger of unlocking
392 // sockets waiting for the endpoint lock. If they connected synchronously,
393 // then OnConnectJobComplete(). The |flushing_| flag tells this object to
394 // ignore spurious calls to OnConnectJobComplete(). It is safe to ignore those
395 // calls because this method will delete the jobs and call their callbacks
398 for (PendingConnectsMap::iterator it
= pending_connects_
.begin();
399 it
!= pending_connects_
.end();
401 InvokeUserCallbackLater(
402 it
->second
->handle(), it
->second
->callback(), error
);
// Comma expression: delete the job, then null the stored pointer.
403 delete it
->second
, it
->second
= NULL
;
405 pending_connects_
.clear();
406 for (StalledRequestQueue::iterator it
= stalled_request_queue_
.begin();
407 it
!= stalled_request_queue_
.end();
409 InvokeUserCallbackLater(it
->handle
, it
->callback
, error
);
411 stalled_request_map_
.clear();
412 stalled_request_queue_
.clear();
// No statement is visible in the body; the original comment explains that
// this pool keeps no idle sockets.
416 void WebSocketTransportClientSocketPool::CloseIdleSockets() {
417 // We have no idle sockets.
// Returns the number of idle sockets.
// NOTE(review): the body (embedded lines 421-422) is not visible here;
// GetInfoAsValue() reports "idle_socket_count" as 0, so this presumably
// returns 0 - confirm.
420 int WebSocketTransportClientSocketPool::IdleSocketCount() const {
// Returns the number of idle sockets in |group_name|.
// NOTE(review): the body (embedded lines 426-427) is not visible in this
// listing, so the returned value cannot be confirmed from here.
424 int WebSocketTransportClientSocketPool::IdleSocketCountInGroup(
425 const std::string
& group_name
) const {
// Reports the load state for the request bound to |handle|:
//  - LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET if the request is stalled;
//  - LOAD_STATE_CONNECTING if a user callback for it is already pending;
//  - otherwise the state of its connect job. LookupConnectJob() CHECKs that
//    such a job exists.
429 LoadState
WebSocketTransportClientSocketPool::GetLoadState(
430 const std::string
& group_name
,
431 const ClientSocketHandle
* handle
) const {
432 if (stalled_request_map_
.find(handle
) != stalled_request_map_
.end())
433 return LOAD_STATE_WAITING_FOR_AVAILABLE_SOCKET
;
434 if (pending_callbacks_
.count(handle
))
435 return LOAD_STATE_CONNECTING
;
436 return LookupConnectJob(handle
)->GetLoadState();
// Builds a newly allocated DictionaryValue describing the pool: |name|,
// |type|, the handed-out and connecting socket counts, and the socket limit.
// "idle_socket_count" and "pool_generation_number" are always reported as 0.
// |include_nested_pools| is not used in the visible code.
// NOTE(review): "max_sockets_per_group" is populated from |max_sockets_| -
// confirm this is intentional. The return statement (embedded lines 452-453)
// is not visible in this listing.
439 base::DictionaryValue
* WebSocketTransportClientSocketPool::GetInfoAsValue(
440 const std::string
& name
,
441 const std::string
& type
,
442 bool include_nested_pools
) const {
443 base::DictionaryValue
* dict
= new base::DictionaryValue();
444 dict
->SetString("name", name
);
445 dict
->SetString("type", type
);
446 dict
->SetInteger("handed_out_socket_count", handed_out_socket_count_
);
447 dict
->SetInteger("connecting_socket_count", pending_connects_
.size());
448 dict
->SetInteger("idle_socket_count", 0);
449 dict
->SetInteger("max_socket_count", max_sockets_
);
450 dict
->SetInteger("max_sockets_per_group", max_sockets_
);
451 dict
->SetInteger("pool_generation_number", 0);
// Returns the connect timeout as a TimeDelta built from
// kTransportConnectJobTimeoutInSeconds.
455 TimeDelta
WebSocketTransportClientSocketPool::ConnectionTimeout() const {
456 return TimeDelta::FromSeconds(kTransportConnectJobTimeoutInSeconds
);
// Accessor returning a ClientSocketPoolHistograms pointer.
// NOTE(review): the rest of the signature and the body (embedded lines
// 460-462) are not visible; presumably it returns |histograms_|, which the
// constructor stores - confirm.
459 ClientSocketPoolHistograms
* WebSocketTransportClientSocketPool::histograms()
// Returns true when at least one request is waiting in
// |stalled_request_queue_|.
464 bool WebSocketTransportClientSocketPool::IsStalled() const {
465 return !stalled_request_queue_
.empty();
// Called when |job| finishes with |result|, which must not be ERR_IO_PENDING
// (DCHECKed). From the visible code:
//  - the socket, request net log, callback, connect timing and handle are
//    copied out of |job| before the job is deleted via DeleteJob();
//  - when a socket is handed out, HandOutSocket() binds it to the handle and
//    |handed_out_socket| is set;
//  - the TYPE_SOCKET_POOL net-log event is ended, with the error code on the
//    failure path;
//  - if no socket was handed out, stalled requests exist and the pool is
//    below its limit, ActivateStalledRequest() is called;
//  - finally the user callback is scheduled with InvokeUserCallbackLater().
// NOTE(review): the |result| parameter (embedded line 469), the branch
// conditions and the |flushing_| handling referred to by the comment at 475
// (lines 472, 474, 476, 478-480, 484, 487-488, 493, 497, 500, 502) are
// missing from this listing. Confirm against the full file.
468 void WebSocketTransportClientSocketPool::OnConnectJobComplete(
470 WebSocketTransportConnectJob
* job
) {
471 DCHECK_NE(ERR_IO_PENDING
, result
);
473 scoped_ptr
<StreamSocket
> socket
= job
->PassSocket();
475 // See comment in FlushWithError.
477 WebSocketEndpointLockManager::GetInstance()->UnlockSocket(socket
.get());
481 BoundNetLog request_net_log
= job
->request_net_log();
482 CompletionCallback callback
= job
->callback();
483 LoadTimingInfo::ConnectTiming connect_timing
= job
->connect_timing();
485 ClientSocketHandle
* const handle
= job
->handle();
486 bool handed_out_socket
= false;
489 DCHECK(socket
.get());
490 handed_out_socket
= true;
491 HandOutSocket(socket
.Pass(), connect_timing
, handle
, request_net_log
);
492 request_net_log
.EndEvent(NetLog::TYPE_SOCKET_POOL
);
494 // If we got a socket, it must contain error information so pass that
495 // up so that the caller can retrieve it.
496 job
->GetAdditionalErrorState(handle
);
498 handed_out_socket
= true;
499 HandOutSocket(socket
.Pass(), connect_timing
, handle
, request_net_log
);
501 request_net_log
.EndEventWithNetErrorCode(NetLog::TYPE_SOCKET_POOL
, result
);
// |job| is destroyed by DeleteJob(); only the values copied above may be used
// after this point.
503 bool delete_succeeded
= DeleteJob(handle
);
504 DCHECK(delete_succeeded
);
505 if (!handed_out_socket
&& !stalled_request_queue_
.empty() &&
506 !ReachedMaxSocketsLimit())
507 ActivateStalledRequest();
508 InvokeUserCallbackLater(handle
, callback
, result
);
// Schedules the user's completion callback to run later on the current
// message loop. |handle| must not already have a pending callback (DCHECKed);
// it is recorded in |pending_callbacks_| and a task bound to
// InvokeUserCallback() through a weak pointer is posted.
// NOTE(review): embedded lines 514, 518 and 521-523 are not visible (the
// last parameter, the PostTask location argument and the remaining bound
// arguments).
511 void WebSocketTransportClientSocketPool::InvokeUserCallbackLater(
512 ClientSocketHandle
* handle
,
513 const CompletionCallback
& callback
,
515 DCHECK(!pending_callbacks_
.count(handle
));
516 pending_callbacks_
.insert(handle
);
517 base::MessageLoop::current()->PostTask(
519 base::Bind(&WebSocketTransportClientSocketPool::InvokeUserCallback
,
520 weak_factory_
.GetWeakPtr(),
// Task body posted by InvokeUserCallbackLater(). The guarded statement runs
// only if |handle| was still present in |pending_callbacks_|;
// CancelRequest() can erase the handle from that set beforehand.
// NOTE(review): embedded lines 529 and 531-532 are not visible; the guarded
// statement is presumably the invocation of |callback| - confirm.
526 void WebSocketTransportClientSocketPool::InvokeUserCallback(
527 ClientSocketHandle
* handle
,
528 const CompletionCallback
& callback
,
530 if (pending_callbacks_
.erase(handle
))
// Returns true when no further connection may be started: either the
// handed-out sockets alone have reached |max_sockets_|, or the number of
// pending connects is at least the remaining capacity
// (|max_sockets_| - |handed_out_socket_count_|). The size is converted to int
// with base::checked_cast.
534 bool WebSocketTransportClientSocketPool::ReachedMaxSocketsLimit() const {
535 return handed_out_socket_count_
>= max_sockets_
||
536 base::checked_cast
<int>(pending_connects_
.size()) >=
537 max_sockets_
- handed_out_socket_count_
;
// Binds |socket| to |handle| and counts it as handed out. The handle receives
// the socket, pool id 0 and |connect_timing|; the handle is expected to
// report reuse type UNUSED and zero idle time (DCHECKed). The socket's net-log
// source is passed as parameters of a TYPE_SOCKET_POOL_BOUND_TO_SOCKET event,
// and |handed_out_socket_count_| is incremented.
// NOTE(review): embedded lines 545, 551-552 and 555 are not visible,
// including the call that receives the event arguments at 553-554.
540 void WebSocketTransportClientSocketPool::HandOutSocket(
541 scoped_ptr
<StreamSocket
> socket
,
542 const LoadTimingInfo::ConnectTiming
& connect_timing
,
543 ClientSocketHandle
* handle
,
544 const BoundNetLog
& net_log
) {
546 handle
->SetSocket(socket
.Pass());
547 DCHECK_EQ(ClientSocketHandle::UNUSED
, handle
->reuse_type());
548 DCHECK_EQ(0, handle
->idle_time().InMicroseconds());
549 handle
->set_pool_id(0);
550 handle
->set_connect_timing(connect_timing
);
553 NetLog::TYPE_SOCKET_POOL_BOUND_TO_SOCKET
,
554 handle
->socket()->NetLog().source().ToEventParametersCallback());
556 ++handed_out_socket_count_
;
// Registers |connect_job| as the pending connect for |handle|. Ownership is
// released from the scoped_ptr and the raw pointer is stored in
// |pending_connects_|; DeleteJob() and FlushWithError() delete it later.
// NOTE(review): embedded lines 562 and 565 are not visible; the `.second` of
// the insert result is presumably assigned to a bool on 562 and checked on
// 565 - confirm.
559 void WebSocketTransportClientSocketPool::AddJob(
560 ClientSocketHandle
* handle
,
561 scoped_ptr
<WebSocketTransportConnectJob
> connect_job
) {
563 pending_connects_
.insert(PendingConnectsMap::value_type(
564 handle
, connect_job
.release())).second
;
// Deletes the pending connect job registered for |handle|, if any, and
// removes its entry from |pending_connects_|. Callers in this file treat a
// true return value as "a job was deleted".
// NOTE(review): the return statements (embedded lines 571 and 580) are not
// visible in this listing.
568 bool WebSocketTransportClientSocketPool::DeleteJob(ClientSocketHandle
* handle
) {
569 PendingConnectsMap::iterator it
= pending_connects_
.find(handle
);
570 if (it
== pending_connects_
.end())
572 // Deleting a ConnectJob which holds an endpoint lock can lead to a different
573 // ConnectJob proceeding to connect. If the connect proceeds synchronously
574 // (usually because of a failure) then it can trigger that job to be
575 // deleted. |it| remains valid because std::map guarantees that erase() does
576 // not invalidate iterators to other entries.
// Comma expression: delete the job, then null the stored pointer.
577 delete it
->second
, it
->second
= NULL
;
578 DCHECK(pending_connects_
.find(handle
) == it
);
579 pending_connects_
.erase(it
);
// Finds the pending connect job registered for |handle|. A missing entry is a
// fatal error (CHECKed), so callers must only pass handles that have a
// pending connect.
// NOTE(review): the return statement (embedded line 588) is not visible;
// presumably it returns it->second - confirm.
583 const WebSocketTransportConnectJob
*
584 WebSocketTransportClientSocketPool::LookupConnectJob(
585 const ClientSocketHandle
* handle
) const {
586 PendingConnectsMap::const_iterator it
= pending_connects_
.find(handle
);
587 CHECK(it
!= pending_connects_
.end());
// Re-issues stalled requests while the pool has capacity. Preconditions
// (DCHECKed): the stalled queue is non-empty and the socket limit has not
// been reached. Each iteration copies the front request, removes it from both
// |stalled_request_queue_| and |stalled_request_map_|, and passes it to
// RequestSocket(); a synchronous result (anything but ERR_IO_PENDING) is
// delivered to the user through InvokeUserCallbackLater().
// NOTE(review): embedded lines 602-606 (the remaining RequestSocket()
// arguments) are not visible in this listing.
591 void WebSocketTransportClientSocketPool::ActivateStalledRequest() {
592 DCHECK(!stalled_request_queue_
.empty());
593 DCHECK(!ReachedMaxSocketsLimit());
594 // Usually we will only be able to activate one stalled request at a time,
595 // however if all the connects fail synchronously for some reason, we may be
596 // able to clear the whole queue at once.
597 while (!stalled_request_queue_
.empty() && !ReachedMaxSocketsLimit()) {
598 StalledRequest
request(stalled_request_queue_
.front());
599 stalled_request_queue_
.pop_front();
600 stalled_request_map_
.erase(request
.handle
);
601 int rv
= RequestSocket("ignored",
607 // ActivateStalledRequest() never returns synchronously, so it is never
608 // called re-entrantly.
609 if (rv
!= ERR_IO_PENDING
)
610 InvokeUserCallbackLater(request
.handle
, request
.callback
, rv
);
// Removes the stalled request registered for |handle|, if any, from both
// |stalled_request_queue_| (through the list iterator stored in the map) and
// |stalled_request_map_|. CancelRequest() treats a true return value as
// "a stalled request was removed".
// NOTE(review): the return statements (embedded lines 618 and 621) are not
// visible in this listing.
614 bool WebSocketTransportClientSocketPool::DeleteStalledRequest(
615 ClientSocketHandle
* handle
) {
616 StalledRequestMap::iterator it
= stalled_request_map_
.find(handle
);
617 if (it
== stalled_request_map_
.end())
// Erase the list element first: |it->second| is read before |it| is erased.
619 stalled_request_queue_
.erase(it
->second
);
620 stalled_request_map_
.erase(it
);
// Constructs the delegate for the pool |owner|.
// NOTE(review): the initializer list and body (embedded line 626) are not
// visible; presumably |owner| is stored in |owner_|, which
// OnConnectJobComplete() dereferences - confirm.
624 WebSocketTransportClientSocketPool::ConnectJobDelegate::ConnectJobDelegate(
625 WebSocketTransportClientSocketPool
* owner
)
// Destructor; the body is intentionally empty.
628 WebSocketTransportClientSocketPool::ConnectJobDelegate::~ConnectJobDelegate() {}
// Delegate callback: forwards |result| and |job|, downcast with static_cast
// to WebSocketTransportConnectJob*, to the owning pool's
// OnConnectJobComplete(). A ScopedTracker is created for the duration of the
// call (see the TODO below).
// NOTE(review): the return type (embedded line 630) and the parameter list
// (lines 632-633) are not visible in this listing.
631 WebSocketTransportClientSocketPool::ConnectJobDelegate::OnConnectJobComplete(
634 // TODO(vadimt): Remove ScopedTracker below once crbug.com/436634 is fixed.
635 tracked_objects::ScopedTracker
tracking_profile(
636 FROM_HERE_WITH_EXPLICIT_FUNCTION(
637 "436634 WebSocket...::ConnectJobDelegate::OnConnectJobComplete"));
639 owner_
->OnConnectJobComplete(result
,
640 static_cast<WebSocketTransportConnectJob
*>(job
));
// Constructs the record RequestSocket() queues for a request that has to
// wait for capacity, from the request's params, priority, handle, callback
// and net log.
// NOTE(review): the member initializer list and body (embedded lines
// 649-653) are not visible; other code in this file reads the |handle| and
// |callback| members, so these arguments are presumably copied into members
// of the same names - confirm.
643 WebSocketTransportClientSocketPool::StalledRequest::StalledRequest(
644 const scoped_refptr
<TransportSocketParams
>& params
,
645 RequestPriority priority
,
646 ClientSocketHandle
* handle
,
647 const CompletionCallback
& callback
,
648 const BoundNetLog
& net_log
)
// Destructor; the body is intentionally empty.
655 WebSocketTransportClientSocketPool::StalledRequest::~StalledRequest() {}