1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/location.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/stl_util.h"
15 #include "base/strings/string_util.h"
16 #include "base/strings/stringprintf.h"
17 #include "base/thread_task_runner_handle.h"
18 #include "base/threading/non_thread_safe.h"
19 #include "base/threading/thread.h"
20 #include "base/threading/thread_restrictions.h"
21 #include "net/base/net_errors.h"
22 #include "net/log/net_log.h"
23 #include "net/proxy/proxy_info.h"
24 #include "net/proxy/proxy_resolver.h"
30 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
31 // thread and a synchronous ProxyResolver (which will be operated on said
33 class Executor
: public base::RefCountedThreadSafe
<Executor
> {
37 virtual void OnExecutorReady(Executor
* executor
) = 0;
40 virtual ~Coordinator() = default;
43 // |coordinator| must remain valid throughout our lifetime. It is used to
44 // signal when the executor is ready to receive work by calling
45 // |coordinator->OnExecutorReady()|.
46 // |thread_number| is an identifier used when naming the worker thread.
47 Executor(Coordinator
* coordinator
, int thread_number
);
49 // Submit a job to this executor.
50 void StartJob(Job
* job
);
52 // Callback for when a job has completed running on the executor's thread.
53 void OnJobCompleted(Job
* job
);
55 // Cleanup the executor. Cancels all outstanding work, and frees the thread
59 // Returns the outstanding job, or NULL.
60 Job
* outstanding_job() const { return outstanding_job_
.get(); }
62 ProxyResolver
* resolver() { return resolver_
.get(); }
64 int thread_number() const { return thread_number_
; }
66 void set_resolver(scoped_ptr
<ProxyResolver
> resolver
) {
67 resolver_
= resolver
.Pass();
70 void set_coordinator(Coordinator
* coordinator
) {
73 coordinator_
= coordinator
;
77 friend class base::RefCountedThreadSafe
<Executor
>;
80 Coordinator
* coordinator_
;
81 const int thread_number_
;
83 // The currently active job for this executor (either a CreateProxyResolver or
84 // GetProxyForURL task).
85 scoped_refptr
<Job
> outstanding_job_
;
87 // The synchronous resolver implementation.
88 scoped_ptr
<ProxyResolver
> resolver_
;
90 // The thread where |resolver_| is run on.
91 // Note that declaration ordering is important here. |thread_| needs to be
92 // destroyed *before* |resolver_|, in case |resolver_| is currently
93 // executing on |thread_|.
94 scoped_ptr
<base::Thread
> thread_
;
97 class MultiThreadedProxyResolver
: public ProxyResolver
,
98 public Executor::Coordinator
,
99 public base::NonThreadSafe
{
101 // Creates an asynchronous ProxyResolver that runs requests on up to
102 // |max_num_threads|.
104 // For each thread that is created, an accompanying synchronous ProxyResolver
105 // will be provisioned using |resolver_factory|. All methods on these
106 // ProxyResolvers will be called on the one thread.
107 MultiThreadedProxyResolver(
108 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
109 size_t max_num_threads
,
110 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
111 scoped_refptr
<Executor
> executor
);
113 ~MultiThreadedProxyResolver() override
;
115 // ProxyResolver implementation:
116 int GetProxyForURL(const GURL
& url
,
118 const CompletionCallback
& callback
,
119 RequestHandle
* request
,
120 const BoundNetLog
& net_log
) override
;
121 void CancelRequest(RequestHandle request
) override
;
122 LoadState
GetLoadState(RequestHandle request
) const override
;
125 class GetProxyForURLJob
;
126 // FIFO queue of pending jobs waiting to be started.
127 // TODO(eroman): Make this priority queue.
128 typedef std::deque
<scoped_refptr
<Job
>> PendingJobsQueue
;
129 typedef std::vector
<scoped_refptr
<Executor
>> ExecutorList
;
131 // Returns an idle worker thread which is ready to receive GetProxyForURL()
132 // requests. If all threads are occupied, returns NULL.
133 Executor
* FindIdleExecutor();
135 // Creates a new worker thread, and appends it to |executors_|.
136 void AddNewExecutor();
138 // Starts the next job from |pending_jobs_| if possible.
139 void OnExecutorReady(Executor
* executor
) override
;
141 const scoped_ptr
<ProxyResolverFactory
> resolver_factory_
;
142 const size_t max_num_threads_
;
143 PendingJobsQueue pending_jobs_
;
144 ExecutorList executors_
;
145 scoped_refptr
<ProxyResolverScriptData
> script_data_
;
148 // Job ---------------------------------------------
150 class Job
: public base::RefCountedThreadSafe
<Job
> {
152 // Identifies the subclass of Job (only being used for debugging purposes).
154 TYPE_GET_PROXY_FOR_URL
,
155 TYPE_CREATE_RESOLVER
,
158 Job(Type type
, const CompletionCallback
& callback
)
162 was_cancelled_(false) {
165 void set_executor(Executor
* executor
) {
166 executor_
= executor
;
169 // The "executor" is the job runner that is scheduling this job. If
170 // this job has not been submitted to an executor yet, this will be
171 // NULL (and we know it hasn't started yet).
172 Executor
* executor() {
176 // Mark the job as having been cancelled.
178 was_cancelled_
= true;
181 // Returns true if Cancel() has been called.
182 bool was_cancelled() const { return was_cancelled_
; }
184 Type
type() const { return type_
; }
186 // Returns true if this job still has a user callback. Some jobs
187 // do not have a user callback, because they were helper jobs
188 // scheduled internally (for example TYPE_CREATE_RESOLVER).
190 // Otherwise jobs that correspond with user-initiated work will
191 // have a non-null callback up until the callback is run.
192 bool has_user_callback() const { return !callback_
.is_null(); }
194 // This method is called when the job is inserted into a wait queue
195 // because no executors were ready to accept it.
196 virtual void WaitingForThread() {}
198 // This method is called just before the job is posted to the work thread.
199 virtual void FinishedWaitingForThread() {}
201 // This method is called on the worker thread to do the job's work. On
202 // completion, implementors are expected to call OnJobCompleted() on
205 scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) = 0;
208 void OnJobCompleted() {
209 // |executor_| will be NULL if the executor has already been deleted.
211 executor_
->OnJobCompleted(this);
214 void RunUserCallback(int result
) {
215 DCHECK(has_user_callback());
216 CompletionCallback callback
= callback_
;
217 // Reset the callback so has_user_callback() will now return false.
219 callback
.Run(result
);
222 friend class base::RefCountedThreadSafe
<Job
>;
228 CompletionCallback callback_
;
233 // CreateResolverJob -----------------------------------------------------------
235 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
236 class CreateResolverJob
: public Job
{
238 CreateResolverJob(const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
239 ProxyResolverFactory
* factory
)
240 : Job(TYPE_CREATE_RESOLVER
, CompletionCallback()),
241 script_data_(script_data
),
244 // Runs on the worker thread.
245 void Run(scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) override
{
246 scoped_ptr
<ProxyResolverFactory::Request
> request
;
247 int rv
= factory_
->CreateProxyResolver(script_data_
, &resolver_
,
248 CompletionCallback(), &request
);
250 DCHECK_NE(rv
, ERR_IO_PENDING
);
251 origin_runner
->PostTask(
252 FROM_HERE
, base::Bind(&CreateResolverJob::RequestComplete
, this, rv
));
256 ~CreateResolverJob() override
{}
259 // Runs the completion callback on the origin thread.
260 void RequestComplete(int result_code
) {
261 // The task may have been cancelled after it was started.
262 if (!was_cancelled()) {
264 executor()->set_resolver(resolver_
.Pass());
269 const scoped_refptr
<ProxyResolverScriptData
> script_data_
;
270 ProxyResolverFactory
* factory_
;
271 scoped_ptr
<ProxyResolver
> resolver_
;
274 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
276 class MultiThreadedProxyResolver::GetProxyForURLJob
: public Job
{
278 // |url| -- the URL of the query.
279 // |results| -- the structure to fill with proxy resolve results.
280 GetProxyForURLJob(const GURL
& url
,
282 const CompletionCallback
& callback
,
283 const BoundNetLog
& net_log
)
284 : Job(TYPE_GET_PROXY_FOR_URL
, callback
),
288 was_waiting_for_thread_(false) {
289 DCHECK(!callback
.is_null());
292 BoundNetLog
* net_log() { return &net_log_
; }
294 void WaitingForThread() override
{
295 was_waiting_for_thread_
= true;
296 net_log_
.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
299 void FinishedWaitingForThread() override
{
302 if (was_waiting_for_thread_
) {
303 net_log_
.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
307 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD
,
308 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
311 // Runs on the worker thread.
312 void Run(scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) override
{
313 ProxyResolver
* resolver
= executor()->resolver();
315 int rv
= resolver
->GetProxyForURL(
316 url_
, &results_buf_
, CompletionCallback(), NULL
, net_log_
);
317 DCHECK_NE(rv
, ERR_IO_PENDING
);
319 origin_runner
->PostTask(
320 FROM_HERE
, base::Bind(&GetProxyForURLJob::QueryComplete
, this, rv
));
324 ~GetProxyForURLJob() override
{}
327 // Runs the completion callback on the origin thread.
328 void QueryComplete(int result_code
) {
329 // The Job may have been cancelled after it was started.
330 if (!was_cancelled()) {
331 if (result_code
>= OK
) { // Note: unit-tests use values > 0.
332 results_
->Use(results_buf_
);
334 RunUserCallback(result_code
);
339 // Must only be used on the "origin" thread.
342 // Can be used on either "origin" or worker thread.
343 BoundNetLog net_log_
;
346 // Usable from within DoQuery on the worker thread.
347 ProxyInfo results_buf_
;
349 bool was_waiting_for_thread_
;
352 // Executor ----------------------------------------
354 Executor::Executor(Executor::Coordinator
* coordinator
, int thread_number
)
355 : coordinator_(coordinator
), thread_number_(thread_number
) {
357 // Start up the thread.
358 thread_
.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
360 CHECK(thread_
->Start());
363 void Executor::StartJob(Job
* job
) {
364 DCHECK(!outstanding_job_
.get());
365 outstanding_job_
= job
;
367 // Run the job. Once it has completed (regardless of whether it was
368 // cancelled), it will invoke OnJobCompleted() on this thread.
369 job
->set_executor(this);
370 job
->FinishedWaitingForThread();
371 thread_
->task_runner()->PostTask(
373 base::Bind(&Job::Run
, job
, base::ThreadTaskRunnerHandle::Get()));
376 void Executor::OnJobCompleted(Job
* job
) {
377 DCHECK_EQ(job
, outstanding_job_
.get());
378 outstanding_job_
= NULL
;
379 coordinator_
->OnExecutorReady(this);
382 void Executor::Destroy() {
383 DCHECK(coordinator_
);
386 // See http://crbug.com/69710.
387 base::ThreadRestrictions::ScopedAllowIO allow_io
;
389 // Join the worker thread.
393 // Cancel any outstanding job.
394 if (outstanding_job_
.get()) {
395 outstanding_job_
->Cancel();
396 // Orphan the job (since this executor may be deleted soon).
397 outstanding_job_
->set_executor(NULL
);
400 // It is now safe to free the ProxyResolver, since all the tasks that
401 // were using it on the resolver thread have completed.
404 // Null some stuff as a precaution.
406 outstanding_job_
= NULL
;
409 Executor::~Executor() {
410 // The important cleanup happens as part of Destroy(), which should always be
412 DCHECK(!coordinator_
) << "Destroy() was not called";
413 DCHECK(!thread_
.get());
414 DCHECK(!resolver_
.get());
415 DCHECK(!outstanding_job_
.get());
418 // MultiThreadedProxyResolver --------------------------------------------------
420 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
421 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
422 size_t max_num_threads
,
423 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
424 scoped_refptr
<Executor
> executor
)
425 : resolver_factory_(resolver_factory
.Pass()),
426 max_num_threads_(max_num_threads
),
427 script_data_(script_data
) {
428 DCHECK(script_data_
);
429 executor
->set_coordinator(this);
430 executors_
.push_back(executor
);
433 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
434 DCHECK(CalledOnValidThread());
435 // We will cancel all outstanding requests.
436 pending_jobs_
.clear();
438 for (auto& executor
: executors_
) {
443 int MultiThreadedProxyResolver::GetProxyForURL(
444 const GURL
& url
, ProxyInfo
* results
, const CompletionCallback
& callback
,
445 RequestHandle
* request
, const BoundNetLog
& net_log
) {
446 DCHECK(CalledOnValidThread());
447 DCHECK(!callback
.is_null());
449 scoped_refptr
<GetProxyForURLJob
> job(
450 new GetProxyForURLJob(url
, results
, callback
, net_log
));
452 // Completion will be notified through |callback|, unless the caller cancels
453 // the request using |request|.
455 *request
= reinterpret_cast<RequestHandle
>(job
.get());
457 // If there is an executor that is ready to run this request, submit it!
458 Executor
* executor
= FindIdleExecutor();
460 DCHECK_EQ(0u, pending_jobs_
.size());
461 executor
->StartJob(job
.get());
462 return ERR_IO_PENDING
;
465 // Otherwise queue this request. (We will schedule it to a thread once one
466 // becomes available).
467 job
->WaitingForThread();
468 pending_jobs_
.push_back(job
);
470 // If we haven't already reached the thread limit, provision a new thread to
471 // drain the requests more quickly.
472 if (executors_
.size() < max_num_threads_
)
475 return ERR_IO_PENDING
;
478 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req
) {
479 DCHECK(CalledOnValidThread());
482 Job
* job
= reinterpret_cast<Job
*>(req
);
483 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL
, job
->type());
485 if (job
->executor()) {
486 // If the job was already submitted to the executor, just mark it
487 // as cancelled so the user callback isn't run on completion.
490 // Otherwise the job is just sitting in a queue.
491 PendingJobsQueue::iterator it
=
492 std::find(pending_jobs_
.begin(), pending_jobs_
.end(), job
);
493 DCHECK(it
!= pending_jobs_
.end());
494 pending_jobs_
.erase(it
);
498 LoadState
MultiThreadedProxyResolver::GetLoadState(RequestHandle req
) const {
499 DCHECK(CalledOnValidThread());
501 return LOAD_STATE_RESOLVING_PROXY_FOR_URL
;
504 Executor
* MultiThreadedProxyResolver::FindIdleExecutor() {
505 DCHECK(CalledOnValidThread());
506 for (ExecutorList::iterator it
= executors_
.begin();
507 it
!= executors_
.end(); ++it
) {
508 Executor
* executor
= it
->get();
509 if (!executor
->outstanding_job())
515 void MultiThreadedProxyResolver::AddNewExecutor() {
516 DCHECK(CalledOnValidThread());
517 DCHECK_LT(executors_
.size(), max_num_threads_
);
518 // The "thread number" is used to give the thread a unique name.
519 int thread_number
= executors_
.size();
520 Executor
* executor
= new Executor(this, thread_number
);
522 new CreateResolverJob(script_data_
, resolver_factory_
.get()));
523 executors_
.push_back(make_scoped_refptr(executor
));
526 void MultiThreadedProxyResolver::OnExecutorReady(Executor
* executor
) {
527 DCHECK(CalledOnValidThread());
528 if (pending_jobs_
.empty())
531 // Get the next job to process (FIFO). Transfer it from the pending queue
533 scoped_refptr
<Job
> job
= pending_jobs_
.front();
534 pending_jobs_
.pop_front();
535 executor
->StartJob(job
.get());
540 class MultiThreadedProxyResolverFactory::Job
541 : public ProxyResolverFactory::Request
,
542 public Executor::Coordinator
{
544 Job(MultiThreadedProxyResolverFactory
* factory
,
545 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
546 scoped_ptr
<ProxyResolver
>* resolver
,
547 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
548 size_t max_num_threads
,
549 const CompletionCallback
& callback
)
551 resolver_out_(resolver
),
552 resolver_factory_(resolver_factory
.Pass()),
553 max_num_threads_(max_num_threads
),
554 script_data_(script_data
),
555 executor_(new Executor(this, 0)),
556 callback_(callback
) {
558 new CreateResolverJob(script_data_
, resolver_factory_
.get()));
563 executor_
->Destroy();
564 factory_
->RemoveJob(this);
568 void FactoryDestroyed() {
569 executor_
->Destroy();
575 void OnExecutorReady(Executor
* executor
) override
{
577 if (executor
->resolver()) {
578 resolver_out_
->reset(new MultiThreadedProxyResolver(
579 resolver_factory_
.Pass(), max_num_threads_
, script_data_
.Pass(),
582 error
= ERR_PAC_SCRIPT_FAILED
;
583 executor_
->Destroy();
585 factory_
->RemoveJob(this);
587 callback_
.Run(error
);
590 MultiThreadedProxyResolverFactory
* factory_
;
591 scoped_ptr
<ProxyResolver
>* const resolver_out_
;
592 scoped_ptr
<ProxyResolverFactory
> resolver_factory_
;
593 const size_t max_num_threads_
;
594 scoped_refptr
<ProxyResolverScriptData
> script_data_
;
595 scoped_refptr
<Executor
> executor_
;
596 const CompletionCallback callback_
;
599 MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
600 size_t max_num_threads
,
601 bool factory_expects_bytes
)
602 : ProxyResolverFactory(factory_expects_bytes
),
603 max_num_threads_(max_num_threads
) {
604 DCHECK_GE(max_num_threads
, 1u);
607 MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
608 for (auto job
: jobs_
) {
609 job
->FactoryDestroyed();
613 int MultiThreadedProxyResolverFactory::CreateProxyResolver(
614 const scoped_refptr
<ProxyResolverScriptData
>& pac_script
,
615 scoped_ptr
<ProxyResolver
>* resolver
,
616 const CompletionCallback
& callback
,
617 scoped_ptr
<Request
>* request
) {
618 scoped_ptr
<Job
> job(new Job(this, pac_script
, resolver
,
619 CreateProxyResolverFactory(), max_num_threads_
,
621 jobs_
.insert(job
.get());
622 *request
= job
.Pass();
623 return ERR_IO_PENDING
;
626 void MultiThreadedProxyResolverFactory::RemoveJob(
627 MultiThreadedProxyResolverFactory::Job
* job
) {
628 size_t erased
= jobs_
.erase(job
);
629 DCHECK_EQ(1u, erased
);