1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
8 #include "base/bind_helpers.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "base/strings/string_util.h"
11 #include "base/strings/stringprintf.h"
12 #include "base/threading/thread.h"
13 #include "base/threading/thread_restrictions.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/net_log.h"
16 #include "net/proxy/proxy_info.h"
// TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
//               data when SetPacScript fails. That will reclaim memory when
//               testing bogus scripts.
24 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
25 // thread and a synchronous ProxyResolver (which will be operated on said
27 class MultiThreadedProxyResolver::Executor
28 : public base::RefCountedThreadSafe
<MultiThreadedProxyResolver::Executor
> {
30 // |coordinator| must remain valid throughout our lifetime. It is used to
31 // signal when the executor is ready to receive work by calling
32 // |coordinator->OnExecutorReady()|.
33 // The constructor takes ownership of |resolver|.
34 // |thread_number| is an identifier used when naming the worker thread.
35 Executor(MultiThreadedProxyResolver
* coordinator
,
36 ProxyResolver
* resolver
,
39 // Submit a job to this executor.
40 void StartJob(Job
* job
);
42 // Callback for when a job has completed running on the executor's thread.
43 void OnJobCompleted(Job
* job
);
45 // Cleanup the executor. Cancels all outstanding work, and frees the thread
49 // Returns the outstanding job, or NULL.
50 Job
* outstanding_job() const { return outstanding_job_
.get(); }
52 ProxyResolver
* resolver() { return resolver_
.get(); }
54 int thread_number() const { return thread_number_
; }
57 friend class base::RefCountedThreadSafe
<Executor
>;
60 MultiThreadedProxyResolver
* coordinator_
;
61 const int thread_number_
;
63 // The currently active job for this executor (either a SetPacScript or
64 // GetProxyForURL task).
65 scoped_refptr
<Job
> outstanding_job_
;
67 // The synchronous resolver implementation.
68 scoped_ptr
<ProxyResolver
> resolver_
;
70 // The thread where |resolver_| is run on.
71 // Note that declaration ordering is important here. |thread_| needs to be
72 // destroyed *before* |resolver_|, in case |resolver_| is currently
73 // executing on |thread_|.
74 scoped_ptr
<base::Thread
> thread_
;
77 // MultiThreadedProxyResolver::Job ---------------------------------------------
79 class MultiThreadedProxyResolver::Job
80 : public base::RefCountedThreadSafe
<MultiThreadedProxyResolver::Job
> {
82 // Identifies the subclass of Job (only being used for debugging purposes).
84 TYPE_GET_PROXY_FOR_URL
,
86 TYPE_SET_PAC_SCRIPT_INTERNAL
,
89 Job(Type type
, const CompletionCallback
& callback
)
93 was_cancelled_(false) {
96 void set_executor(Executor
* executor
) {
100 // The "executor" is the job runner that is scheduling this job. If
101 // this job has not been submitted to an executor yet, this will be
102 // NULL (and we know it hasn't started yet).
103 Executor
* executor() {
107 // Mark the job as having been cancelled.
109 was_cancelled_
= true;
112 // Returns true if Cancel() has been called.
113 bool was_cancelled() const { return was_cancelled_
; }
115 Type
type() const { return type_
; }
117 // Returns true if this job still has a user callback. Some jobs
118 // do not have a user callback, because they were helper jobs
119 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
121 // Otherwise jobs that correspond with user-initiated work will
122 // have a non-null callback up until the callback is run.
123 bool has_user_callback() const { return !callback_
.is_null(); }
125 // This method is called when the job is inserted into a wait queue
126 // because no executors were ready to accept it.
127 virtual void WaitingForThread() {}
129 // This method is called just before the job is posted to the work thread.
130 virtual void FinishedWaitingForThread() {}
132 // This method is called on the worker thread to do the job's work. On
133 // completion, implementors are expected to call OnJobCompleted() on
135 virtual void Run(scoped_refptr
<base::MessageLoopProxy
> origin_loop
) = 0;
138 void OnJobCompleted() {
139 // |executor_| will be NULL if the executor has already been deleted.
141 executor_
->OnJobCompleted(this);
144 void RunUserCallback(int result
) {
145 DCHECK(has_user_callback());
146 CompletionCallback callback
= callback_
;
147 // Reset the callback so has_user_callback() will now return false.
149 callback
.Run(result
);
152 friend class base::RefCountedThreadSafe
<MultiThreadedProxyResolver::Job
>;
158 CompletionCallback callback_
;
163 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
165 // Runs on the worker thread to call ProxyResolver::SetPacScript.
166 class MultiThreadedProxyResolver::SetPacScriptJob
167 : public MultiThreadedProxyResolver::Job
{
169 SetPacScriptJob(const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
170 const CompletionCallback
& callback
)
171 : Job(!callback
.is_null() ? TYPE_SET_PAC_SCRIPT
:
172 TYPE_SET_PAC_SCRIPT_INTERNAL
,
174 script_data_(script_data
) {
177 // Runs on the worker thread.
178 virtual void Run(scoped_refptr
<base::MessageLoopProxy
> origin_loop
) override
{
179 ProxyResolver
* resolver
= executor()->resolver();
180 int rv
= resolver
->SetPacScript(script_data_
, CompletionCallback());
182 DCHECK_NE(rv
, ERR_IO_PENDING
);
183 origin_loop
->PostTask(
185 base::Bind(&SetPacScriptJob::RequestComplete
, this, rv
));
189 virtual ~SetPacScriptJob() {}
192 // Runs the completion callback on the origin thread.
193 void RequestComplete(int result_code
) {
194 // The task may have been cancelled after it was started.
195 if (!was_cancelled() && has_user_callback()) {
196 RunUserCallback(result_code
);
201 const scoped_refptr
<ProxyResolverScriptData
> script_data_
;
204 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
206 class MultiThreadedProxyResolver::GetProxyForURLJob
207 : public MultiThreadedProxyResolver::Job
{
209 // |url| -- the URL of the query.
210 // |results| -- the structure to fill with proxy resolve results.
211 GetProxyForURLJob(const GURL
& url
,
213 const CompletionCallback
& callback
,
214 const BoundNetLog
& net_log
)
215 : Job(TYPE_GET_PROXY_FOR_URL
, callback
),
219 was_waiting_for_thread_(false) {
220 DCHECK(!callback
.is_null());
223 BoundNetLog
* net_log() { return &net_log_
; }
225 virtual void WaitingForThread() override
{
226 was_waiting_for_thread_
= true;
227 net_log_
.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
230 virtual void FinishedWaitingForThread() override
{
233 if (was_waiting_for_thread_
) {
234 net_log_
.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
238 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD
,
239 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
242 // Runs on the worker thread.
243 virtual void Run(scoped_refptr
<base::MessageLoopProxy
> origin_loop
) override
{
244 ProxyResolver
* resolver
= executor()->resolver();
245 int rv
= resolver
->GetProxyForURL(
246 url_
, &results_buf_
, CompletionCallback(), NULL
, net_log_
);
247 DCHECK_NE(rv
, ERR_IO_PENDING
);
249 origin_loop
->PostTask(
251 base::Bind(&GetProxyForURLJob::QueryComplete
, this, rv
));
255 virtual ~GetProxyForURLJob() {}
258 // Runs the completion callback on the origin thread.
259 void QueryComplete(int result_code
) {
260 // The Job may have been cancelled after it was started.
261 if (!was_cancelled()) {
262 if (result_code
>= OK
) { // Note: unit-tests use values > 0.
263 results_
->Use(results_buf_
);
265 RunUserCallback(result_code
);
270 // Must only be used on the "origin" thread.
273 // Can be used on either "origin" or worker thread.
274 BoundNetLog net_log_
;
277 // Usable from within DoQuery on the worker thread.
278 ProxyInfo results_buf_
;
280 bool was_waiting_for_thread_
;
283 // MultiThreadedProxyResolver::Executor ----------------------------------------
285 MultiThreadedProxyResolver::Executor::Executor(
286 MultiThreadedProxyResolver
* coordinator
,
287 ProxyResolver
* resolver
,
289 : coordinator_(coordinator
),
290 thread_number_(thread_number
),
291 resolver_(resolver
) {
294 // Start up the thread.
295 thread_
.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
297 CHECK(thread_
->Start());
300 void MultiThreadedProxyResolver::Executor::StartJob(Job
* job
) {
301 DCHECK(!outstanding_job_
.get());
302 outstanding_job_
= job
;
304 // Run the job. Once it has completed (regardless of whether it was
305 // cancelled), it will invoke OnJobCompleted() on this thread.
306 job
->set_executor(this);
307 job
->FinishedWaitingForThread();
308 thread_
->message_loop()->PostTask(
310 base::Bind(&Job::Run
, job
, base::MessageLoopProxy::current()));
313 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job
* job
) {
314 DCHECK_EQ(job
, outstanding_job_
.get());
315 outstanding_job_
= NULL
;
316 coordinator_
->OnExecutorReady(this);
319 void MultiThreadedProxyResolver::Executor::Destroy() {
320 DCHECK(coordinator_
);
323 // See http://crbug.com/69710.
324 base::ThreadRestrictions::ScopedAllowIO allow_io
;
326 // Join the worker thread.
330 // Cancel any outstanding job.
331 if (outstanding_job_
.get()) {
332 outstanding_job_
->Cancel();
333 // Orphan the job (since this executor may be deleted soon).
334 outstanding_job_
->set_executor(NULL
);
337 // It is now safe to free the ProxyResolver, since all the tasks that
338 // were using it on the resolver thread have completed.
341 // Null some stuff as a precaution.
343 outstanding_job_
= NULL
;
346 MultiThreadedProxyResolver::Executor::~Executor() {
347 // The important cleanup happens as part of Destroy(), which should always be
349 DCHECK(!coordinator_
) << "Destroy() was not called";
350 DCHECK(!thread_
.get());
351 DCHECK(!resolver_
.get());
352 DCHECK(!outstanding_job_
.get());
355 // MultiThreadedProxyResolver --------------------------------------------------
357 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
358 ProxyResolverFactory
* resolver_factory
,
359 size_t max_num_threads
)
360 : ProxyResolver(resolver_factory
->resolvers_expect_pac_bytes()),
361 resolver_factory_(resolver_factory
),
362 max_num_threads_(max_num_threads
) {
363 DCHECK_GE(max_num_threads
, 1u);
366 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
367 // We will cancel all outstanding requests.
368 pending_jobs_
.clear();
369 ReleaseAllExecutors();
372 int MultiThreadedProxyResolver::GetProxyForURL(
373 const GURL
& url
, ProxyInfo
* results
, const CompletionCallback
& callback
,
374 RequestHandle
* request
, const BoundNetLog
& net_log
) {
375 DCHECK(CalledOnValidThread());
376 DCHECK(!callback
.is_null());
377 DCHECK(current_script_data_
.get())
378 << "Resolver is un-initialized. Must call SetPacScript() first!";
380 scoped_refptr
<GetProxyForURLJob
> job(
381 new GetProxyForURLJob(url
, results
, callback
, net_log
));
383 // Completion will be notified through |callback|, unless the caller cancels
384 // the request using |request|.
386 *request
= reinterpret_cast<RequestHandle
>(job
.get());
388 // If there is an executor that is ready to run this request, submit it!
389 Executor
* executor
= FindIdleExecutor();
391 DCHECK_EQ(0u, pending_jobs_
.size());
392 executor
->StartJob(job
.get());
393 return ERR_IO_PENDING
;
396 // Otherwise queue this request. (We will schedule it to a thread once one
397 // becomes available).
398 job
->WaitingForThread();
399 pending_jobs_
.push_back(job
);
401 // If we haven't already reached the thread limit, provision a new thread to
402 // drain the requests more quickly.
403 if (executors_
.size() < max_num_threads_
) {
404 executor
= AddNewExecutor();
406 new SetPacScriptJob(current_script_data_
, CompletionCallback()));
409 return ERR_IO_PENDING
;
412 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req
) {
413 DCHECK(CalledOnValidThread());
416 Job
* job
= reinterpret_cast<Job
*>(req
);
417 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL
, job
->type());
419 if (job
->executor()) {
420 // If the job was already submitted to the executor, just mark it
421 // as cancelled so the user callback isn't run on completion.
424 // Otherwise the job is just sitting in a queue.
425 PendingJobsQueue::iterator it
=
426 std::find(pending_jobs_
.begin(), pending_jobs_
.end(), job
);
427 DCHECK(it
!= pending_jobs_
.end());
428 pending_jobs_
.erase(it
);
432 LoadState
MultiThreadedProxyResolver::GetLoadState(RequestHandle req
) const {
433 DCHECK(CalledOnValidThread());
435 return LOAD_STATE_RESOLVING_PROXY_FOR_URL
;
438 void MultiThreadedProxyResolver::CancelSetPacScript() {
439 DCHECK(CalledOnValidThread());
440 DCHECK_EQ(0u, pending_jobs_
.size());
441 DCHECK_EQ(1u, executors_
.size());
442 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT
,
443 executors_
[0]->outstanding_job()->type());
445 // Defensively clear some data which shouldn't be getting used
447 current_script_data_
= NULL
;
449 ReleaseAllExecutors();
452 int MultiThreadedProxyResolver::SetPacScript(
453 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
454 const CompletionCallback
&callback
) {
455 DCHECK(CalledOnValidThread());
456 DCHECK(!callback
.is_null());
458 // Save the script details, so we can provision new executors later.
459 current_script_data_
= script_data
;
461 // The user should not have any outstanding requests when they call
463 CheckNoOutstandingUserRequests();
465 // Destroy all of the current threads and their proxy resolvers.
466 ReleaseAllExecutors();
468 // Provision a new executor, and run the SetPacScript request. On completion
469 // notification will be sent through |callback|.
470 Executor
* executor
= AddNewExecutor();
471 executor
->StartJob(new SetPacScriptJob(script_data
, callback
));
472 return ERR_IO_PENDING
;
475 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
476 DCHECK(CalledOnValidThread());
477 CHECK_EQ(0u, pending_jobs_
.size());
479 for (ExecutorList::const_iterator it
= executors_
.begin();
480 it
!= executors_
.end(); ++it
) {
481 const Executor
* executor
= it
->get();
482 Job
* job
= executor
->outstanding_job();
483 // The "has_user_callback()" is to exclude jobs for which the callback
484 // has already been invoked, or was not user-initiated (as in the case of
485 // lazy thread provisions). User-initiated jobs may !has_user_callback()
486 // when the callback has already been run. (Since we only clear the
487 // outstanding job AFTER the callback has been invoked, it is possible
488 // for a new request to be started from within the callback).
489 CHECK(!job
|| job
->was_cancelled() || !job
->has_user_callback());
493 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
494 DCHECK(CalledOnValidThread());
495 for (ExecutorList::iterator it
= executors_
.begin();
496 it
!= executors_
.end(); ++it
) {
497 Executor
* executor
= it
->get();
503 MultiThreadedProxyResolver::Executor
*
504 MultiThreadedProxyResolver::FindIdleExecutor() {
505 DCHECK(CalledOnValidThread());
506 for (ExecutorList::iterator it
= executors_
.begin();
507 it
!= executors_
.end(); ++it
) {
508 Executor
* executor
= it
->get();
509 if (!executor
->outstanding_job())
515 MultiThreadedProxyResolver::Executor
*
516 MultiThreadedProxyResolver::AddNewExecutor() {
517 DCHECK(CalledOnValidThread());
518 DCHECK_LT(executors_
.size(), max_num_threads_
);
519 // The "thread number" is used to give the thread a unique name.
520 int thread_number
= executors_
.size();
521 ProxyResolver
* resolver
= resolver_factory_
->CreateProxyResolver();
522 Executor
* executor
= new Executor(
523 this, resolver
, thread_number
);
524 executors_
.push_back(make_scoped_refptr(executor
));
528 void MultiThreadedProxyResolver::OnExecutorReady(Executor
* executor
) {
529 DCHECK(CalledOnValidThread());
530 if (pending_jobs_
.empty())
533 // Get the next job to process (FIFO). Transfer it from the pending queue
535 scoped_refptr
<Job
> job
= pending_jobs_
.front();
536 pending_jobs_
.pop_front();
537 executor
->StartJob(job
.get());