1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/stl_util.h"
14 #include "base/strings/string_util.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/thread_task_runner_handle.h"
17 #include "base/threading/non_thread_safe.h"
18 #include "base/threading/thread.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "net/base/net_errors.h"
21 #include "net/log/net_log.h"
22 #include "net/proxy/proxy_info.h"
23 #include "net/proxy/proxy_resolver.h"
29 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
30 // thread and a synchronous ProxyResolver (which will be operated on said
32 class Executor
: public base::RefCountedThreadSafe
<Executor
> {
36 virtual void OnExecutorReady(Executor
* executor
) = 0;
39 virtual ~Coordinator() = default;
42 // |coordinator| must remain valid throughout our lifetime. It is used to
43 // signal when the executor is ready to receive work by calling
44 // |coordinator->OnExecutorReady()|.
45 // |thread_number| is an identifier used when naming the worker thread.
46 Executor(Coordinator
* coordinator
, int thread_number
);
48 // Submit a job to this executor.
49 void StartJob(Job
* job
);
51 // Callback for when a job has completed running on the executor's thread.
52 void OnJobCompleted(Job
* job
);
54 // Cleanup the executor. Cancels all outstanding work, and frees the thread
58 // Returns the outstanding job, or NULL.
59 Job
* outstanding_job() const { return outstanding_job_
.get(); }
61 ProxyResolver
* resolver() { return resolver_
.get(); }
63 int thread_number() const { return thread_number_
; }
65 void set_resolver(scoped_ptr
<ProxyResolver
> resolver
) {
66 resolver_
= resolver
.Pass();
69 void set_coordinator(Coordinator
* coordinator
) {
72 coordinator_
= coordinator
;
76 friend class base::RefCountedThreadSafe
<Executor
>;
79 Coordinator
* coordinator_
;
80 const int thread_number_
;
82 // The currently active job for this executor (either a CreateProxyResolver or
83 // GetProxyForURL task).
84 scoped_refptr
<Job
> outstanding_job_
;
86 // The synchronous resolver implementation.
87 scoped_ptr
<ProxyResolver
> resolver_
;
89 // The thread where |resolver_| is run on.
90 // Note that declaration ordering is important here. |thread_| needs to be
91 // destroyed *before* |resolver_|, in case |resolver_| is currently
92 // executing on |thread_|.
93 scoped_ptr
<base::Thread
> thread_
;
96 class MultiThreadedProxyResolver
: public ProxyResolver
,
97 public Executor::Coordinator
,
98 public base::NonThreadSafe
{
100 // Creates an asynchronous ProxyResolver that runs requests on up to
101 // |max_num_threads|.
103 // For each thread that is created, an accompanying synchronous ProxyResolver
104 // will be provisioned using |resolver_factory|. All methods on these
105 // ProxyResolvers will be called on the one thread.
106 MultiThreadedProxyResolver(
107 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
108 size_t max_num_threads
,
109 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
110 scoped_refptr
<Executor
> executor
);
112 ~MultiThreadedProxyResolver() override
;
114 // ProxyResolver implementation:
115 int GetProxyForURL(const GURL
& url
,
117 const CompletionCallback
& callback
,
118 RequestHandle
* request
,
119 const BoundNetLog
& net_log
) override
;
120 void CancelRequest(RequestHandle request
) override
;
121 LoadState
GetLoadState(RequestHandle request
) const override
;
122 void CancelSetPacScript() override
;
123 int SetPacScript(const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
124 const CompletionCallback
& callback
) override
;
127 class GetProxyForURLJob
;
128 // FIFO queue of pending jobs waiting to be started.
129 // TODO(eroman): Make this priority queue.
130 typedef std::deque
<scoped_refptr
<Job
>> PendingJobsQueue
;
131 typedef std::vector
<scoped_refptr
<Executor
>> ExecutorList
;
133 // Returns an idle worker thread which is ready to receive GetProxyForURL()
134 // requests. If all threads are occupied, returns NULL.
135 Executor
* FindIdleExecutor();
137 // Creates a new worker thread, and appends it to |executors_|.
138 void AddNewExecutor();
140 // Starts the next job from |pending_jobs_| if possible.
141 void OnExecutorReady(Executor
* executor
) override
;
143 const scoped_ptr
<ProxyResolverFactory
> resolver_factory_
;
144 const size_t max_num_threads_
;
145 PendingJobsQueue pending_jobs_
;
146 ExecutorList executors_
;
147 scoped_refptr
<ProxyResolverScriptData
> script_data_
;
150 // Job ---------------------------------------------
152 class Job
: public base::RefCountedThreadSafe
<Job
> {
154 // Identifies the subclass of Job (only being used for debugging purposes).
156 TYPE_GET_PROXY_FOR_URL
,
157 TYPE_CREATE_RESOLVER
,
160 Job(Type type
, const CompletionCallback
& callback
)
164 was_cancelled_(false) {
167 void set_executor(Executor
* executor
) {
168 executor_
= executor
;
171 // The "executor" is the job runner that is scheduling this job. If
172 // this job has not been submitted to an executor yet, this will be
173 // NULL (and we know it hasn't started yet).
174 Executor
* executor() {
178 // Mark the job as having been cancelled.
180 was_cancelled_
= true;
183 // Returns true if Cancel() has been called.
184 bool was_cancelled() const { return was_cancelled_
; }
186 Type
type() const { return type_
; }
188 // Returns true if this job still has a user callback. Some jobs
189 // do not have a user callback, because they were helper jobs
190 // scheduled internally (for example TYPE_CREATE_RESOLVER).
192 // Otherwise jobs that correspond with user-initiated work will
193 // have a non-null callback up until the callback is run.
194 bool has_user_callback() const { return !callback_
.is_null(); }
196 // This method is called when the job is inserted into a wait queue
197 // because no executors were ready to accept it.
198 virtual void WaitingForThread() {}
200 // This method is called just before the job is posted to the work thread.
201 virtual void FinishedWaitingForThread() {}
203 // This method is called on the worker thread to do the job's work. On
204 // completion, implementors are expected to call OnJobCompleted() on
207 scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) = 0;
210 void OnJobCompleted() {
211 // |executor_| will be NULL if the executor has already been deleted.
213 executor_
->OnJobCompleted(this);
216 void RunUserCallback(int result
) {
217 DCHECK(has_user_callback());
218 CompletionCallback callback
= callback_
;
219 // Reset the callback so has_user_callback() will now return false.
221 callback
.Run(result
);
224 friend class base::RefCountedThreadSafe
<Job
>;
230 CompletionCallback callback_
;
235 // CreateResolverJob -----------------------------------------------------------
237 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
238 class CreateResolverJob
: public Job
{
240 CreateResolverJob(const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
241 ProxyResolverFactory
* factory
)
242 : Job(TYPE_CREATE_RESOLVER
, CompletionCallback()),
243 script_data_(script_data
),
246 // Runs on the worker thread.
247 void Run(scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) override
{
248 scoped_ptr
<ProxyResolverFactory::Request
> request
;
249 int rv
= factory_
->CreateProxyResolver(script_data_
, &resolver_
,
250 CompletionCallback(), &request
);
252 DCHECK_NE(rv
, ERR_IO_PENDING
);
253 origin_runner
->PostTask(
254 FROM_HERE
, base::Bind(&CreateResolverJob::RequestComplete
, this, rv
));
258 ~CreateResolverJob() override
{}
261 // Runs the completion callback on the origin thread.
262 void RequestComplete(int result_code
) {
263 // The task may have been cancelled after it was started.
264 if (!was_cancelled()) {
266 executor()->set_resolver(resolver_
.Pass());
271 const scoped_refptr
<ProxyResolverScriptData
> script_data_
;
272 ProxyResolverFactory
* factory_
;
273 scoped_ptr
<ProxyResolver
> resolver_
;
276 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
278 class MultiThreadedProxyResolver::GetProxyForURLJob
: public Job
{
280 // |url| -- the URL of the query.
281 // |results| -- the structure to fill with proxy resolve results.
282 GetProxyForURLJob(const GURL
& url
,
284 const CompletionCallback
& callback
,
285 const BoundNetLog
& net_log
)
286 : Job(TYPE_GET_PROXY_FOR_URL
, callback
),
290 was_waiting_for_thread_(false) {
291 DCHECK(!callback
.is_null());
294 BoundNetLog
* net_log() { return &net_log_
; }
296 void WaitingForThread() override
{
297 was_waiting_for_thread_
= true;
298 net_log_
.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
301 void FinishedWaitingForThread() override
{
304 if (was_waiting_for_thread_
) {
305 net_log_
.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
309 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD
,
310 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
313 // Runs on the worker thread.
314 void Run(scoped_refptr
<base::SingleThreadTaskRunner
> origin_runner
) override
{
315 ProxyResolver
* resolver
= executor()->resolver();
317 int rv
= resolver
->GetProxyForURL(
318 url_
, &results_buf_
, CompletionCallback(), NULL
, net_log_
);
319 DCHECK_NE(rv
, ERR_IO_PENDING
);
321 origin_runner
->PostTask(
322 FROM_HERE
, base::Bind(&GetProxyForURLJob::QueryComplete
, this, rv
));
326 ~GetProxyForURLJob() override
{}
329 // Runs the completion callback on the origin thread.
330 void QueryComplete(int result_code
) {
331 // The Job may have been cancelled after it was started.
332 if (!was_cancelled()) {
333 if (result_code
>= OK
) { // Note: unit-tests use values > 0.
334 results_
->Use(results_buf_
);
336 RunUserCallback(result_code
);
341 // Must only be used on the "origin" thread.
344 // Can be used on either "origin" or worker thread.
345 BoundNetLog net_log_
;
348 // Usable from within DoQuery on the worker thread.
349 ProxyInfo results_buf_
;
351 bool was_waiting_for_thread_
;
354 // Executor ----------------------------------------
356 Executor::Executor(Executor::Coordinator
* coordinator
, int thread_number
)
357 : coordinator_(coordinator
), thread_number_(thread_number
) {
359 // Start up the thread.
360 thread_
.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
362 CHECK(thread_
->Start());
365 void Executor::StartJob(Job
* job
) {
366 DCHECK(!outstanding_job_
.get());
367 outstanding_job_
= job
;
369 // Run the job. Once it has completed (regardless of whether it was
370 // cancelled), it will invoke OnJobCompleted() on this thread.
371 job
->set_executor(this);
372 job
->FinishedWaitingForThread();
373 thread_
->message_loop()->PostTask(
375 base::Bind(&Job::Run
, job
, base::ThreadTaskRunnerHandle::Get()));
378 void Executor::OnJobCompleted(Job
* job
) {
379 DCHECK_EQ(job
, outstanding_job_
.get());
380 outstanding_job_
= NULL
;
381 coordinator_
->OnExecutorReady(this);
384 void Executor::Destroy() {
385 DCHECK(coordinator_
);
388 // See http://crbug.com/69710.
389 base::ThreadRestrictions::ScopedAllowIO allow_io
;
391 // Join the worker thread.
395 // Cancel any outstanding job.
396 if (outstanding_job_
.get()) {
397 outstanding_job_
->Cancel();
398 // Orphan the job (since this executor may be deleted soon).
399 outstanding_job_
->set_executor(NULL
);
402 // It is now safe to free the ProxyResolver, since all the tasks that
403 // were using it on the resolver thread have completed.
406 // Null some stuff as a precaution.
408 outstanding_job_
= NULL
;
411 Executor::~Executor() {
412 // The important cleanup happens as part of Destroy(), which should always be
414 DCHECK(!coordinator_
) << "Destroy() was not called";
415 DCHECK(!thread_
.get());
416 DCHECK(!resolver_
.get());
417 DCHECK(!outstanding_job_
.get());
420 // MultiThreadedProxyResolver --------------------------------------------------
422 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
423 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
424 size_t max_num_threads
,
425 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
426 scoped_refptr
<Executor
> executor
)
427 : ProxyResolver(resolver_factory
->expects_pac_bytes()),
428 resolver_factory_(resolver_factory
.Pass()),
429 max_num_threads_(max_num_threads
),
430 script_data_(script_data
) {
431 DCHECK(script_data_
);
432 executor
->set_coordinator(this);
433 executors_
.push_back(executor
);
436 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
437 DCHECK(CalledOnValidThread());
438 // We will cancel all outstanding requests.
439 pending_jobs_
.clear();
441 for (auto& executor
: executors_
) {
446 int MultiThreadedProxyResolver::GetProxyForURL(
447 const GURL
& url
, ProxyInfo
* results
, const CompletionCallback
& callback
,
448 RequestHandle
* request
, const BoundNetLog
& net_log
) {
449 DCHECK(CalledOnValidThread());
450 DCHECK(!callback
.is_null());
452 scoped_refptr
<GetProxyForURLJob
> job(
453 new GetProxyForURLJob(url
, results
, callback
, net_log
));
455 // Completion will be notified through |callback|, unless the caller cancels
456 // the request using |request|.
458 *request
= reinterpret_cast<RequestHandle
>(job
.get());
460 // If there is an executor that is ready to run this request, submit it!
461 Executor
* executor
= FindIdleExecutor();
463 DCHECK_EQ(0u, pending_jobs_
.size());
464 executor
->StartJob(job
.get());
465 return ERR_IO_PENDING
;
468 // Otherwise queue this request. (We will schedule it to a thread once one
469 // becomes available).
470 job
->WaitingForThread();
471 pending_jobs_
.push_back(job
);
473 // If we haven't already reached the thread limit, provision a new thread to
474 // drain the requests more quickly.
475 if (executors_
.size() < max_num_threads_
)
478 return ERR_IO_PENDING
;
481 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req
) {
482 DCHECK(CalledOnValidThread());
485 Job
* job
= reinterpret_cast<Job
*>(req
);
486 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL
, job
->type());
488 if (job
->executor()) {
489 // If the job was already submitted to the executor, just mark it
490 // as cancelled so the user callback isn't run on completion.
493 // Otherwise the job is just sitting in a queue.
494 PendingJobsQueue::iterator it
=
495 std::find(pending_jobs_
.begin(), pending_jobs_
.end(), job
);
496 DCHECK(it
!= pending_jobs_
.end());
497 pending_jobs_
.erase(it
);
501 LoadState
MultiThreadedProxyResolver::GetLoadState(RequestHandle req
) const {
502 DCHECK(CalledOnValidThread());
504 return LOAD_STATE_RESOLVING_PROXY_FOR_URL
;
507 void MultiThreadedProxyResolver::CancelSetPacScript() {
511 int MultiThreadedProxyResolver::SetPacScript(
512 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
513 const CompletionCallback
&callback
) {
515 return ERR_NOT_IMPLEMENTED
;
518 Executor
* MultiThreadedProxyResolver::FindIdleExecutor() {
519 DCHECK(CalledOnValidThread());
520 for (ExecutorList::iterator it
= executors_
.begin();
521 it
!= executors_
.end(); ++it
) {
522 Executor
* executor
= it
->get();
523 if (!executor
->outstanding_job())
529 void MultiThreadedProxyResolver::AddNewExecutor() {
530 DCHECK(CalledOnValidThread());
531 DCHECK_LT(executors_
.size(), max_num_threads_
);
532 // The "thread number" is used to give the thread a unique name.
533 int thread_number
= executors_
.size();
534 Executor
* executor
= new Executor(this, thread_number
);
536 new CreateResolverJob(script_data_
, resolver_factory_
.get()));
537 executors_
.push_back(make_scoped_refptr(executor
));
540 void MultiThreadedProxyResolver::OnExecutorReady(Executor
* executor
) {
541 DCHECK(CalledOnValidThread());
542 if (pending_jobs_
.empty())
545 // Get the next job to process (FIFO). Transfer it from the pending queue
547 scoped_refptr
<Job
> job
= pending_jobs_
.front();
548 pending_jobs_
.pop_front();
549 executor
->StartJob(job
.get());
554 class MultiThreadedProxyResolverFactory::Job
555 : public ProxyResolverFactory::Request
,
556 public Executor::Coordinator
{
558 Job(MultiThreadedProxyResolverFactory
* factory
,
559 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
560 scoped_ptr
<ProxyResolver
>* resolver
,
561 scoped_ptr
<ProxyResolverFactory
> resolver_factory
,
562 size_t max_num_threads
,
563 const CompletionCallback
& callback
)
565 resolver_out_(resolver
),
566 resolver_factory_(resolver_factory
.Pass()),
567 max_num_threads_(max_num_threads
),
568 script_data_(script_data
),
569 executor_(new Executor(this, 0)),
570 callback_(callback
) {
572 new CreateResolverJob(script_data_
, resolver_factory_
.get()));
577 executor_
->Destroy();
578 factory_
->RemoveJob(this);
582 void FactoryDestroyed() {
583 executor_
->Destroy();
589 void OnExecutorReady(Executor
* executor
) override
{
591 if (executor
->resolver()) {
592 resolver_out_
->reset(new MultiThreadedProxyResolver(
593 resolver_factory_
.Pass(), max_num_threads_
, script_data_
.Pass(),
596 error
= ERR_PAC_SCRIPT_FAILED
;
597 executor_
->Destroy();
599 factory_
->RemoveJob(this);
601 callback_
.Run(error
);
604 MultiThreadedProxyResolverFactory
* factory_
;
605 scoped_ptr
<ProxyResolver
>* const resolver_out_
;
606 scoped_ptr
<ProxyResolverFactory
> resolver_factory_
;
607 const size_t max_num_threads_
;
608 scoped_refptr
<ProxyResolverScriptData
> script_data_
;
609 scoped_refptr
<Executor
> executor_
;
610 const CompletionCallback callback_
;
613 MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
614 size_t max_num_threads
,
615 bool factory_expects_bytes
)
616 : ProxyResolverFactory(factory_expects_bytes
),
617 max_num_threads_(max_num_threads
) {
618 DCHECK_GE(max_num_threads
, 1u);
621 MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
622 for (auto job
: jobs_
) {
623 job
->FactoryDestroyed();
627 int MultiThreadedProxyResolverFactory::CreateProxyResolver(
628 const scoped_refptr
<ProxyResolverScriptData
>& pac_script
,
629 scoped_ptr
<ProxyResolver
>* resolver
,
630 const CompletionCallback
& callback
,
631 scoped_ptr
<Request
>* request
) {
632 scoped_ptr
<Job
> job(new Job(this, pac_script
, resolver
,
633 CreateProxyResolverFactory(), max_num_threads_
,
635 jobs_
.insert(job
.get());
636 *request
= job
.Pass();
637 return ERR_IO_PENDING
;
640 void MultiThreadedProxyResolverFactory::RemoveJob(
641 MultiThreadedProxyResolverFactory::Job
* job
) {
642 size_t erased
= jobs_
.erase(job
);
643 DCHECK_EQ(1u, erased
);