[Sync] Finish transition to new AndroidSyncSettings interface.
[chromium-blink-merge.git] / net / proxy / multi_threaded_proxy_resolver.cc
blob7c38275a7a1f2391a8a68cfaf8a9b77b46a8680b
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
7 #include <deque>
8 #include <vector>
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/stl_util.h"
14 #include "base/strings/string_util.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/thread_task_runner_handle.h"
17 #include "base/threading/non_thread_safe.h"
18 #include "base/threading/thread.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "net/base/net_errors.h"
21 #include "net/log/net_log.h"
22 #include "net/proxy/proxy_info.h"
23 #include "net/proxy/proxy_resolver.h"
25 namespace net {
26 namespace {
27 class Job;
29 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
30 // thread and a synchronous ProxyResolver (which will be operated on said
31 // thread.)
32 class Executor : public base::RefCountedThreadSafe<Executor> {
33 public:
34 class Coordinator {
35 public:
36 virtual void OnExecutorReady(Executor* executor) = 0;
38 protected:
39 virtual ~Coordinator() = default;
42 // |coordinator| must remain valid throughout our lifetime. It is used to
43 // signal when the executor is ready to receive work by calling
44 // |coordinator->OnExecutorReady()|.
45 // |thread_number| is an identifier used when naming the worker thread.
46 Executor(Coordinator* coordinator, int thread_number);
48 // Submit a job to this executor.
49 void StartJob(Job* job);
51 // Callback for when a job has completed running on the executor's thread.
52 void OnJobCompleted(Job* job);
54 // Cleanup the executor. Cancels all outstanding work, and frees the thread
55 // and resolver.
56 void Destroy();
58 // Returns the outstanding job, or NULL.
59 Job* outstanding_job() const { return outstanding_job_.get(); }
61 ProxyResolver* resolver() { return resolver_.get(); }
63 int thread_number() const { return thread_number_; }
65 void set_resolver(scoped_ptr<ProxyResolver> resolver) {
66 resolver_ = resolver.Pass();
69 void set_coordinator(Coordinator* coordinator) {
70 DCHECK(coordinator);
71 DCHECK(coordinator_);
72 coordinator_ = coordinator;
75 private:
76 friend class base::RefCountedThreadSafe<Executor>;
77 ~Executor();
79 Coordinator* coordinator_;
80 const int thread_number_;
82 // The currently active job for this executor (either a CreateProxyResolver or
83 // GetProxyForURL task).
84 scoped_refptr<Job> outstanding_job_;
86 // The synchronous resolver implementation.
87 scoped_ptr<ProxyResolver> resolver_;
89 // The thread where |resolver_| is run on.
90 // Note that declaration ordering is important here. |thread_| needs to be
91 // destroyed *before* |resolver_|, in case |resolver_| is currently
92 // executing on |thread_|.
93 scoped_ptr<base::Thread> thread_;
96 class MultiThreadedProxyResolver : public ProxyResolver,
97 public Executor::Coordinator,
98 public base::NonThreadSafe {
99 public:
100 // Creates an asynchronous ProxyResolver that runs requests on up to
101 // |max_num_threads|.
103 // For each thread that is created, an accompanying synchronous ProxyResolver
104 // will be provisioned using |resolver_factory|. All methods on these
105 // ProxyResolvers will be called on the one thread.
106 MultiThreadedProxyResolver(
107 scoped_ptr<ProxyResolverFactory> resolver_factory,
108 size_t max_num_threads,
109 const scoped_refptr<ProxyResolverScriptData>& script_data,
110 scoped_refptr<Executor> executor);
112 ~MultiThreadedProxyResolver() override;
114 // ProxyResolver implementation:
115 int GetProxyForURL(const GURL& url,
116 ProxyInfo* results,
117 const CompletionCallback& callback,
118 RequestHandle* request,
119 const BoundNetLog& net_log) override;
120 void CancelRequest(RequestHandle request) override;
121 LoadState GetLoadState(RequestHandle request) const override;
122 void CancelSetPacScript() override;
123 int SetPacScript(const scoped_refptr<ProxyResolverScriptData>& script_data,
124 const CompletionCallback& callback) override;
126 private:
127 class GetProxyForURLJob;
128 // FIFO queue of pending jobs waiting to be started.
129 // TODO(eroman): Make this priority queue.
130 typedef std::deque<scoped_refptr<Job>> PendingJobsQueue;
131 typedef std::vector<scoped_refptr<Executor>> ExecutorList;
133 // Returns an idle worker thread which is ready to receive GetProxyForURL()
134 // requests. If all threads are occupied, returns NULL.
135 Executor* FindIdleExecutor();
137 // Creates a new worker thread, and appends it to |executors_|.
138 void AddNewExecutor();
140 // Starts the next job from |pending_jobs_| if possible.
141 void OnExecutorReady(Executor* executor) override;
143 const scoped_ptr<ProxyResolverFactory> resolver_factory_;
144 const size_t max_num_threads_;
145 PendingJobsQueue pending_jobs_;
146 ExecutorList executors_;
147 scoped_refptr<ProxyResolverScriptData> script_data_;
150 // Job ---------------------------------------------
152 class Job : public base::RefCountedThreadSafe<Job> {
153 public:
154 // Identifies the subclass of Job (only being used for debugging purposes).
155 enum Type {
156 TYPE_GET_PROXY_FOR_URL,
157 TYPE_CREATE_RESOLVER,
160 Job(Type type, const CompletionCallback& callback)
161 : type_(type),
162 callback_(callback),
163 executor_(NULL),
164 was_cancelled_(false) {
167 void set_executor(Executor* executor) {
168 executor_ = executor;
171 // The "executor" is the job runner that is scheduling this job. If
172 // this job has not been submitted to an executor yet, this will be
173 // NULL (and we know it hasn't started yet).
174 Executor* executor() {
175 return executor_;
178 // Mark the job as having been cancelled.
179 void Cancel() {
180 was_cancelled_ = true;
183 // Returns true if Cancel() has been called.
184 bool was_cancelled() const { return was_cancelled_; }
186 Type type() const { return type_; }
188 // Returns true if this job still has a user callback. Some jobs
189 // do not have a user callback, because they were helper jobs
190 // scheduled internally (for example TYPE_CREATE_RESOLVER).
192 // Otherwise jobs that correspond with user-initiated work will
193 // have a non-null callback up until the callback is run.
194 bool has_user_callback() const { return !callback_.is_null(); }
196 // This method is called when the job is inserted into a wait queue
197 // because no executors were ready to accept it.
198 virtual void WaitingForThread() {}
200 // This method is called just before the job is posted to the work thread.
201 virtual void FinishedWaitingForThread() {}
203 // This method is called on the worker thread to do the job's work. On
204 // completion, implementors are expected to call OnJobCompleted() on
205 // |origin_runner|.
206 virtual void Run(
207 scoped_refptr<base::SingleThreadTaskRunner> origin_runner) = 0;
209 protected:
210 void OnJobCompleted() {
211 // |executor_| will be NULL if the executor has already been deleted.
212 if (executor_)
213 executor_->OnJobCompleted(this);
216 void RunUserCallback(int result) {
217 DCHECK(has_user_callback());
218 CompletionCallback callback = callback_;
219 // Reset the callback so has_user_callback() will now return false.
220 callback_.Reset();
221 callback.Run(result);
224 friend class base::RefCountedThreadSafe<Job>;
226 virtual ~Job() {}
228 private:
229 const Type type_;
230 CompletionCallback callback_;
231 Executor* executor_;
232 bool was_cancelled_;
235 // CreateResolverJob -----------------------------------------------------------
237 // Runs on the worker thread to call ProxyResolverFactory::CreateProxyResolver.
238 class CreateResolverJob : public Job {
239 public:
240 CreateResolverJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
241 ProxyResolverFactory* factory)
242 : Job(TYPE_CREATE_RESOLVER, CompletionCallback()),
243 script_data_(script_data),
244 factory_(factory) {}
246 // Runs on the worker thread.
247 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
248 scoped_ptr<ProxyResolverFactory::Request> request;
249 int rv = factory_->CreateProxyResolver(script_data_, &resolver_,
250 CompletionCallback(), &request);
252 DCHECK_NE(rv, ERR_IO_PENDING);
253 origin_runner->PostTask(
254 FROM_HERE, base::Bind(&CreateResolverJob::RequestComplete, this, rv));
257 protected:
258 ~CreateResolverJob() override {}
260 private:
261 // Runs the completion callback on the origin thread.
262 void RequestComplete(int result_code) {
263 // The task may have been cancelled after it was started.
264 if (!was_cancelled()) {
265 DCHECK(executor());
266 executor()->set_resolver(resolver_.Pass());
268 OnJobCompleted();
271 const scoped_refptr<ProxyResolverScriptData> script_data_;
272 ProxyResolverFactory* factory_;
273 scoped_ptr<ProxyResolver> resolver_;
276 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
278 class MultiThreadedProxyResolver::GetProxyForURLJob : public Job {
279 public:
280 // |url| -- the URL of the query.
281 // |results| -- the structure to fill with proxy resolve results.
282 GetProxyForURLJob(const GURL& url,
283 ProxyInfo* results,
284 const CompletionCallback& callback,
285 const BoundNetLog& net_log)
286 : Job(TYPE_GET_PROXY_FOR_URL, callback),
287 results_(results),
288 net_log_(net_log),
289 url_(url),
290 was_waiting_for_thread_(false) {
291 DCHECK(!callback.is_null());
294 BoundNetLog* net_log() { return &net_log_; }
296 void WaitingForThread() override {
297 was_waiting_for_thread_ = true;
298 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
301 void FinishedWaitingForThread() override {
302 DCHECK(executor());
304 if (was_waiting_for_thread_) {
305 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
308 net_log_.AddEvent(
309 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
310 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
313 // Runs on the worker thread.
314 void Run(scoped_refptr<base::SingleThreadTaskRunner> origin_runner) override {
315 ProxyResolver* resolver = executor()->resolver();
316 DCHECK(resolver);
317 int rv = resolver->GetProxyForURL(
318 url_, &results_buf_, CompletionCallback(), NULL, net_log_);
319 DCHECK_NE(rv, ERR_IO_PENDING);
321 origin_runner->PostTask(
322 FROM_HERE, base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
325 protected:
326 ~GetProxyForURLJob() override {}
328 private:
329 // Runs the completion callback on the origin thread.
330 void QueryComplete(int result_code) {
331 // The Job may have been cancelled after it was started.
332 if (!was_cancelled()) {
333 if (result_code >= OK) { // Note: unit-tests use values > 0.
334 results_->Use(results_buf_);
336 RunUserCallback(result_code);
338 OnJobCompleted();
341 // Must only be used on the "origin" thread.
342 ProxyInfo* results_;
344 // Can be used on either "origin" or worker thread.
345 BoundNetLog net_log_;
346 const GURL url_;
348 // Usable from within DoQuery on the worker thread.
349 ProxyInfo results_buf_;
351 bool was_waiting_for_thread_;
354 // Executor ----------------------------------------
356 Executor::Executor(Executor::Coordinator* coordinator, int thread_number)
357 : coordinator_(coordinator), thread_number_(thread_number) {
358 DCHECK(coordinator);
359 // Start up the thread.
360 thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
361 thread_number)));
362 CHECK(thread_->Start());
365 void Executor::StartJob(Job* job) {
366 DCHECK(!outstanding_job_.get());
367 outstanding_job_ = job;
369 // Run the job. Once it has completed (regardless of whether it was
370 // cancelled), it will invoke OnJobCompleted() on this thread.
371 job->set_executor(this);
372 job->FinishedWaitingForThread();
373 thread_->message_loop()->PostTask(
374 FROM_HERE,
375 base::Bind(&Job::Run, job, base::ThreadTaskRunnerHandle::Get()));
378 void Executor::OnJobCompleted(Job* job) {
379 DCHECK_EQ(job, outstanding_job_.get());
380 outstanding_job_ = NULL;
381 coordinator_->OnExecutorReady(this);
384 void Executor::Destroy() {
385 DCHECK(coordinator_);
388 // See http://crbug.com/69710.
389 base::ThreadRestrictions::ScopedAllowIO allow_io;
391 // Join the worker thread.
392 thread_.reset();
395 // Cancel any outstanding job.
396 if (outstanding_job_.get()) {
397 outstanding_job_->Cancel();
398 // Orphan the job (since this executor may be deleted soon).
399 outstanding_job_->set_executor(NULL);
402 // It is now safe to free the ProxyResolver, since all the tasks that
403 // were using it on the resolver thread have completed.
404 resolver_.reset();
406 // Null some stuff as a precaution.
407 coordinator_ = NULL;
408 outstanding_job_ = NULL;
411 Executor::~Executor() {
412 // The important cleanup happens as part of Destroy(), which should always be
413 // called first.
414 DCHECK(!coordinator_) << "Destroy() was not called";
415 DCHECK(!thread_.get());
416 DCHECK(!resolver_.get());
417 DCHECK(!outstanding_job_.get());
420 // MultiThreadedProxyResolver --------------------------------------------------
422 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
423 scoped_ptr<ProxyResolverFactory> resolver_factory,
424 size_t max_num_threads,
425 const scoped_refptr<ProxyResolverScriptData>& script_data,
426 scoped_refptr<Executor> executor)
427 : ProxyResolver(resolver_factory->expects_pac_bytes()),
428 resolver_factory_(resolver_factory.Pass()),
429 max_num_threads_(max_num_threads),
430 script_data_(script_data) {
431 DCHECK(script_data_);
432 executor->set_coordinator(this);
433 executors_.push_back(executor);
436 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
437 DCHECK(CalledOnValidThread());
438 // We will cancel all outstanding requests.
439 pending_jobs_.clear();
441 for (auto& executor : executors_) {
442 executor->Destroy();
446 int MultiThreadedProxyResolver::GetProxyForURL(
447 const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
448 RequestHandle* request, const BoundNetLog& net_log) {
449 DCHECK(CalledOnValidThread());
450 DCHECK(!callback.is_null());
452 scoped_refptr<GetProxyForURLJob> job(
453 new GetProxyForURLJob(url, results, callback, net_log));
455 // Completion will be notified through |callback|, unless the caller cancels
456 // the request using |request|.
457 if (request)
458 *request = reinterpret_cast<RequestHandle>(job.get());
460 // If there is an executor that is ready to run this request, submit it!
461 Executor* executor = FindIdleExecutor();
462 if (executor) {
463 DCHECK_EQ(0u, pending_jobs_.size());
464 executor->StartJob(job.get());
465 return ERR_IO_PENDING;
468 // Otherwise queue this request. (We will schedule it to a thread once one
469 // becomes available).
470 job->WaitingForThread();
471 pending_jobs_.push_back(job);
473 // If we haven't already reached the thread limit, provision a new thread to
474 // drain the requests more quickly.
475 if (executors_.size() < max_num_threads_)
476 AddNewExecutor();
478 return ERR_IO_PENDING;
481 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
482 DCHECK(CalledOnValidThread());
483 DCHECK(req);
485 Job* job = reinterpret_cast<Job*>(req);
486 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
488 if (job->executor()) {
489 // If the job was already submitted to the executor, just mark it
490 // as cancelled so the user callback isn't run on completion.
491 job->Cancel();
492 } else {
493 // Otherwise the job is just sitting in a queue.
494 PendingJobsQueue::iterator it =
495 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
496 DCHECK(it != pending_jobs_.end());
497 pending_jobs_.erase(it);
501 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
502 DCHECK(CalledOnValidThread());
503 DCHECK(req);
504 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
507 void MultiThreadedProxyResolver::CancelSetPacScript() {
508 NOTREACHED();
511 int MultiThreadedProxyResolver::SetPacScript(
512 const scoped_refptr<ProxyResolverScriptData>& script_data,
513 const CompletionCallback&callback) {
514 NOTREACHED();
515 return ERR_NOT_IMPLEMENTED;
518 Executor* MultiThreadedProxyResolver::FindIdleExecutor() {
519 DCHECK(CalledOnValidThread());
520 for (ExecutorList::iterator it = executors_.begin();
521 it != executors_.end(); ++it) {
522 Executor* executor = it->get();
523 if (!executor->outstanding_job())
524 return executor;
526 return NULL;
529 void MultiThreadedProxyResolver::AddNewExecutor() {
530 DCHECK(CalledOnValidThread());
531 DCHECK_LT(executors_.size(), max_num_threads_);
532 // The "thread number" is used to give the thread a unique name.
533 int thread_number = executors_.size();
534 Executor* executor = new Executor(this, thread_number);
535 executor->StartJob(
536 new CreateResolverJob(script_data_, resolver_factory_.get()));
537 executors_.push_back(make_scoped_refptr(executor));
540 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
541 DCHECK(CalledOnValidThread());
542 if (pending_jobs_.empty())
543 return;
545 // Get the next job to process (FIFO). Transfer it from the pending queue
546 // to the executor.
547 scoped_refptr<Job> job = pending_jobs_.front();
548 pending_jobs_.pop_front();
549 executor->StartJob(job.get());
552 } // namespace
554 class MultiThreadedProxyResolverFactory::Job
555 : public ProxyResolverFactory::Request,
556 public Executor::Coordinator {
557 public:
558 Job(MultiThreadedProxyResolverFactory* factory,
559 const scoped_refptr<ProxyResolverScriptData>& script_data,
560 scoped_ptr<ProxyResolver>* resolver,
561 scoped_ptr<ProxyResolverFactory> resolver_factory,
562 size_t max_num_threads,
563 const CompletionCallback& callback)
564 : factory_(factory),
565 resolver_out_(resolver),
566 resolver_factory_(resolver_factory.Pass()),
567 max_num_threads_(max_num_threads),
568 script_data_(script_data),
569 executor_(new Executor(this, 0)),
570 callback_(callback) {
571 executor_->StartJob(
572 new CreateResolverJob(script_data_, resolver_factory_.get()));
575 ~Job() override {
576 if (factory_) {
577 executor_->Destroy();
578 factory_->RemoveJob(this);
582 void FactoryDestroyed() {
583 executor_->Destroy();
584 executor_ = nullptr;
585 factory_ = nullptr;
588 private:
589 void OnExecutorReady(Executor* executor) override {
590 int error = OK;
591 if (executor->resolver()) {
592 resolver_out_->reset(new MultiThreadedProxyResolver(
593 resolver_factory_.Pass(), max_num_threads_, script_data_.Pass(),
594 executor_));
595 } else {
596 error = ERR_PAC_SCRIPT_FAILED;
597 executor_->Destroy();
599 factory_->RemoveJob(this);
600 factory_ = nullptr;
601 callback_.Run(error);
604 MultiThreadedProxyResolverFactory* factory_;
605 scoped_ptr<ProxyResolver>* const resolver_out_;
606 scoped_ptr<ProxyResolverFactory> resolver_factory_;
607 const size_t max_num_threads_;
608 scoped_refptr<ProxyResolverScriptData> script_data_;
609 scoped_refptr<Executor> executor_;
610 const CompletionCallback callback_;
613 MultiThreadedProxyResolverFactory::MultiThreadedProxyResolverFactory(
614 size_t max_num_threads,
615 bool factory_expects_bytes)
616 : ProxyResolverFactory(factory_expects_bytes),
617 max_num_threads_(max_num_threads) {
618 DCHECK_GE(max_num_threads, 1u);
621 MultiThreadedProxyResolverFactory::~MultiThreadedProxyResolverFactory() {
622 for (auto job : jobs_) {
623 job->FactoryDestroyed();
627 int MultiThreadedProxyResolverFactory::CreateProxyResolver(
628 const scoped_refptr<ProxyResolverScriptData>& pac_script,
629 scoped_ptr<ProxyResolver>* resolver,
630 const CompletionCallback& callback,
631 scoped_ptr<Request>* request) {
632 scoped_ptr<Job> job(new Job(this, pac_script, resolver,
633 CreateProxyResolverFactory(), max_num_threads_,
634 callback));
635 jobs_.insert(job.get());
636 *request = job.Pass();
637 return ERR_IO_PENDING;
640 void MultiThreadedProxyResolverFactory::RemoveJob(
641 MultiThreadedProxyResolverFactory::Job* job) {
642 size_t erased = jobs_.erase(job);
643 DCHECK_EQ(1u, erased);
646 } // namespace net