1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
6 #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
11 #include "base/bind.h"
12 #include "base/callback.h"
13 #include "base/gtest_prod_util.h"
14 #include "base/macros.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/threading/non_thread_safe.h"
18 #include "base/time/time.h"
19 #include "base/timer/timer.h"
20 #include "net/base/backoff_entry.h"
24 // A queue that dispatches tasks, ignores duplicates, and provides backoff
27 // |T| is the task type.
29 // For each task added to the queue, the HandleTaskCallback will eventually be
30 // invoked. For each invocation, the user of TaskQueue must call exactly one of
31 // |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
33 // To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
37 // void Handle(const Foo& foo);
39 // TaskQueue<Foo> queue(base::Bind(&Handle),
40 // base::TimeDelta::FromSeconds(1),
41 // base::TimeDelta::FromMinutes(1));
45 // // Add foo to the queue. At some point, Handle will be invoked in this
47 // queue.AddToQueue(foo);
50 // void Handle(const Foo& foo) {
51 // DoSomethingWith(foo);
52 // // We must call one of the three methods to tell the queue how we're
53 // // dealing with foo. Of course, we are free to call in the the context of
54 // // this HandleTaskCallback or outside the context if we so choose.
55 // if (SuccessfullyHandled(foo)) {
56 // queue.MarkAsSucceeded(foo);
57 // } else if (Failed(foo)) {
58 // queue.MarkAsFailed(foo);
59 // if (ShouldRetry(foo)) {
60 // queue.AddToQueue(foo);
68 class TaskQueue
: base::NonThreadSafe
{
70 // A callback provided by users of the TaskQueue to handle tasks.
72 // This callback is invoked by the queue with a task to be handled. The
73 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
74 // or |Cancel| to signify completion of the task.
75 typedef base::Callback
<void(const T
&)> HandleTaskCallback
;
77 // Construct a TaskQueue.
79 // |callback| the callback to be invoked for handling tasks.
81 // |initial_backoff_delay| the initial amount of time the queue will wait
82 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be
83 // zero. Subsequent failures will increase the delay up to
84 // |max_backoff_delay|.
86 // |max_backoff_delay| the maximum amount of time the queue will wait before
87 // dispatching tasks. May be zero. Must be greater than or equal to
88 // |initial_backoff_delay|.
89 TaskQueue(const HandleTaskCallback
& callback
,
90 const base::TimeDelta
& initial_backoff_delay
,
91 const base::TimeDelta
& max_backoff_delay
);
93 // Add |task| to the end of the queue.
95 // If |task| is already present (as determined by operator==) it is not added.
96 void AddToQueue(const T
& task
);
98 // Mark |task| as completing successfully.
100 // Marking a task as completing successfully will reduce or eliminate any
101 // backoff delay in effect.
103 // May only be called after the HandleTaskCallback has been invoked with
105 void MarkAsSucceeded(const T
& task
);
107 // Mark |task| as failed.
109 // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
110 // of subsequent tasks. Repeated failures will increase the delay.
112 // May only be called after the HandleTaskCallback has been invoked with
114 void MarkAsFailed(const T
& task
);
118 // |task| is removed from the queue and will not be retried. Does not affect
119 // the backoff delay.
121 // May only be called after the HandleTaskCallback has been invoked with
123 void Cancel(const T
& task
);
125 // Reset any backoff delay and resume dispatching of tasks.
127 // Useful for when you know the cause of previous failures has been resolved
128 // and you want don't want to wait for the accumulated backoff delay to
132 // Use |timer| for scheduled events.
134 // Used in tests. See also MockTimer.
135 void SetTimerForTest(scoped_ptr
<base::Timer
> timer
);
138 void FinishTask(const T
& task
);
139 void ScheduleDispatch();
141 // Return true if we should dispatch tasks.
142 bool ShouldDispatch();
144 const HandleTaskCallback process_callback_
;
145 net::BackoffEntry::Policy backoff_policy_
;
146 scoped_ptr
<net::BackoffEntry
> backoff_entry_
;
147 // The number of tasks currently being handled.
148 int num_in_progress_
;
149 std::deque
<T
> queue_
;
150 // The set of tasks in queue_ or currently being handled.
152 base::Closure dispatch_closure_
;
153 scoped_ptr
<base::Timer
> backoff_timer_
;
154 base::TimeDelta delay_
;
156 // Must be last data member.
157 base::WeakPtrFactory
<TaskQueue
> weak_ptr_factory_
;
159 DISALLOW_COPY_AND_ASSIGN(TaskQueue
);
162 // The maximum number of tasks that may be concurrently executed. Think
163 // carefully before changing this value. The desired behavior of backoff may
164 // not be obvious when there is more than one concurrent task
165 const int kMaxConcurrentTasks
= 1;
167 template <typename T
>
168 TaskQueue
<T
>::TaskQueue(const HandleTaskCallback
& callback
,
169 const base::TimeDelta
& initial_backoff_delay
,
170 const base::TimeDelta
& max_backoff_delay
)
171 : process_callback_(callback
),
174 weak_ptr_factory_(this) {
175 DCHECK_LE(initial_backoff_delay
.InMicroseconds(),
176 max_backoff_delay
.InMicroseconds());
177 backoff_policy_
.initial_delay_ms
= initial_backoff_delay
.InMilliseconds();
178 backoff_policy_
.multiply_factor
= 2.0;
179 backoff_policy_
.jitter_factor
= 0.1;
180 backoff_policy_
.maximum_backoff_ms
= max_backoff_delay
.InMilliseconds();
181 backoff_policy_
.entry_lifetime_ms
= -1;
182 backoff_policy_
.always_use_initial_delay
= false;
183 backoff_entry_
.reset(new net::BackoffEntry(&backoff_policy_
));
185 base::Bind(&TaskQueue::Dispatch
, weak_ptr_factory_
.GetWeakPtr());
186 backoff_timer_
.reset(new base::Timer(false, false));
189 template <typename T
>
190 void TaskQueue
<T
>::AddToQueue(const T
& task
) {
191 DCHECK(CalledOnValidThread());
192 // Ignore duplicates.
193 if (tasks_
.find(task
) == tasks_
.end()) {
194 queue_
.push_back(task
);
200 template <typename T
>
201 void TaskQueue
<T
>::MarkAsSucceeded(const T
& task
) {
202 DCHECK(CalledOnValidThread());
204 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and
205 // reschedule a dispatch.
206 backoff_timer_
->Stop();
207 backoff_entry_
->Reset();
211 template <typename T
>
212 void TaskQueue
<T
>::MarkAsFailed(const T
& task
) {
213 DCHECK(CalledOnValidThread());
215 backoff_entry_
->InformOfRequest(false);
219 template <typename T
>
220 void TaskQueue
<T
>::Cancel(const T
& task
) {
221 DCHECK(CalledOnValidThread());
226 template <typename T
>
227 void TaskQueue
<T
>::ResetBackoff() {
228 backoff_timer_
->Stop();
229 backoff_entry_
->Reset();
233 template <typename T
>
234 void TaskQueue
<T
>::SetTimerForTest(scoped_ptr
<base::Timer
> timer
) {
235 DCHECK(CalledOnValidThread());
237 backoff_timer_
= timer
.Pass();
240 template <typename T
>
241 void TaskQueue
<T
>::FinishTask(const T
& task
) {
242 DCHECK(CalledOnValidThread());
243 DCHECK_GE(num_in_progress_
, 1);
245 const size_t num_erased
= tasks_
.erase(task
);
246 DCHECK_EQ(1U, num_erased
);
249 template <typename T
>
250 void TaskQueue
<T
>::ScheduleDispatch() {
251 DCHECK(CalledOnValidThread());
252 if (backoff_timer_
->IsRunning() || !ShouldDispatch()) {
256 backoff_timer_
->Start(
257 FROM_HERE
, backoff_entry_
->GetTimeUntilRelease(), dispatch_closure_
);
260 template <typename T
>
261 void TaskQueue
<T
>::Dispatch() {
262 DCHECK(CalledOnValidThread());
263 if (!ShouldDispatch()) {
267 DCHECK(!queue_
.empty());
268 const T
& task
= queue_
.front();
270 DCHECK_LE(num_in_progress_
, kMaxConcurrentTasks
);
271 base::MessageLoop::current()->PostTask(FROM_HERE
,
272 base::Bind(process_callback_
, task
));
276 template <typename T
>
277 bool TaskQueue
<T
>::ShouldDispatch() {
278 return num_in_progress_
< kMaxConcurrentTasks
&& !queue_
.empty();
281 } // namespace syncer
283 #endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_