Extract SIGPIPE ignoring code to a common place.
[chromium-blink-merge.git] / net / proxy / multi_threaded_proxy_resolver.cc
blob de739133848e39f9008b68b529110fdc423966ff
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/proxy/multi_threaded_proxy_resolver.h"
7 #include "base/bind.h"
8 #include "base/message_loop_proxy.h"
9 #include "base/string_util.h"
10 #include "base/stringprintf.h"
11 #include "base/threading/thread.h"
12 #include "base/threading/thread_restrictions.h"
13 #include "net/base/net_errors.h"
14 #include "net/base/net_log.h"
15 #include "net/proxy/proxy_info.h"
17 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
18 // data when SetPacScript fails. That will reclaim memory when
19 // testing bogus scripts.
21 namespace net {
23 namespace {
25 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
26 public:
27 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
28 void PurgeMemory() { resolver_->PurgeMemory(); }
29 private:
30 friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
31 ~PurgeMemoryTask() {}
32 ProxyResolver* resolver_;
35 } // namespace
37 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
38 // thread and a synchronous ProxyResolver (which will be operated on said
39 // thread.)
40 class MultiThreadedProxyResolver::Executor
41 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
42 public:
43 // |coordinator| must remain valid throughout our lifetime. It is used to
44 // signal when the executor is ready to receive work by calling
45 // |coordinator->OnExecutorReady()|.
46 // The constructor takes ownership of |resolver|.
47 // |thread_number| is an identifier used when naming the worker thread.
48 Executor(MultiThreadedProxyResolver* coordinator,
49 ProxyResolver* resolver,
50 int thread_number);
52 // Submit a job to this executor.
53 void StartJob(Job* job);
55 // Callback for when a job has completed running on the executor's thread.
56 void OnJobCompleted(Job* job);
58 // Cleanup the executor. Cancels all outstanding work, and frees the thread
59 // and resolver.
60 void Destroy();
62 void PurgeMemory();
64 // Returns the outstanding job, or NULL.
65 Job* outstanding_job() const { return outstanding_job_.get(); }
67 ProxyResolver* resolver() { return resolver_.get(); }
69 int thread_number() const { return thread_number_; }
71 private:
72 friend class base::RefCountedThreadSafe<Executor>;
73 ~Executor();
75 MultiThreadedProxyResolver* coordinator_;
76 const int thread_number_;
78 // The currently active job for this executor (either a SetPacScript or
79 // GetProxyForURL task).
80 scoped_refptr<Job> outstanding_job_;
82 // The synchronous resolver implementation.
83 scoped_ptr<ProxyResolver> resolver_;
85 // The thread where |resolver_| is run on.
86 // Note that declaration ordering is important here. |thread_| needs to be
87 // destroyed *before* |resolver_|, in case |resolver_| is currently
88 // executing on |thread_|.
89 scoped_ptr<base::Thread> thread_;
92 // MultiThreadedProxyResolver::Job ---------------------------------------------
94 class MultiThreadedProxyResolver::Job
95 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
96 public:
97 // Identifies the subclass of Job (only being used for debugging purposes).
98 enum Type {
99 TYPE_GET_PROXY_FOR_URL,
100 TYPE_SET_PAC_SCRIPT,
101 TYPE_SET_PAC_SCRIPT_INTERNAL,
104 Job(Type type, const CompletionCallback& callback)
105 : type_(type),
106 callback_(callback),
107 executor_(NULL),
108 was_cancelled_(false) {
111 void set_executor(Executor* executor) {
112 executor_ = executor;
115 // The "executor" is the job runner that is scheduling this job. If
116 // this job has not been submitted to an executor yet, this will be
117 // NULL (and we know it hasn't started yet).
118 Executor* executor() {
119 return executor_;
122 // Mark the job as having been cancelled.
123 void Cancel() {
124 was_cancelled_ = true;
127 // Returns true if Cancel() has been called.
128 bool was_cancelled() const { return was_cancelled_; }
130 Type type() const { return type_; }
132 // Returns true if this job still has a user callback. Some jobs
133 // do not have a user callback, because they were helper jobs
134 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
136 // Otherwise jobs that correspond with user-initiated work will
137 // have a non-null callback up until the callback is run.
138 bool has_user_callback() const { return !callback_.is_null(); }
140 // This method is called when the job is inserted into a wait queue
141 // because no executors were ready to accept it.
142 virtual void WaitingForThread() {}
144 // This method is called just before the job is posted to the work thread.
145 virtual void FinishedWaitingForThread() {}
147 // This method is called on the worker thread to do the job's work. On
148 // completion, implementors are expected to call OnJobCompleted() on
149 // |origin_loop|.
150 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
152 protected:
153 void OnJobCompleted() {
154 // |executor_| will be NULL if the executor has already been deleted.
155 if (executor_)
156 executor_->OnJobCompleted(this);
159 void RunUserCallback(int result) {
160 DCHECK(has_user_callback());
161 CompletionCallback callback = callback_;
162 // Reset the callback so has_user_callback() will now return false.
163 callback_.Reset();
164 callback.Run(result);
167 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
169 virtual ~Job() {}
171 private:
172 const Type type_;
173 CompletionCallback callback_;
174 Executor* executor_;
175 bool was_cancelled_;
178 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
180 // Runs on the worker thread to call ProxyResolver::SetPacScript.
181 class MultiThreadedProxyResolver::SetPacScriptJob
182 : public MultiThreadedProxyResolver::Job {
183 public:
184 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
185 const CompletionCallback& callback)
186 : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
187 TYPE_SET_PAC_SCRIPT_INTERNAL,
188 callback),
189 script_data_(script_data) {
192 // Runs on the worker thread.
193 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
194 ProxyResolver* resolver = executor()->resolver();
195 int rv = resolver->SetPacScript(script_data_, CompletionCallback());
197 DCHECK_NE(rv, ERR_IO_PENDING);
198 origin_loop->PostTask(
199 FROM_HERE,
200 base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
203 protected:
204 virtual ~SetPacScriptJob() {}
206 private:
207 // Runs the completion callback on the origin thread.
208 void RequestComplete(int result_code) {
209 // The task may have been cancelled after it was started.
210 if (!was_cancelled() && has_user_callback()) {
211 RunUserCallback(result_code);
213 OnJobCompleted();
216 const scoped_refptr<ProxyResolverScriptData> script_data_;
219 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
221 class MultiThreadedProxyResolver::GetProxyForURLJob
222 : public MultiThreadedProxyResolver::Job {
223 public:
224 // |url| -- the URL of the query.
225 // |results| -- the structure to fill with proxy resolve results.
226 GetProxyForURLJob(const GURL& url,
227 ProxyInfo* results,
228 const CompletionCallback& callback,
229 const BoundNetLog& net_log)
230 : Job(TYPE_GET_PROXY_FOR_URL, callback),
231 results_(results),
232 net_log_(net_log),
233 url_(url),
234 was_waiting_for_thread_(false) {
235 DCHECK(!callback.is_null());
238 BoundNetLog* net_log() { return &net_log_; }
240 virtual void WaitingForThread() OVERRIDE {
241 was_waiting_for_thread_ = true;
242 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
245 virtual void FinishedWaitingForThread() OVERRIDE {
246 DCHECK(executor());
248 if (was_waiting_for_thread_) {
249 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
252 net_log_.AddEvent(
253 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
254 NetLog::IntegerCallback("thread_number", executor()->thread_number()));
257 // Runs on the worker thread.
258 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
259 ProxyResolver* resolver = executor()->resolver();
260 int rv = resolver->GetProxyForURL(
261 url_, &results_buf_, CompletionCallback(), NULL, net_log_);
262 DCHECK_NE(rv, ERR_IO_PENDING);
264 origin_loop->PostTask(
265 FROM_HERE,
266 base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
269 protected:
270 virtual ~GetProxyForURLJob() {}
272 private:
273 // Runs the completion callback on the origin thread.
274 void QueryComplete(int result_code) {
275 // The Job may have been cancelled after it was started.
276 if (!was_cancelled()) {
277 if (result_code >= OK) { // Note: unit-tests use values > 0.
278 results_->Use(results_buf_);
280 RunUserCallback(result_code);
282 OnJobCompleted();
285 // Must only be used on the "origin" thread.
286 ProxyInfo* results_;
288 // Can be used on either "origin" or worker thread.
289 BoundNetLog net_log_;
290 const GURL url_;
292 // Usable from within DoQuery on the worker thread.
293 ProxyInfo results_buf_;
295 bool was_waiting_for_thread_;
298 // MultiThreadedProxyResolver::Executor ----------------------------------------
300 MultiThreadedProxyResolver::Executor::Executor(
301 MultiThreadedProxyResolver* coordinator,
302 ProxyResolver* resolver,
303 int thread_number)
304 : coordinator_(coordinator),
305 thread_number_(thread_number),
306 resolver_(resolver) {
307 DCHECK(coordinator);
308 DCHECK(resolver);
309 // Start up the thread.
310 // Note that it is safe to pass a temporary C-String to Thread(), as it will
311 // make a copy.
312 std::string thread_name =
313 base::StringPrintf("PAC thread #%d", thread_number);
314 thread_.reset(new base::Thread(thread_name.c_str()));
315 CHECK(thread_->Start());
318 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
319 DCHECK(!outstanding_job_);
320 outstanding_job_ = job;
322 // Run the job. Once it has completed (regardless of whether it was
323 // cancelled), it will invoke OnJobCompleted() on this thread.
324 job->set_executor(this);
325 job->FinishedWaitingForThread();
326 thread_->message_loop()->PostTask(
327 FROM_HERE,
328 base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
331 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
332 DCHECK_EQ(job, outstanding_job_.get());
333 outstanding_job_ = NULL;
334 coordinator_->OnExecutorReady(this);
337 void MultiThreadedProxyResolver::Executor::Destroy() {
338 DCHECK(coordinator_);
340 // Give the resolver an opportunity to shutdown from THIS THREAD before
341 // joining on the resolver thread. This allows certain implementations
342 // to avoid deadlocks.
343 resolver_->Shutdown();
346 // See http://crbug.com/69710.
347 base::ThreadRestrictions::ScopedAllowIO allow_io;
349 // Join the worker thread.
350 thread_.reset();
353 // Cancel any outstanding job.
354 if (outstanding_job_) {
355 outstanding_job_->Cancel();
356 // Orphan the job (since this executor may be deleted soon).
357 outstanding_job_->set_executor(NULL);
360 // It is now safe to free the ProxyResolver, since all the tasks that
361 // were using it on the resolver thread have completed.
362 resolver_.reset();
364 // Null some stuff as a precaution.
365 coordinator_ = NULL;
366 outstanding_job_ = NULL;
369 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
370 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
371 thread_->message_loop()->PostTask(
372 FROM_HERE,
373 base::Bind(&PurgeMemoryTask::PurgeMemory, helper.get()));
376 MultiThreadedProxyResolver::Executor::~Executor() {
377 // The important cleanup happens as part of Destroy(), which should always be
378 // called first.
379 DCHECK(!coordinator_) << "Destroy() was not called";
380 DCHECK(!thread_.get());
381 DCHECK(!resolver_.get());
382 DCHECK(!outstanding_job_);
385 // MultiThreadedProxyResolver --------------------------------------------------
387 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
388 ProxyResolverFactory* resolver_factory,
389 size_t max_num_threads)
390 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
391 resolver_factory_(resolver_factory),
392 max_num_threads_(max_num_threads) {
393 DCHECK_GE(max_num_threads, 1u);
396 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
397 // We will cancel all outstanding requests.
398 pending_jobs_.clear();
399 ReleaseAllExecutors();
402 int MultiThreadedProxyResolver::GetProxyForURL(
403 const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
404 RequestHandle* request, const BoundNetLog& net_log) {
405 DCHECK(CalledOnValidThread());
406 DCHECK(!callback.is_null());
407 DCHECK(current_script_data_.get())
408 << "Resolver is un-initialized. Must call SetPacScript() first!";
410 scoped_refptr<GetProxyForURLJob> job(
411 new GetProxyForURLJob(url, results, callback, net_log));
413 // Completion will be notified through |callback|, unless the caller cancels
414 // the request using |request|.
415 if (request)
416 *request = reinterpret_cast<RequestHandle>(job.get());
418 // If there is an executor that is ready to run this request, submit it!
419 Executor* executor = FindIdleExecutor();
420 if (executor) {
421 DCHECK_EQ(0u, pending_jobs_.size());
422 executor->StartJob(job);
423 return ERR_IO_PENDING;
426 // Otherwise queue this request. (We will schedule it to a thread once one
427 // becomes available).
428 job->WaitingForThread();
429 pending_jobs_.push_back(job);
431 // If we haven't already reached the thread limit, provision a new thread to
432 // drain the requests more quickly.
433 if (executors_.size() < max_num_threads_) {
434 executor = AddNewExecutor();
435 executor->StartJob(
436 new SetPacScriptJob(current_script_data_, CompletionCallback()));
439 return ERR_IO_PENDING;
442 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
443 DCHECK(CalledOnValidThread());
444 DCHECK(req);
446 Job* job = reinterpret_cast<Job*>(req);
447 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
449 if (job->executor()) {
450 // If the job was already submitted to the executor, just mark it
451 // as cancelled so the user callback isn't run on completion.
452 job->Cancel();
453 } else {
454 // Otherwise the job is just sitting in a queue.
455 PendingJobsQueue::iterator it =
456 std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
457 DCHECK(it != pending_jobs_.end());
458 pending_jobs_.erase(it);
462 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
463 DCHECK(CalledOnValidThread());
464 DCHECK(req);
466 Job* job = reinterpret_cast<Job*>(req);
467 if (job->executor())
468 return job->executor()->resolver()->GetLoadStateThreadSafe(NULL);
469 return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
472 LoadState MultiThreadedProxyResolver::GetLoadStateThreadSafe(
473 RequestHandle req) const {
474 NOTIMPLEMENTED();
475 return LOAD_STATE_IDLE;
478 void MultiThreadedProxyResolver::CancelSetPacScript() {
479 DCHECK(CalledOnValidThread());
480 DCHECK_EQ(0u, pending_jobs_.size());
481 DCHECK_EQ(1u, executors_.size());
482 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
483 executors_[0]->outstanding_job()->type());
485 // Defensively clear some data which shouldn't be getting used
486 // anymore.
487 current_script_data_ = NULL;
489 ReleaseAllExecutors();
492 void MultiThreadedProxyResolver::PurgeMemory() {
493 DCHECK(CalledOnValidThread());
494 for (ExecutorList::iterator it = executors_.begin();
495 it != executors_.end(); ++it) {
496 Executor* executor = *it;
497 executor->PurgeMemory();
501 int MultiThreadedProxyResolver::SetPacScript(
502 const scoped_refptr<ProxyResolverScriptData>& script_data,
503 const CompletionCallback&callback) {
504 DCHECK(CalledOnValidThread());
505 DCHECK(!callback.is_null());
507 // Save the script details, so we can provision new executors later.
508 current_script_data_ = script_data;
510 // The user should not have any outstanding requests when they call
511 // SetPacScript().
512 CheckNoOutstandingUserRequests();
514 // Destroy all of the current threads and their proxy resolvers.
515 ReleaseAllExecutors();
517 // Provision a new executor, and run the SetPacScript request. On completion
518 // notification will be sent through |callback|.
519 Executor* executor = AddNewExecutor();
520 executor->StartJob(new SetPacScriptJob(script_data, callback));
521 return ERR_IO_PENDING;
524 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
525 DCHECK(CalledOnValidThread());
526 CHECK_EQ(0u, pending_jobs_.size());
528 for (ExecutorList::const_iterator it = executors_.begin();
529 it != executors_.end(); ++it) {
530 const Executor* executor = *it;
531 Job* job = executor->outstanding_job();
532 // The "has_user_callback()" is to exclude jobs for which the callback
533 // has already been invoked, or was not user-initiated (as in the case of
534 // lazy thread provisions). User-initiated jobs may !has_user_callback()
535 // when the callback has already been run. (Since we only clear the
536 // outstanding job AFTER the callback has been invoked, it is possible
537 // for a new request to be started from within the callback).
538 CHECK(!job || job->was_cancelled() || !job->has_user_callback());
542 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
543 DCHECK(CalledOnValidThread());
544 for (ExecutorList::iterator it = executors_.begin();
545 it != executors_.end(); ++it) {
546 Executor* executor = *it;
547 executor->Destroy();
549 executors_.clear();
552 MultiThreadedProxyResolver::Executor*
553 MultiThreadedProxyResolver::FindIdleExecutor() {
554 DCHECK(CalledOnValidThread());
555 for (ExecutorList::iterator it = executors_.begin();
556 it != executors_.end(); ++it) {
557 Executor* executor = *it;
558 if (!executor->outstanding_job())
559 return executor;
561 return NULL;
564 MultiThreadedProxyResolver::Executor*
565 MultiThreadedProxyResolver::AddNewExecutor() {
566 DCHECK(CalledOnValidThread());
567 DCHECK_LT(executors_.size(), max_num_threads_);
568 // The "thread number" is used to give the thread a unique name.
569 int thread_number = executors_.size();
570 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
571 Executor* executor = new Executor(
572 this, resolver, thread_number);
573 executors_.push_back(make_scoped_refptr(executor));
574 return executor;
577 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
578 DCHECK(CalledOnValidThread());
579 if (pending_jobs_.empty())
580 return;
582 // Get the next job to process (FIFO). Transfer it from the pending queue
583 // to the executor.
584 scoped_refptr<Job> job = pending_jobs_.front();
585 pending_jobs_.pop_front();
586 executor->StartJob(job);
589 } // namespace net