1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/stl_util.h"
14 #include "base/strings/string_util.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/thread_task_runner_handle.h"
17 #include "base/threading/non_thread_safe.h"
18 #include "base/threading/thread.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "net/base/net_errors.h"
21 #include "net/log/net_log.h"
22 #include "net/proxy/proxy_info.h"
23 #include "net/proxy/proxy_resolver.h"
29 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
30 // thread and a synchronous ProxyResolver (which will be operated on said
32 class Executor
: public base::RefCountedThreadSafe
<Executor
> {
36 virtual void OnExecutorReady(Executor
* executor
) = 0;
39 virtual ~Coordinator() = default;
42 // |coordinator| must remain valid throughout our lifetime. It is used to
43 // signal when the executor is ready to receive work by calling
44 // |coordinator->OnExecutorReady()|.
45 // |thread_number| is an identifier used when naming the worker thread.
46 Executor(Coordinator
* coordinator
, int thread_number
);
48 // Submit a job to this executor.
49 void StartJob(Job
* job
);
51 // Callback for when a job has completed running on the executor's thread.
52 void OnJobCompleted(Job
* job
);
54 // Cleanup the executor. Cancels all outstanding work, and frees the thread
58 // Returns the outstanding job, or NULL.
59 Job
* outstanding_job() const { return outstanding_job_
.get(); }
61 ProxyResolver
* resolver() { return resolver_
.get(); }
63 int thread_number() const { return thread_number_
; }
65 void set_resolver(scoped_ptr
<ProxyResolver
> resolver
) {
66 resolver_
= resolver
.Pass();
69 void set_coordinator(Coordinator
* coordinator
) {
72 coordinator_
= coordinator
;
76 friend class base::RefCountedThreadSafe
<Executor
>;
79 Coordinator
* coordinator_
;
80 const int thread_number_
;
82 // The currently active job for this executor (either a CreateProxyResolver or
83 // GetProxyForURL task).
84 scoped_refptr
<Job
> outstanding_job_
;
86 // The synchronous resolver implementation.
87 scoped_ptr
<ProxyResolver
> resolver_
;
89 // The thread where |resolver_| is run on.
90 // Note that declaration ordering is important here. |thread_| needs to be
91 // destroyed *before* |resolver_|, in case |resolver_| is currently
92 // executing on |thread_|.
93 scoped_ptr
<base::Thread
> thread_
;
96 class MultiThreadedProxyResolver
: public ProxyResolver
,
97 public Executor::Coordinator
,
98 public base::NonThreadSafe
{
100 // Creates an asynchronous ProxyResolver that runs requests on up to
101 // |max_num_threads|.
103 // For each thread that is created, an accompanying synchronous ProxyResolver
104 // will be provisioned using |resolver_factory|. All methods on these
105 // ProxyResolvers will be called on the one thread.
106 MultiThreadedProxyResolver(
107 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
108 size_t max_num_threads
,
109 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
110 scoped_refptr
<Executor
> executor
);
112 ~MultiThreadedProxyResolver() override
;
114 // ProxyResolver implementation:
115 int GetProxyForURL(const GURL
& url
,
117 const CompletionCallback
& callback
,
118 RequestHandle
* request
,
119 const BoundNetLog
& net_log
) override
;
120 void CancelRequest(RequestHandle request
) override
;
121 LoadState
GetLoadState(RequestHandle request
) const override
;
124 class GetProxyForURLJob
;
125 // FIFO queue of pending jobs waiting to be started.
126 // TODO(eroman): Make this priority queue.
127 typedef std::deque
<scoped_refptr
<Job
>> PendingJobsQueue
;
128 typedef std::vector
<scoped_refptr
<Executor
>> ExecutorList
;
130 // Returns an idle worker thread which is ready to receive GetProxyForURL()
131 // requests. If all threads are occupied, returns NULL.
132 Executor
* FindIdleExecutor();
134 // Creates a new worker thread, and appends it to |executors_|.
135 void AddNewExecutor();
137 // Starts the next job from |pending_jobs_| if possible.
138 void OnExecutorReady(Executor
* executor
) override
;
140 const scoped_ptr
<ProxyResolverFactory
> resolver_factory_
;
141 const size_t max_num_threads_
;
142 PendingJobsQueue pending_jobs_
;
143 ExecutorList executors_
;
144 scoped_refptr
<ProxyResolverScriptData
> script_data_
;
147 // Job ---------------------------------------------
149 class Job
: public base::RefCountedThreadSafe
<Job
> {
151 // Identifies the subclass of Job (only being used for debugging purposes).
153 TYPE_GET_PROXY_FOR_URL
,
154 TYPE_CREATE_RESOLVER
,
157 Job(Type type
, const CompletionCallback
& callback
)
161 was_cancelled_(false) {
164 void set_executor(Executor
* executor
) {
165 executor_
= executor
;
168 // The "executor" is the job runner that is scheduling this job. If
169 // this job has not been submitted to an executor yet, this will be
170 // NULL (and we know it hasn't started yet).
171 Executor
* executor() {
175 // Mark the job as having been cancelled.
177 was_cancelled_
= true;
180 // Returns true if Cancel() has been called.
181 bool was_cancelled() const { return was_cancelled_
; }
183 Type
type() const { return type_
; }
185 // Returns true if this job still has a user callback. Some jobs
186 // do not have a user callback, because they were helper jobs
187 // scheduled internally (for example TYPE_CREATE_RESOLVER).
189 // Otherwise jobs that correspond with user-initiated work will
190 // have a non-null callback up until the callback is run.
191 bool has_user_callback() const { return !callback_
.is_null(); }
193 // This method is called when the job is inserted into a wait queue
194 // because no executors were ready to accept it.
195 virtual void WaitingForThread() {}
197 // This method is called just before the job is posted to the work thread.
198 virtual void FinishedWaitingForThread() {}
200 // This method is called on the worker thread to do the job's work. On
201 // completion, implementors are expected to call OnJobCompleted() on
204 scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) = 0;
207 void OnJobCompleted() {
208 // |executor_| will be NULL if the executor has already been deleted.
210 executor_
->OnJobCompleted(this);
213 void RunUserCallback(int result
) {
214 DCHECK(has_user_callback());
215 CompletionCallback callback
= callback_
;
216 // Reset the callback so has_user_callback() will now return false.
218 callback
.Run(result
);
221 friend class base::RefCountedThreadSafe
<Job
>;
227 CompletionCallback callback_
;
232 // CreateResolverJob -----------------------------------------------------------
234 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
235 class CreateResolverJob
: public Job
{
237 CreateResolverJob(const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
238 ProxyResolverFactory
* factory
)
239 : Job(TYPE_CREATE_RESOLVER
, CompletionCallback()),
240 script_data_(script_data
),
243 // Runs on the worker thread.
244 void Run(scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) override
{
245 scoped_ptr
<ProxyResolverFactory::Request
> request
;
246 int rv
= factory_
->CreateProxyResolver(script_data_
, &resolver_
,
247 CompletionCallback(), &request
);
249 DCHECK_NE(rv
, ERR_IO_PENDING
);
250 origin_runner
->PostTask(
251 FROM_HERE
, base::Bind(&CreateResolverJob::RequestComplete
, this, rv
));
255 ~CreateResolverJob() override
{}
258 // Runs the completion callback on the origin thread.
259 void RequestComplete(int result_code
) {
260 // The task may have been cancelled after it was started.
261 if (!was_cancelled()) {
263 executor()->set_resolver(resolver_
.Pass());
268 const scoped_refptr
<ProxyResolverScriptData
> script_data_
;
269 ProxyResolverFactory
* factory_
;
270 scoped_ptr
<ProxyResolver
> resolver_
;
273 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
275 class MultiThreadedProxyResolver::GetProxyForURLJob
: public Job
{
277 // |url| -- the URL of the query.
278 // |results| -- the structure to fill with proxy resolve results.
279 GetProxyForURLJob(const GURL
& url
,
281 const CompletionCallback
& callback
,
282 const BoundNetLog
& net_log
)
283 : Job(TYPE_GET_PROXY_FOR_URL
, callback
),
287 was_waiting_for_thread_(false) {
288 DCHECK(!callback
.is_null());
291 BoundNetLog
* net_log() { return &net_log_
; }
293 void WaitingForThread() override
{
294 was_waiting_for_thread_
= true;
295 net_log_
.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
298 void FinishedWaitingForThread() override
{
301 if (was_waiting_for_thread_
) {
302 net_log_
.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
306 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD
,
307 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
310 // Runs on the worker thread.
311 void Run(scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) override
{
312 ProxyResolver
* resolver
= executor()->resolver();
314 int rv
= resolver
->GetProxyForURL(
315 url_
, &results_buf_
, CompletionCallback(), NULL
, net_log_
);
316 DCHECK_NE(rv
, ERR_IO_PENDING
);
318 origin_runner
->PostTask(
319 FROM_HERE
, base::Bind(&GetProxyForURLJob::QueryComplete
, this, rv
));
323 ~GetProxyForURLJob() override
{}
326 // Runs the completion callback on the origin thread.
327 void QueryComplete(int result_code
) {
328 // The Job may have been cancelled after it was started.
329 if (!was_cancelled()) {
330 if (result_code
>= OK
) { // Note: unit-tests use values > 0.
331 results_
->Use(results_buf_
);
333 RunUserCallback(result_code
);
338 // Must only be used on the "origin" thread.
341 // Can be used on either "origin" or worker thread.
342 BoundNetLog net_log_
;
345 // Usable from within DoQuery on the worker thread.
346 ProxyInfo results_buf_
;
348 bool was_waiting_for_thread_
;
351 // Executor ----------------------------------------
353 Executor::Executor(Executor::Coordinator
* coordinator
, int thread_number
)
354 : coordinator_(coordinator
), thread_number_(thread_number
) {
356 // Start up the thread.
357 thread_
.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
359 CHECK(thread_
->Start());
362 void Executor::StartJob(Job
* job
) {
363 DCHECK(!outstanding_job_
.get());
364 outstanding_job_
= job
;
366 // Run the job. Once it has completed (regardless of whether it was
367 // cancelled), it will invoke OnJobCompleted() on this thread.
368 job
->set_executor(this);
369 job
->FinishedWaitingForThread();
370 thread_
->message_loop()->PostTask(
372 base::Bind(&Job::Run
, job
, base::ThreadTaskRunnerHandle::Get()));
375 void Executor::OnJobCompleted(Job
* job
) {
376 DCHECK_EQ(job
, outstanding_job_
.get());
377 outstanding_job_
= NULL
;
378 coordinator_
->OnExecutorReady(this);
381 void Executor::Destroy() {
382 DCHECK(coordinator_
);
385 // See http://crbug.com/69710.
386 base::ThreadRestrictions::ScopedAllowIO allow_io
;
388 // Join the worker thread.
392 // Cancel any outstanding job.
393 if (outstanding_job_
.get()) {
394 outstanding_job_
->Cancel();
395 // Orphan the job (since this executor may be deleted soon).
396 outstanding_job_
->set_executor(NULL
);
399 // It is now safe to free the ProxyResolver, since all the tasks that
400 // were using it on the resolver thread have completed.
403 // Null some stuff as a precaution.
405 outstanding_job_
= NULL
;
408 Executor::~Executor() {
409 // The important cleanup happens as part of Destroy(), which should always be
411 DCHECK(!coordinator_
) << "Destroy() was not called";
412 DCHECK(!thread_
.get());
413 DCHECK(!resolver_
.get());
414 DCHECK(!outstanding_job_
.get());
417 // MultiThreadedProxyResolver --------------------------------------------------
419 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
420 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
421 size_t max_num_threads
,
422 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
423 scoped_refptr
<Executor
> executor
)
424 : resolver_factory_(resolver_factory
.Pass()),
425 max_num_threads_(max_num_threads
),
426 script_data_(script_data
) {
427 DCHECK(script_data_
);
428 executor
->set_coordinator(this);
429 executors_
.push_back(executor
);
432 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
433 DCHECK(CalledOnValidThread());
434 // We will cancel all outstanding requests.
435 pending_jobs_
.clear();
437 for (auto& executor
: executors_
) {
442 int MultiThreadedProxyResolver::GetProxyForURL(
443 const GURL
& url
, ProxyInfo
* results
, const CompletionCallback
& callback
,
444 RequestHandle
* request
, const BoundNetLog
& net_log
) {
445 DCHECK(CalledOnValidThread());
446 DCHECK(!callback
.is_null());
448 scoped_refptr
<GetProxyForURLJob
> job(
449 new GetProxyForURLJob(url
, results
, callback
, net_log
));
451 // Completion will be notified through |callback|, unless the caller cancels
452 // the request using |request|.
454 *request
= reinterpret_cast<RequestHandle
>(job
.get());
456 // If there is an executor that is ready to run this request, submit it!
457 Executor
* executor
= FindIdleExecutor();
459 DCHECK_EQ(0u, pending_jobs_
.size());
460 executor
->StartJob(job
.get());
461 return ERR_IO_PENDING
;
464 // Otherwise queue this request. (We will schedule it to a thread once one
465 // becomes available).
466 job
->WaitingForThread();
467 pending_jobs_
.push_back(job
);
469 // If we haven't already reached the thread limit, provision a new thread to
470 // drain the requests more quickly.
471 if (executors_
.size() < max_num_threads_
)
474 return ERR_IO_PENDING
;
477 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req
) {
478 DCHECK(CalledOnValidThread());
481 Job
* job
= reinterpret_cast<Job
*>(req
);
482 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL
, job
->type());
484 if (job
->executor()) {
485 // If the job was already submitted to the executor, just mark it
486 // as cancelled so the user callback isn't run on completion.
489 // Otherwise the job is just sitting in a queue.
490 PendingJobsQueue::iterator it
=
491 std::find(pending_jobs_
.begin(), pending_jobs_
.end(), job
);
492 DCHECK(it
!= pending_jobs_
.end());
493 pending_jobs_
.erase(it
);
497 LoadState
MultiThreadedProxyResolver::GetLoadState(RequestHandle req
) const {
498 DCHECK(CalledOnValidThread());
500 return LOAD_STATE_RESOLVING_PROXY_FOR_URL
;
503 Executor
* MultiThreadedProxyResolver::FindIdleExecutor() {
504 DCHECK(CalledOnValidThread());
505 for (ExecutorList::iterator it
= executors_
.begin();
506 it
!= executors_
.end(); ++it
) {
507 Executor
* executor
= it
->get();
508 if (!executor
->outstanding_job())
514 void MultiThreadedProxyResolver::AddNewExecutor() {
515 DCHECK(CalledOnValidThread());
516 DCHECK_LT(executors_
.size(), max_num_threads_
);
517 // The "thread number" is used to give the thread a unique name.
518 int thread_number
= executors_
.size();
519 Executor
* executor
= new Executor(this, thread_number
);
521 new CreateResolverJob(script_data_
, resolver_factory_
.get()));
522 executors_
.push_back(make_scoped_refptr(executor
));
525 void MultiThreadedProxyResolver::OnExecutorReady(Executor
* executor
) {
526 DCHECK(CalledOnValidThread());
527 if (pending_jobs_
.empty())
530 // Get the next job to process (FIFO). Transfer it from the pending queue
532 scoped_refptr
<Job
> job
= pending_jobs_
.front();
533 pending_jobs_
.pop_front();
534 executor
->StartJob(job
.get());
539 class MultiThreadedProxyResolverFactory::Job
540 : public ProxyResolverFactory::Request
,
541 public Executor::Coordinator
{
543 Job(MultiThreadedProxyResolverFactory
* factory
,
544 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
545 scoped_ptr
<ProxyResolver
>* resolver
,
546 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
547 size_t max_num_threads
,
548 const CompletionCallback
& callback
)
550 resolver_out_(resolver
),
551 resolver_factory_(resolver_factory
.Pass()),
552 max_num_threads_(max_num_threads
),
553 script_data_(script_data
),
554 executor_(new Executor(this, 0)),
555 callback_(callback
) {
557 new CreateResolverJob(script_data_
, resolver_factory_
.get()));
562 executor_
->Destroy();
563 factory_
->RemoveJob(this);
567 void FactoryDestroyed() {
568 executor_
->Destroy();
574 void OnExecutorReady(Executor
* executor
) override
{
576 if (executor
->resolver()) {
577 resolver_out_
->reset(new MultiThreadedProxyResolver(
578 resolver_factory_
.Pass(), max_num_threads_
, script_data_
.Pass(),
581 error
= ERR_PAC_SCRIPT_FAILED
;
582 executor_
->Destroy();
584 factory_
->RemoveJob(this);
586 callback_
.Run(error
);
589 MultiThreadedProxyResolverFactory
* factory_
;
590 scoped_ptr
<ProxyResolver
>* const resolver_out_
;
591 scoped_ptr
<ProxyResolverFactory
> resolver_factory_
;
592 const size_t max_num_threads_
;
593 scoped_refptr
<ProxyResolverScriptData
> script_data_
;
594 scoped_refptr
<Executor
> executor_
;
595 const CompletionCallback callback_
;
598 MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
599 size_t max_num_threads
,
600 bool factory_expects_bytes
)
601 : ProxyResolverFactory(factory_expects_bytes
),
602 max_num_threads_(max_num_threads
) {
603 DCHECK_GE(max_num_threads
, 1u);
606 MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
607 for (auto job
: jobs_
) {
608 job
->FactoryDestroyed();
612 int MultiThreadedProxyResolverFactory::CreateProxyResolver(
613 const scoped_refptr
<ProxyResolverScriptData
>& pac_script
,
614 scoped_ptr
<ProxyResolver
>* resolver
,
615 const CompletionCallback
& callback
,
616 scoped_ptr
<Request
>* request
) {
617 scoped_ptr
<Job
> job(new Job(this, pac_script
, resolver
,
618 CreateProxyResolverFactory(), max_num_threads_
,
620 jobs_
.insert(job
.get());
621 *request
= job
.Pass();
622 return ERR_IO_PENDING
;
625 void MultiThreadedProxyResolverFactory::RemoveJob(
626 MultiThreadedProxyResolverFactory::Job
* job
) {
627 size_t erased
= jobs_
.erase(job
);
628 DCHECK_EQ(1u, erased
);