Don't show supervised user as "already on this device" while they're being imported.
[chromium-blink-merge.git] / net / proxy / multi_threaded_proxy_resolver.cc
blobd735787015762e71f8ccbf8094a65df4a3bcfdd1
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
7 #include <deque>
8 #include <vector>
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/stl_util.h"
14 #include "base/strings/string_util.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/thread_task_runner_handle.h"
17 #include "base/threading/non_thread_safe.h"
18 #include "base/threading/thread.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "net/base/net_errors.h"
21 #include "net/log/net_log.h"
22 #include "net/proxy/proxy_info.h"
23 #include "net/proxy/proxy_resolver.h"
25 namespace net {
26 namespace {
27 class Job;
29 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
30 // thread and a synchronous ProxyResolver (which will be operated on said
31 // thread.)
32 class Executor : public base::RefCountedThreadSafe<Executor> {
33 public:
34 class Coordinator {
35 public:
36 virtual void OnExecutorReady(Executor* executor) = 0;
38 protected:
39 virtual ~Coordinator() = default;
42 // |coordinator| must remain valid throughout our lifetime. It is used to
43 // signal when the executor is ready to receive work by calling
44 // |coordinator->OnExecutorReady()|.
45 // |thread_number| is an identifier used when naming the worker thread.
46 Executor(Coordinator* coordinator, int thread_number);
48 // Submit a job to this executor.
49 void StartJob(Job* job);
51 // Callback for when a job has completed running on the executor's thread.
52 void OnJobCompleted(Job* job);
54 // Cleanup the executor. Cancels all outstanding work, and frees the thread
55 // and resolver.
56 void Destroy();
58 // Returns the outstanding job, or NULL.
59 Job* outstanding_job() const { return outstanding_job_.get(); }
61 ProxyResolver* resolver() { return resolver_.get(); }
63 int thread_number() const { return thread_number_; }
65 void set_resolver(scoped_ptr<ProxyResolver> resolver) {
66 resolver_ = resolver.Pass();
69 void set_coordinator(Coordinator* coordinator) {
70 DCHECK(coordinator);
71 DCHECK(coordinator_);
72 coordinator_ = coordinator;
75 private:
76 friend class base::RefCountedThreadSafe<Executor>;
77 ~Executor();
79 Coordinator* coordinator_;
80 const int thread_number_;
82 // The currently active job for this executor (either a CreateProxyResolver or
83 // GetProxyForURL task).
84 scoped_refptr<Job> outstanding_job_;
86 // The synchronous resolver implementation.
87 scoped_ptr<ProxyResolver> resolver_;
89 // The thread where |resolver_| is run on.
90 // Note that declaration ordering is important here. |thread_| needs to be
91 // destroyed *before* |resolver_|, in case |resolver_| is currently
92 // executing on |thread_|.
93 scoped_ptr<base::Thread> thread_;
96 class MultiThreadedProxyResolver : public ProxyResolver,
97 public Executor::Coordinator,
98 public base::NonThreadSafe {
99 public:
100 // Creates an asynchronous ProxyResolver that runs requests on up to
101 // |max_num_threads|.
103 // For each thread that is created, an accompanying synchronous ProxyResolver
104 // will be provisioned using |resolver_factory|. All methods on these
105 // ProxyResolvers will be called on the one thread.
106 MultiThreadedProxyResolver(
107 scoped_ptr<ProxyResolverFactory> resolver_factory,
108 size_t max_num_threads,
109 const scoped_refptr<ProxyResolverScriptData>& script_data,
110 scoped_refptr<Executor> executor);
112 ~MultiThreadedProxyResolver() override;
114 // ProxyResolver implementation:
115 int GetProxyForURL(const GURL& url,
116 ProxyInfo* results,
117 const CompletionCallback& callback,
118 RequestHandle* request,
119 const BoundNetLog& net_log) override;
120 void CancelRequest(RequestHandle request) override;
121 LoadState GetLoadState(RequestHandle request) const override;
123 private:
124 class GetProxyForURLJob;
125 // FIFO queue of pending jobs waiting to be started.
126 // TODO(eroman): Make this priority queue.
127 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue;
128 typedef std::vector<scoped_refptr<Executor>> ExecutorList;
130 // Returns an idle worker thread which is ready to receive GetProxyForURL()
131 // requests. If all threads are occupied, returns NULL.
132 Executor* FindIdleExecutor();
134 // Creates a new worker thread, and appends it to |executors_|.
135 void AddNewExecutor();
137 // Starts the next job from |pending_jobs_| if possible.
138 void OnExecutorReady(Executor* executor) override;
140 const scoped_ptr<ProxyResolverFactory> resolver_factory_;
141 const size_t max_num_threads_;
142 PendingJobsQueue pending_jobs_;
143 ExecutorList executors_;
144 scoped_refptr<ProxyResolverScriptData> script_data_;
147 // Job ---------------------------------------------
149 class Job : public base::RefCountedThreadSafe<Job> {
150 public:
151 // Identifies the subclass of Job (only being used for debugging purposes).
152 enum Type {
153 TYPE_GET_PROXY_FOR_URL,
154 TYPE_CREATE_RESOLVER,
157 Job(Type type, const CompletionCallback& callback)
158 : type_(type),
159 callback_(callback),
160 executor_(NULL),
161 was_cancelled_(false) {
164 void set_executor(Executor* executor) {
165 executor_ = executor;
168 // The "executor" is the job runner that is scheduling this job. If
169 // this job has not been submitted to an executor yet, this will be
170 // NULL (and we know it hasn't started yet).
171 Executor* executor() {
172 return executor_;
175 // Mark the job as having been cancelled.
176 void Cancel() {
177 was_cancelled_ = true;
180 // Returns true if Cancel() has been called.
181 bool was_cancelled() const { return was_cancelled_; }
183 Type type() const { return type_; }
185 // Returns true if this job still has a user callback. Some jobs
186 // do not have a user callback, because they were helper jobs
187 // scheduled internally (for example TYPE_CREATE_RESOLVER).
189 // Otherwise jobs that correspond with user-initiated work will
190 // have a non-null callback up until the callback is run.
191 bool has_user_callback() const { return !callback_.is_null(); }
193 // This method is called when the job is inserted into a wait queue
194 // because no executors were ready to accept it.
195 virtual void WaitingForThread() {}
197 // This method is called just before the job is posted to the work thread.
198 virtual void FinishedWaitingForThread() {}
200 // This method is called on the worker thread to do the job's work. On
201 // completion, implementors are expected to call OnJobCompleted() on
202 // |origin_runner|.
203 virtual void Run(
204 scoped_refptr<base::SingleThreadTaskRunner> origin_runner) = 0;
206 protected:
207 void OnJobCompleted() {
208 // |executor_| will be NULL if the executor has already been deleted.
209 if (executor_)
210 executor_->OnJobCompleted(this);
213 void RunUserCallback(int result) {
214 DCHECK(has_user_callback());
215 CompletionCallback callback = callback_;
216 // Reset the callback so has_user_callback() will now return false.
217 callback_.Reset();
218 callback.Run(result);
221 friend class base::RefCountedThreadSafe<Job>;
223 virtual ~Job() {}
225 private:
226 const Type type_;
227 CompletionCallback callback_;
228 Executor* executor_;
229 bool was_cancelled_;
232 // CreateResolverJob -----------------------------------------------------------
234 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
235 class CreateResolverJob : public Job {
236 public:
237 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
238 ProxyResolverFactory* factory)
239 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()),
240 script_data_(script_data),
241 factory_(factory) {}
243 // Runs on the worker thread.
244 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
245 scoped_ptr<ProxyResolverFactory::Request> request;
246 int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
247 CompletionCallback(), &request);
249 DCHECK_NE(rv, ERR_IO_PENDING);
250 origin_runner->PostTask(
251 FROM_HERE, base::Bind(&CreateResolverJob::RequestComplete, this, rv));
254 protected:
255 ~CreateResolverJob() override {}
257 private:
258 // Runs the completion callback on the origin thread.
259 void RequestComplete(int result_code) {
260 // The task may have been cancelled after it was started.
261 if (!was_cancelled()) {
262 DCHECK(executor());
263 executor()->set_resolver(resolver_.Pass());
265 OnJobCompleted();
268 const scoped_refptr<ProxyResolverScriptData> script_data_;
269 ProxyResolverFactory* factory_;
270 scoped_ptr<ProxyResolver> resolver_;
273 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
275 class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
276 public:
277 // |url| -- the URL of the query.
278 // |results| -- the structure to fill with proxy resolve results.
279 GetProxyForURLJob(const GURL& url,
280 ProxyInfo* results,
281 const CompletionCallback& callback,
282 const BoundNetLog& net_log)
283 : Job(TYPE_GET_PROXY_FOR_URL, callback),
284 results_(results),
285 net_log_(net_log),
286 url_(url),
287 was_waiting_for_thread_(false) {
288 DCHECK(!callback.is_null());
291 BoundNetLog* net_log() { return &net_log_; }
293 void WaitingForThread() override {
294 was_waiting_for_thread_ = true;
295 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
298 void FinishedWaitingForThread() override {
299 DCHECK(executor());
301 if (was_waiting_for_thread_) {
302 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
305 net_log_.AddEvent(
306 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
307 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
310 // Runs on the worker thread.
311 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
312 ProxyResolver* resolver = executor()->resolver();
313 DCHECK(resolver);
314 int rv = resolver->GetProxyForURL(
315 url_, &results_buf_, CompletionCallback(), NULL, net_log_);
316 DCHECK_NE(rv, ERR_IO_PENDING);
318 origin_runner->PostTask(
319 FROM_HERE, base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
322 protected:
323 ~GetProxyForURLJob() override {}
325 private:
326 // Runs the completion callback on the origin thread.
327 void QueryComplete(int result_code) {
328 // The Job may have been cancelled after it was started.
329 if (!was_cancelled()) {
330 if (result_code >= OK) { // Note: unit-tests use values > 0.
331 results_->Use(results_buf_);
333 RunUserCallback(result_code);
335 OnJobCompleted();
338 // Must only be used on the "origin" thread.
339 ProxyInfo* results_;
341 // Can be used on either "origin" or worker thread.
342 BoundNetLog net_log_;
343 const GURL url_;
345 // Usable from within DoQuery on the worker thread.
346 ProxyInfo results_buf_;
348 bool was_waiting_for_thread_;
351 // Executor ----------------------------------------
353 Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
354 : coordinator_(coordinator), thread_number_(thread_number) {
355 DCHECK(coordinator);
356 // Start up the thread.
357 thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
358 thread_number)));
359 CHECK(thread_->Start());
362 void Executor::StartJob(Job* job) {
363 DCHECK(!outstanding_job_.get());
364 outstanding_job_ = job;
366 // Run the job. Once it has completed (regardless of whether it was
367 // cancelled), it will invoke OnJobCompleted() on this thread.
368 job->set_executor(this);
369 job->FinishedWaitingForThread();
370 thread_->message_loop()->PostTask(
371 FROM_HERE,
372 base::Bind(&Job::Run, job, base::ThreadTaskRunnerHandle::Get()));
375 void Executor::OnJobCompleted(Job* job) {
376 DCHECK_EQ(job, outstanding_job_.get());
377 outstanding_job_ = NULL;
378 coordinator_->OnExecutorReady(this);
381 void Executor::Destroy() {
382 DCHECK(coordinator_);
385 // See http://crbug.com/69710.
386 base::ThreadRestrictions::ScopedAllowIO allow_io;
388 // Join the worker thread.
389 thread_.reset();
392 // Cancel any outstanding job.
393 if (outstanding_job_.get()) {
394 outstanding_job_->Cancel();
395 // Orphan the job (since this executor may be deleted soon).
396 outstanding_job_->set_executor(NULL);
399 // It is now safe to free the ProxyResolver, since all the tasks that
400 // were using it on the resolver thread have completed.
401 resolver_.reset();
403 // Null some stuff as a precaution.
404 coordinator_ = NULL;
405 outstanding_job_ = NULL;
408 Executor::~Executor() {
409 // The important cleanup happens as part of Destroy(), which should always be
410 // called first.
411 DCHECK(!coordinator_) << "Destroy() was not called";
412 DCHECK(!thread_.get());
413 DCHECK(!resolver_.get());
414 DCHECK(!outstanding_job_.get());
417 // MultiThreadedProxyResolver --------------------------------------------------
419 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
420 scoped_ptr<ProxyResolverFactory> resolver_factory,
421 size_t max_num_threads,
422 const scoped_refptr<ProxyResolverScriptData>& script_data,
423 scoped_refptr<Executor> executor)
424 : resolver_factory_(resolver_factory.Pass()),
425 max_num_threads_(max_num_threads),
426 script_data_(script_data) {
427 DCHECK(script_data_);
428 executor->set_coordinator(this);
429 executors_.push_back(executor);
432 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
433 DCHECK(CalledOnValidThread());
434 // We will cancel all outstanding requests.
435 pending_jobs_.clear();
437 for (auto& executor : executors_) {
438 executor->Destroy();
442 int MultiThreadedProxyResolver::GetProxyForURL(
443 const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
444 RequestHandle* request, const BoundNetLog& net_log) {
445 DCHECK(CalledOnValidThread());
446 DCHECK(!callback.is_null());
448 scoped_refptr<GetProxyForURLJob> job(
449 new GetProxyForURLJob(url, results, callback, net_log));
451 // Completion will be notified through |callback|, unless the caller cancels
452 // the request using |request|.
453 if (request)
454 *request = reinterpret_cast<RequestHandle>(job.get());
456 // If there is an executor that is ready to run this request, submit it!
457 Executor* executor = FindIdleExecutor();
458 if (executor) {
459 DCHECK_EQ(0u, pending_jobs_.size());
460 executor->StartJob(job.get());
461 return ERR_IO_PENDING;
464 // Otherwise queue this request. (We will schedule it to a thread once one
465 // becomes available).
466 job->WaitingForThread();
467 pending_jobs_.push_back(job);
469 // If we haven't already reached the thread limit, provision a new thread to
470 // drain the requests more quickly.
471 if (executors_.size() < max_num_threads_)
472 AddNewExecutor();
474 return ERR_IO_PENDING;
477 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
478 DCHECK(CalledOnValidThread());
479 DCHECK(req);
481 Job* job = reinterpret_cast<Job*>(req);
482 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
484 if (job->executor()) {
485 // If the job was already submitted to the executor, just mark it
486 // as cancelled so the user callback isn't run on completion.
487 job->Cancel();
488 } else {
489 // Otherwise the job is just sitting in a queue.
490 PendingJobsQueue::iterator it =
491 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
492 DCHECK(it != pending_jobs_.end());
493 pending_jobs_.erase(it);
497 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
498 DCHECK(CalledOnValidThread());
499 DCHECK(req);
500 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
503 Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
504 DCHECK(CalledOnValidThread());
505 for (ExecutorList::iterator it = executors_.begin();
506 it != executors_.end(); ++it) {
507 Executor* executor = it->get();
508 if (!executor->outstanding_job())
509 return executor;
511 return NULL;
514 void MultiThreadedProxyResolver::AddNewExecutor() {
515 DCHECK(CalledOnValidThread());
516 DCHECK_LT(executors_.size(), max_num_threads_);
517 // The "thread number" is used to give the thread a unique name.
518 int thread_number = executors_.size();
519 Executor* executor = new Executor(this, thread_number);
520 executor->StartJob(
521 new CreateResolverJob(script_data_, resolver_factory_.get()));
522 executors_.push_back(make_scoped_refptr(executor));
525 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
526 DCHECK(CalledOnValidThread());
527 if (pending_jobs_.empty())
528 return;
530 // Get the next job to process (FIFO). Transfer it from the pending queue
531 // to the executor.
532 scoped_refptr<Job> job = pending_jobs_.front();
533 pending_jobs_.pop_front();
534 executor->StartJob(job.get());
537 } // namespace
539 class MultiThreadedProxyResolverFactory::Job
540 : public ProxyResolverFactory::Request,
541 public Executor::Coordinator {
542 public:
543 Job(MultiThreadedProxyResolverFactory* factory,
544 const scoped_refptr<ProxyResolverScriptData>& script_data,
545 scoped_ptr<ProxyResolver>* resolver,
546 scoped_ptr<ProxyResolverFactory> resolver_factory,
547 size_t max_num_threads,
548 const CompletionCallback& callback)
549 : factory_(factory),
550 resolver_out_(resolver),
551 resolver_factory_(resolver_factory.Pass()),
552 max_num_threads_(max_num_threads),
553 script_data_(script_data),
554 executor_(new Executor(this, 0)),
555 callback_(callback) {
556 executor_->StartJob(
557 new CreateResolverJob(script_data_, resolver_factory_.get()));
560 ~Job() override {
561 if (factory_) {
562 executor_->Destroy();
563 factory_->RemoveJob(this);
567 void FactoryDestroyed() {
568 executor_->Destroy();
569 executor_ = nullptr;
570 factory_ = nullptr;
573 private:
574 void OnExecutorReady(Executor* executor) override {
575 int error = OK;
576 if (executor->resolver()) {
577 resolver_out_->reset(new MultiThreadedProxyResolver(
578 resolver_factory_.Pass(), max_num_threads_, script_data_.Pass(),
579 executor_));
580 } else {
581 error = ERR_PAC_SCRIPT_FAILED;
582 executor_->Destroy();
584 factory_->RemoveJob(this);
585 factory_ = nullptr;
586 callback_.Run(error);
589 MultiThreadedProxyResolverFactory* factory_;
590 scoped_ptr<ProxyResolver>* const resolver_out_;
591 scoped_ptr<ProxyResolverFactory> resolver_factory_;
592 const size_t max_num_threads_;
593 scoped_refptr<ProxyResolverScriptData> script_data_;
594 scoped_refptr<Executor> executor_;
595 const CompletionCallback callback_;
598 MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
599 size_t max_num_threads,
600 bool factory_expects_bytes)
601 : ProxyResolverFactory(factory_expects_bytes),
602 max_num_threads_(max_num_threads) {
603 DCHECK_GE(max_num_threads, 1u);
606 MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
607 for (auto job : jobs_) {
608 job->FactoryDestroyed();
612 int MultiThreadedProxyResolverFactory::CreateProxyResolver(
613 const scoped_refptr<ProxyResolverScriptData>& pac_script,
614 scoped_ptr<ProxyResolver>* resolver,
615 const CompletionCallback& callback,
616 scoped_ptr<Request>* request) {
617 scoped_ptr<Job> job(new Job(this, pac_script, resolver,
618 CreateProxyResolverFactory(), max_num_threads_,
619 callback));
620 jobs_.insert(job.get());
621 *request = job.Pass();
622 return ERR_IO_PENDING;
625 void MultiThreadedProxyResolverFactory::RemoveJob(
626 MultiThreadedProxyResolverFactory::Job* job) {
627 size_t erased = jobs_.erase(job);
628 DCHECK_EQ(1u, erased);
631 } // namespace net