1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
8 #include "base/message_loop_proxy.h"
9 #include "base/string_util.h"
10 #include "base/stringprintf.h"
11 #include "base/threading/thread.h"
12 #include "base/threading/thread_restrictions.h"
13 #include "net/base/net_errors.h"
14 #include "net/base/net_log.h"
15 #include "net/proxy/proxy_info.h"
17 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
18 // data when SetPacScript fails. That will reclaim memory when
19 // testing bogus scripts.
25 class PurgeMemoryTask
: public base::RefCountedThreadSafe
<PurgeMemoryTask
> {
27 explicit PurgeMemoryTask(ProxyResolver
* resolver
) : resolver_(resolver
) {}
28 void PurgeMemory() { resolver_
->PurgeMemory(); }
30 friend class base::RefCountedThreadSafe
<PurgeMemoryTask
>;
32 ProxyResolver
* resolver_
;
37 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
38 // thread and a synchronous ProxyResolver (which will be operated on said
40 class MultiThreadedProxyResolver::Executor
41 : public base::RefCountedThreadSafe
<MultiThreadedProxyResolver::Executor
> {
43 // |coordinator| must remain valid throughout our lifetime. It is used to
44 // signal when the executor is ready to receive work by calling
45 // |coordinator->OnExecutorReady()|.
46 // The constructor takes ownership of |resolver|.
47 // |thread_number| is an identifier used when naming the worker thread.
48 Executor(MultiThreadedProxyResolver
* coordinator
,
49 ProxyResolver
* resolver
,
52 // Submit a job to this executor.
53 void StartJob(Job
* job
);
55 // Callback for when a job has completed running on the executor's thread.
56 void OnJobCompleted(Job
* job
);
58 // Cleanup the executor. Cancels all outstanding work, and frees the thread
64 // Returns the outstanding job, or NULL.
65 Job
* outstanding_job() const { return outstanding_job_
.get(); }
67 ProxyResolver
* resolver() { return resolver_
.get(); }
69 int thread_number() const { return thread_number_
; }
72 friend class base::RefCountedThreadSafe
<Executor
>;
75 MultiThreadedProxyResolver
* coordinator_
;
76 const int thread_number_
;
78 // The currently active job for this executor (either a SetPacScript or
79 // GetProxyForURL task).
80 scoped_refptr
<Job
> outstanding_job_
;
82 // The synchronous resolver implementation.
83 scoped_ptr
<ProxyResolver
> resolver_
;
85 // The thread where |resolver_| is run on.
86 // Note that declaration ordering is important here. |thread_| needs to be
87 // destroyed *before* |resolver_|, in case |resolver_| is currently
88 // executing on |thread_|.
89 scoped_ptr
<base::Thread
> thread_
;
92 // MultiThreadedProxyResolver::Job ---------------------------------------------
94 class MultiThreadedProxyResolver::Job
95 : public base::RefCountedThreadSafe
<MultiThreadedProxyResolver::Job
> {
97 // Identifies the subclass of Job (only being used for debugging purposes).
99 TYPE_GET_PROXY_FOR_URL
,
101 TYPE_SET_PAC_SCRIPT_INTERNAL
,
104 Job(Type type
, const CompletionCallback
& callback
)
108 was_cancelled_(false) {
111 void set_executor(Executor
* executor
) {
112 executor_
= executor
;
115 // The "executor" is the job runner that is scheduling this job. If
116 // this job has not been submitted to an executor yet, this will be
117 // NULL (and we know it hasn't started yet).
118 Executor
* executor() {
122 // Mark the job as having been cancelled.
124 was_cancelled_
= true;
127 // Returns true if Cancel() has been called.
128 bool was_cancelled() const { return was_cancelled_
; }
130 Type
type() const { return type_
; }
132 // Returns true if this job still has a user callback. Some jobs
133 // do not have a user callback, because they were helper jobs
134 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
136 // Otherwise jobs that correspond with user-initiated work will
137 // have a non-null callback up until the callback is run.
138 bool has_user_callback() const { return !callback_
.is_null(); }
140 // This method is called when the job is inserted into a wait queue
141 // because no executors were ready to accept it.
142 virtual void WaitingForThread() {}
144 // This method is called just before the job is posted to the work thread.
145 virtual void FinishedWaitingForThread() {}
147 // This method is called on the worker thread to do the job's work. On
148 // completion, implementors are expected to call OnJobCompleted() on
150 virtual void Run(scoped_refptr
<base::MessageLoopProxy
> origin_loop
) = 0;
153 void OnJobCompleted() {
154 // |executor_| will be NULL if the executor has already been deleted.
156 executor_
->OnJobCompleted(this);
159 void RunUserCallback(int result
) {
160 DCHECK(has_user_callback());
161 CompletionCallback callback
= callback_
;
162 // Reset the callback so has_user_callback() will now return false.
164 callback
.Run(result
);
167 friend class base::RefCountedThreadSafe
<MultiThreadedProxyResolver::Job
>;
173 CompletionCallback callback_
;
178 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
180 // Runs on the worker thread to call ProxyResolver::SetPacScript.
181 class MultiThreadedProxyResolver::SetPacScriptJob
182 : public MultiThreadedProxyResolver::Job
{
184 SetPacScriptJob(const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
185 const CompletionCallback
& callback
)
186 : Job(!callback
.is_null() ? TYPE_SET_PAC_SCRIPT
:
187 TYPE_SET_PAC_SCRIPT_INTERNAL
,
189 script_data_(script_data
) {
192 // Runs on the worker thread.
193 virtual void Run(scoped_refptr
<base::MessageLoopProxy
> origin_loop
) OVERRIDE
{
194 ProxyResolver
* resolver
= executor()->resolver();
195 int rv
= resolver
->SetPacScript(script_data_
, CompletionCallback());
197 DCHECK_NE(rv
, ERR_IO_PENDING
);
198 origin_loop
->PostTask(
200 base::Bind(&SetPacScriptJob::RequestComplete
, this, rv
));
204 virtual ~SetPacScriptJob() {}
207 // Runs the completion callback on the origin thread.
208 void RequestComplete(int result_code
) {
209 // The task may have been cancelled after it was started.
210 if (!was_cancelled() && has_user_callback()) {
211 RunUserCallback(result_code
);
216 const scoped_refptr
<ProxyResolverScriptData
> script_data_
;
219 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
221 class MultiThreadedProxyResolver::GetProxyForURLJob
222 : public MultiThreadedProxyResolver::Job
{
224 // |url| -- the URL of the query.
225 // |results| -- the structure to fill with proxy resolve results.
226 GetProxyForURLJob(const GURL
& url
,
228 const CompletionCallback
& callback
,
229 const BoundNetLog
& net_log
)
230 : Job(TYPE_GET_PROXY_FOR_URL
, callback
),
234 was_waiting_for_thread_(false) {
235 DCHECK(!callback
.is_null());
238 BoundNetLog
* net_log() { return &net_log_
; }
240 virtual void WaitingForThread() OVERRIDE
{
241 was_waiting_for_thread_
= true;
242 net_log_
.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
245 virtual void FinishedWaitingForThread() OVERRIDE
{
248 if (was_waiting_for_thread_
) {
249 net_log_
.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD
);
253 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD
,
254 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
257 // Runs on the worker thread.
258 virtual void Run(scoped_refptr
<base::MessageLoopProxy
> origin_loop
) OVERRIDE
{
259 ProxyResolver
* resolver
= executor()->resolver();
260 int rv
= resolver
->GetProxyForURL(
261 url_
, &results_buf_
, CompletionCallback(), NULL
, net_log_
);
262 DCHECK_NE(rv
, ERR_IO_PENDING
);
264 origin_loop
->PostTask(
266 base::Bind(&GetProxyForURLJob::QueryComplete
, this, rv
));
270 virtual ~GetProxyForURLJob() {}
273 // Runs the completion callback on the origin thread.
274 void QueryComplete(int result_code
) {
275 // The Job may have been cancelled after it was started.
276 if (!was_cancelled()) {
277 if (result_code
>= OK
) { // Note: unit-tests use values > 0.
278 results_
->Use(results_buf_
);
280 RunUserCallback(result_code
);
285 // Must only be used on the "origin" thread.
288 // Can be used on either "origin" or worker thread.
289 BoundNetLog net_log_
;
292 // Usable from within DoQuery on the worker thread.
293 ProxyInfo results_buf_
;
295 bool was_waiting_for_thread_
;
298 // MultiThreadedProxyResolver::Executor ----------------------------------------
300 MultiThreadedProxyResolver::Executor::Executor(
301 MultiThreadedProxyResolver
* coordinator
,
302 ProxyResolver
* resolver
,
304 : coordinator_(coordinator
),
305 thread_number_(thread_number
),
306 resolver_(resolver
) {
309 // Start up the thread.
310 // Note that it is safe to pass a temporary C-String to Thread(), as it will
312 std::string thread_name
=
313 base::StringPrintf("PAC thread #%d", thread_number
);
314 thread_
.reset(new base::Thread(thread_name
.c_str()));
315 CHECK(thread_
->Start());
318 void MultiThreadedProxyResolver::Executor::StartJob(Job
* job
) {
319 DCHECK(!outstanding_job_
);
320 outstanding_job_
= job
;
322 // Run the job. Once it has completed (regardless of whether it was
323 // cancelled), it will invoke OnJobCompleted() on this thread.
324 job
->set_executor(this);
325 job
->FinishedWaitingForThread();
326 thread_
->message_loop()->PostTask(
328 base::Bind(&Job::Run
, job
, base::MessageLoopProxy::current()));
331 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job
* job
) {
332 DCHECK_EQ(job
, outstanding_job_
.get());
333 outstanding_job_
= NULL
;
334 coordinator_
->OnExecutorReady(this);
337 void MultiThreadedProxyResolver::Executor::Destroy() {
338 DCHECK(coordinator_
);
340 // Give the resolver an opportunity to shutdown from THIS THREAD before
341 // joining on the resolver thread. This allows certain implementations
342 // to avoid deadlocks.
343 resolver_
->Shutdown();
346 // See http://crbug.com/69710.
347 base::ThreadRestrictions::ScopedAllowIO allow_io
;
349 // Join the worker thread.
353 // Cancel any outstanding job.
354 if (outstanding_job_
) {
355 outstanding_job_
->Cancel();
356 // Orphan the job (since this executor may be deleted soon).
357 outstanding_job_
->set_executor(NULL
);
360 // It is now safe to free the ProxyResolver, since all the tasks that
361 // were using it on the resolver thread have completed.
364 // Null some stuff as a precaution.
366 outstanding_job_
= NULL
;
369 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
370 scoped_refptr
<PurgeMemoryTask
> helper(new PurgeMemoryTask(resolver_
.get()));
371 thread_
->message_loop()->PostTask(
373 base::Bind(&PurgeMemoryTask::PurgeMemory
, helper
.get()));
376 MultiThreadedProxyResolver::Executor::~Executor() {
377 // The important cleanup happens as part of Destroy(), which should always be
379 DCHECK(!coordinator_
) << "Destroy() was not called";
380 DCHECK(!thread_
.get());
381 DCHECK(!resolver_
.get());
382 DCHECK(!outstanding_job_
);
385 // MultiThreadedProxyResolver --------------------------------------------------
387 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
388 ProxyResolverFactory
* resolver_factory
,
389 size_t max_num_threads
)
390 : ProxyResolver(resolver_factory
->resolvers_expect_pac_bytes()),
391 resolver_factory_(resolver_factory
),
392 max_num_threads_(max_num_threads
) {
393 DCHECK_GE(max_num_threads
, 1u);
396 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
397 // We will cancel all outstanding requests.
398 pending_jobs_
.clear();
399 ReleaseAllExecutors();
402 int MultiThreadedProxyResolver::GetProxyForURL(
403 const GURL
& url
, ProxyInfo
* results
, const CompletionCallback
& callback
,
404 RequestHandle
* request
, const BoundNetLog
& net_log
) {
405 DCHECK(CalledOnValidThread());
406 DCHECK(!callback
.is_null());
407 DCHECK(current_script_data_
.get())
408 << "Resolver is un-initialized. Must call SetPacScript() first!";
410 scoped_refptr
<GetProxyForURLJob
> job(
411 new GetProxyForURLJob(url
, results
, callback
, net_log
));
413 // Completion will be notified through |callback|, unless the caller cancels
414 // the request using |request|.
416 *request
= reinterpret_cast<RequestHandle
>(job
.get());
418 // If there is an executor that is ready to run this request, submit it!
419 Executor
* executor
= FindIdleExecutor();
421 DCHECK_EQ(0u, pending_jobs_
.size());
422 executor
->StartJob(job
);
423 return ERR_IO_PENDING
;
426 // Otherwise queue this request. (We will schedule it to a thread once one
427 // becomes available).
428 job
->WaitingForThread();
429 pending_jobs_
.push_back(job
);
431 // If we haven't already reached the thread limit, provision a new thread to
432 // drain the requests more quickly.
433 if (executors_
.size() < max_num_threads_
) {
434 executor
= AddNewExecutor();
436 new SetPacScriptJob(current_script_data_
, CompletionCallback()));
439 return ERR_IO_PENDING
;
442 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req
) {
443 DCHECK(CalledOnValidThread());
446 Job
* job
= reinterpret_cast<Job
*>(req
);
447 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL
, job
->type());
449 if (job
->executor()) {
450 // If the job was already submitted to the executor, just mark it
451 // as cancelled so the user callback isn't run on completion.
454 // Otherwise the job is just sitting in a queue.
455 PendingJobsQueue::iterator it
=
456 std::find(pending_jobs_
.begin(), pending_jobs_
.end(), job
);
457 DCHECK(it
!= pending_jobs_
.end());
458 pending_jobs_
.erase(it
);
462 LoadState
MultiThreadedProxyResolver::GetLoadState(RequestHandle req
) const {
463 DCHECK(CalledOnValidThread());
466 Job
* job
= reinterpret_cast<Job
*>(req
);
468 return job
->executor()->resolver()->GetLoadStateThreadSafe(NULL
);
469 return LOAD_STATE_RESOLVING_PROXY_FOR_URL
;
472 LoadState
MultiThreadedProxyResolver::GetLoadStateThreadSafe(
473 RequestHandle req
) const {
475 return LOAD_STATE_IDLE
;
478 void MultiThreadedProxyResolver::CancelSetPacScript() {
479 DCHECK(CalledOnValidThread());
480 DCHECK_EQ(0u, pending_jobs_
.size());
481 DCHECK_EQ(1u, executors_
.size());
482 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT
,
483 executors_
[0]->outstanding_job()->type());
485 // Defensively clear some data which shouldn't be getting used
487 current_script_data_
= NULL
;
489 ReleaseAllExecutors();
492 void MultiThreadedProxyResolver::PurgeMemory() {
493 DCHECK(CalledOnValidThread());
494 for (ExecutorList::iterator it
= executors_
.begin();
495 it
!= executors_
.end(); ++it
) {
496 Executor
* executor
= *it
;
497 executor
->PurgeMemory();
501 int MultiThreadedProxyResolver::SetPacScript(
502 const scoped_refptr
<ProxyResolverScriptData
>& script_data
,
503 const CompletionCallback
&callback
) {
504 DCHECK(CalledOnValidThread());
505 DCHECK(!callback
.is_null());
507 // Save the script details, so we can provision new executors later.
508 current_script_data_
= script_data
;
510 // The user should not have any outstanding requests when they call
512 CheckNoOutstandingUserRequests();
514 // Destroy all of the current threads and their proxy resolvers.
515 ReleaseAllExecutors();
517 // Provision a new executor, and run the SetPacScript request. On completion
518 // notification will be sent through |callback|.
519 Executor
* executor
= AddNewExecutor();
520 executor
->StartJob(new SetPacScriptJob(script_data
, callback
));
521 return ERR_IO_PENDING
;
524 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
525 DCHECK(CalledOnValidThread());
526 CHECK_EQ(0u, pending_jobs_
.size());
528 for (ExecutorList::const_iterator it
= executors_
.begin();
529 it
!= executors_
.end(); ++it
) {
530 const Executor
* executor
= *it
;
531 Job
* job
= executor
->outstanding_job();
532 // The "has_user_callback()" is to exclude jobs for which the callback
533 // has already been invoked, or was not user-initiated (as in the case of
534 // lazy thread provisions). User-initiated jobs may !has_user_callback()
535 // when the callback has already been run. (Since we only clear the
536 // outstanding job AFTER the callback has been invoked, it is possible
537 // for a new request to be started from within the callback).
538 CHECK(!job
|| job
->was_cancelled() || !job
->has_user_callback());
542 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
543 DCHECK(CalledOnValidThread());
544 for (ExecutorList::iterator it
= executors_
.begin();
545 it
!= executors_
.end(); ++it
) {
546 Executor
* executor
= *it
;
552 MultiThreadedProxyResolver::Executor
*
553 MultiThreadedProxyResolver::FindIdleExecutor() {
554 DCHECK(CalledOnValidThread());
555 for (ExecutorList::iterator it
= executors_
.begin();
556 it
!= executors_
.end(); ++it
) {
557 Executor
* executor
= *it
;
558 if (!executor
->outstanding_job())
564 MultiThreadedProxyResolver::Executor
*
565 MultiThreadedProxyResolver::AddNewExecutor() {
566 DCHECK(CalledOnValidThread());
567 DCHECK_LT(executors_
.size(), max_num_threads_
);
568 // The "thread number" is used to give the thread a unique name.
569 int thread_number
= executors_
.size();
570 ProxyResolver
* resolver
= resolver_factory_
->CreateProxyResolver();
571 Executor
* executor
= new Executor(
572 this, resolver
, thread_number
);
573 executors_
.push_back(make_scoped_refptr(executor
));
577 void MultiThreadedProxyResolver::OnExecutorReady(Executor
* executor
) {
578 DCHECK(CalledOnValidThread());
579 if (pending_jobs_
.empty())
582 // Get the next job to process (FIFO). Transfer it from the pending queue
584 scoped_refptr
<Job
> job
= pending_jobs_
.front();
585 pending_jobs_
.pop_front();
586 executor
->StartJob(job
);