// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 * Copyright (C) 2014 Fujitsu.  All rights reserved.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"
#include "ctree.h"

enum {
	WORK_DONE_BIT,
	WORK_ORDER_DONE_BIT,
	WORK_HIGH_PRIO_BIT,
};

#define NO_THRESHOLD (-1)
#define DFT_THRESHOLD (32)

struct __btrfs_workqueue {
	struct workqueue_struct *normal_wq;

	/* File system this workqueue services */
	struct btrfs_fs_info *fs_info;

	/* List head pointing to ordered work list */
	struct list_head ordered_list;

	/* Spinlock for ordered_list */
	spinlock_t list_lock;

	/* Thresholding related variables */
	atomic_t pending;

	/* Upper limit of concurrent workers */
	int limit_active;

	/* Current number of concurrent workers */
	int current_active;

	/* Threshold to change current_active */
	int thresh;
	unsigned int count;
	spinlock_t thres_lock;
};

struct btrfs_workqueue {
	struct __btrfs_workqueue *normal;
	struct __btrfs_workqueue *high;
};

struct btrfs_fs_info * __pure btrfs_workqueue_owner(const struct __btrfs_workqueue *wq)
{
	return wq->fs_info;
}

struct btrfs_fs_info * __pure btrfs_work_owner(const struct btrfs_work *work)
{
	return work->wq->fs_info;
}

bool btrfs_workqueue_normal_congested(const struct btrfs_workqueue *wq)
{
	/*
	 * We could compare wq->normal->pending with num_online_cpus()
	 * to support "thresh == NO_THRESHOLD" case, but it requires
	 * moving up atomic_inc/dec in thresh_queue/exec_hook. Let's
	 * postpone it until someone needs the support of that case.
	 */
	if (wq->normal->thresh == NO_THRESHOLD)
		return false;

	return atomic_read(&wq->normal->pending) > wq->normal->thresh * 2;
}
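
/*
 * Usage sketch (illustrative only; "example_wq" and the back-off policy are
 * hypothetical, not taken from this file).  A producer can poll this helper
 * and stop feeding new items while the normal queue already holds more than
 * twice its threshold of pending work:
 *
 *	if (btrfs_workqueue_normal_congested(example_wq))
 *		return;
 *	btrfs_queue_work(example_wq, work);
 */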

static struct __btrfs_workqueue *
__btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info, const char *name,
			unsigned int flags, int limit_active, int thresh)
{
	struct __btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

	if (!ret)
		return NULL;

	ret->fs_info = fs_info;
	ret->limit_active = limit_active;
	atomic_set(&ret->pending, 0);
	if (thresh == 0)
		thresh = DFT_THRESHOLD;
	/* For low threshold, disabling threshold is a better choice */
	if (thresh < DFT_THRESHOLD) {
		ret->current_active = limit_active;
		ret->thresh = NO_THRESHOLD;
	} else {
		/*
		 * For threshold-able wq, let its concurrency grow on demand.
		 * Use minimal max_active at alloc time to reduce resource
		 * usage.
		 */
		ret->current_active = 1;
		ret->thresh = thresh;
	}

	if (flags & WQ_HIGHPRI)
		ret->normal_wq = alloc_workqueue("btrfs-%s-high", flags,
						 ret->current_active, name);
	else
		ret->normal_wq = alloc_workqueue("btrfs-%s", flags,
						 ret->current_active, name);
	if (!ret->normal_wq) {
		kfree(ret);
		return NULL;
	}

	INIT_LIST_HEAD(&ret->ordered_list);
	spin_lock_init(&ret->list_lock);
	spin_lock_init(&ret->thres_lock);
	trace_btrfs_workqueue_alloc(ret, name, flags & WQ_HIGHPRI);
	return ret;
}
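
/*
 * Worked example (numbers are illustrative): with thresh == 0 the default
 * DFT_THRESHOLD (32) is used, so the queue starts at current_active == 1 and
 * grows on demand up to limit_active.  With, say, thresh == 8 (below
 * DFT_THRESHOLD) thresholding is disabled: thresh becomes NO_THRESHOLD and
 * current_active is pinned to limit_active from the start.
 */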

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq);

struct btrfs_workqueue *btrfs_alloc_workqueue(struct btrfs_fs_info *fs_info,
					      const char *name,
					      unsigned int flags,
					      int limit_active,
					      int thresh)
{
	struct btrfs_workqueue *ret = kzalloc(sizeof(*ret), GFP_KERNEL);

	if (!ret)
		return NULL;

	ret->normal = __btrfs_alloc_workqueue(fs_info, name,
					      flags & ~WQ_HIGHPRI,
					      limit_active, thresh);
	if (!ret->normal) {
		kfree(ret);
		return NULL;
	}

	if (flags & WQ_HIGHPRI) {
		ret->high = __btrfs_alloc_workqueue(fs_info, name, flags,
						    limit_active, thresh);
		if (!ret->high) {
			__btrfs_destroy_workqueue(ret->normal);
			kfree(ret);
			return NULL;
		}
	}
	return ret;
}
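
/*
 * Minimal lifecycle sketch (illustrative only; "example_func" and the
 * parameter values are hypothetical, not taken from this file).  Error
 * handling is omitted, and example_func() is assumed to free the work item
 * when it is done with it:
 *
 *	struct btrfs_workqueue *wq;
 *	struct btrfs_work *work = kmalloc(sizeof(*work), GFP_NOFS);
 *
 *	wq = btrfs_alloc_workqueue(fs_info, "example", WQ_UNBOUND, 8, 0);
 *	btrfs_init_work(work, example_func, NULL, NULL);
 *	btrfs_queue_work(wq, work);
 *
 *	btrfs_flush_workqueue(wq);
 *	btrfs_destroy_workqueue(wq);
 */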

/*
 * Hook for threshold which will be called in btrfs_queue_work.
 * This hook WILL be called in IRQ handler context,
 * so workqueue_set_max_active MUST NOT be called in this hook.
 */
static inline void thresh_queue_hook(struct __btrfs_workqueue *wq)
{
	if (wq->thresh == NO_THRESHOLD)
		return;
	atomic_inc(&wq->pending);
}

/*
 * Hook for threshold which will be called before executing the work.
 * This hook is called in kthread context,
 * so workqueue_set_max_active is called here.
 */
static inline void thresh_exec_hook(struct __btrfs_workqueue *wq)
{
	int new_current_active;
	long pending;
	int need_change = 0;

	if (wq->thresh == NO_THRESHOLD)
		return;

	atomic_dec(&wq->pending);
	spin_lock(&wq->thres_lock);
	/*
	 * Use wq->count to limit the calling frequency of
	 * workqueue_set_max_active.
	 */
	wq->count++;
	wq->count %= (wq->thresh / 4);
	if (!wq->count)
		goto out;
	new_current_active = wq->current_active;

	/*
	 * pending may be changed later, but it's OK since we really
	 * don't need it to be that accurate to calculate new_max_active.
	 */
	pending = atomic_read(&wq->pending);
	if (pending > wq->thresh)
		new_current_active++;
	if (pending < wq->thresh / 2)
		new_current_active--;
	new_current_active = clamp_val(new_current_active, 1, wq->limit_active);
	if (new_current_active != wq->current_active) {
		need_change = 1;
		wq->current_active = new_current_active;
	}
out:
	spin_unlock(&wq->thres_lock);

	if (need_change)
		workqueue_set_max_active(wq->normal_wq, wq->current_active);
}
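
/*
 * Worked example (numbers are illustrative): with thresh == 32 and
 * limit_active == 8, each evaluation compares the pending count against the
 * threshold: more than 32 pending items bumps current_active by one, fewer
 * than 16 (thresh / 2) lowers it by one, and the result is clamped to the
 * range [1, 8] before workqueue_set_max_active() is called.  wq->count is
 * used only to limit how often this recalculation reaches the workqueue
 * core.
 */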

static void run_ordered_work(struct __btrfs_workqueue *wq,
			     struct btrfs_work *self)
{
	struct list_head *list = &wq->ordered_list;
	struct btrfs_work *work;
	spinlock_t *lock = &wq->list_lock;
	unsigned long flags;
	bool free_self = false;

	while (1) {
		spin_lock_irqsave(lock, flags);
		if (list_empty(list))
			break;
		work = list_entry(list->next, struct btrfs_work,
				  ordered_list);
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/*
		 * we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;
		trace_btrfs_ordered_sched(work);
		spin_unlock_irqrestore(lock, flags);
		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock_irqsave(lock, flags);
		list_del(&work->ordered_list);
		spin_unlock_irqrestore(lock, flags);

		if (work == self) {
			/*
			 * This is the work item that the worker is currently
			 * executing.
			 *
			 * The kernel workqueue code guarantees non-reentrancy
			 * of work items. I.e., if a work item with the same
			 * address and work function is queued twice, the second
			 * execution is blocked until the first one finishes. A
			 * work item may be freed and recycled with the same
			 * work function; the workqueue code assumes that the
			 * original work item cannot depend on the recycled work
			 * item in that case (see find_worker_executing_work()).
			 *
			 * Note that different types of Btrfs work can depend on
			 * each other, and one type of work on one Btrfs
			 * filesystem may even depend on the same type of work
			 * on another Btrfs filesystem via, e.g., a loop device.
			 * Therefore, we must not allow the current work item to
			 * be recycled until we are really done, otherwise we
			 * break the above assumption and can deadlock.
			 */
			free_self = true;
		} else {
			/*
			 * We don't want to call the ordered free functions with
			 * the lock held.
			 */
			work->ordered_free(work);
			/* NB: work must not be dereferenced past this point. */
			trace_btrfs_all_work_done(wq->fs_info, work);
		}
	}
	spin_unlock_irqrestore(lock, flags);

	if (free_self) {
		self->ordered_free(self);
		/* NB: self must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, self);
	}
}
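
/*
 * Ordering illustration (hypothetical scenario, not code from this file):
 * suppose work items A, B and C were queued with an ordered_func in that
 * order and B's normal func finishes first.  B only sets WORK_DONE_BIT;
 * run_ordered_work() stops at A because A's WORK_DONE_BIT is not yet set, so
 * B's ordered_func runs only after A's, preserving submission order for the
 * ordered phase.
 */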

static void btrfs_work_helper(struct work_struct *normal_work)
{
	struct btrfs_work *work = container_of(normal_work, struct btrfs_work,
					       normal_work);
	struct __btrfs_workqueue *wq;
	int need_order = 0;

	/*
	 * We should not touch things inside work in the following cases:
	 * 1) after work->func() if it has no ordered_free
	 *    Since the struct is freed in work->func().
	 * 2) after setting WORK_DONE_BIT
	 *    The work may be freed in other threads almost instantly.
	 * So we save the needed things here.
	 */
	if (work->ordered_func)
		need_order = 1;
	wq = work->wq;

	trace_btrfs_work_sched(work);
	thresh_exec_hook(wq);
	work->func(work);
	if (need_order) {
		set_bit(WORK_DONE_BIT, &work->flags);
		run_ordered_work(wq, work);
	} else {
		/* NB: work must not be dereferenced past this point. */
		trace_btrfs_all_work_done(wq->fs_info, work);
	}
}

void btrfs_init_work(struct btrfs_work *work, btrfs_func_t func,
		     btrfs_func_t ordered_func, btrfs_func_t ordered_free)
{
	work->func = func;
	work->ordered_func = ordered_func;
	work->ordered_free = ordered_free;
	INIT_WORK(&work->normal_work, btrfs_work_helper);
	INIT_LIST_HEAD(&work->ordered_list);
	work->flags = 0;
}
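
/*
 * Usage sketch for ordered completion (illustrative only; the "example_*"
 * names and "item" are hypothetical, not taken from this file): func runs
 * concurrently on the underlying workqueue, ordered_func runs in submission
 * order, and ordered_free releases the item once both phases are done.
 *
 *	btrfs_init_work(&item->work, example_func, example_ordered_func,
 *			example_free);
 *	btrfs_queue_work(example_wq, &item->work);
 */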

static inline void __btrfs_queue_work(struct __btrfs_workqueue *wq,
				      struct btrfs_work *work)
{
	unsigned long flags;

	work->wq = wq;
	thresh_queue_hook(wq);
	if (work->ordered_func) {
		spin_lock_irqsave(&wq->list_lock, flags);
		list_add_tail(&work->ordered_list, &wq->ordered_list);
		spin_unlock_irqrestore(&wq->list_lock, flags);
	}
	trace_btrfs_work_queued(work);
	queue_work(wq->normal_wq, &work->normal_work);
}

void btrfs_queue_work(struct btrfs_workqueue *wq,
		      struct btrfs_work *work)
{
	struct __btrfs_workqueue *dest_wq;

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags) && wq->high)
		dest_wq = wq->high;
	else
		dest_wq = wq->normal;
	__btrfs_queue_work(dest_wq, work);
}

static inline void
__btrfs_destroy_workqueue(struct __btrfs_workqueue *wq)
{
	destroy_workqueue(wq->normal_wq);
	trace_btrfs_workqueue_destroy(wq);
	kfree(wq);
}

void btrfs_destroy_workqueue(struct btrfs_workqueue *wq)
{
	if (!wq)
		return;
	if (wq->high)
		__btrfs_destroy_workqueue(wq->high);
	__btrfs_destroy_workqueue(wq->normal);
	kfree(wq);
}

void btrfs_workqueue_set_max(struct btrfs_workqueue *wq, int limit_active)
{
	if (!wq)
		return;
	wq->normal->limit_active = limit_active;
	if (wq->high)
		wq->high->limit_active = limit_active;
}

void btrfs_set_work_high_priority(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}
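
/*
 * Usage sketch (illustrative only; names are hypothetical): marking a work
 * item high priority before queueing routes it to wq->high, provided the
 * workqueue was allocated with WQ_HIGHPRI so that the high-priority queue
 * exists.
 *
 *	btrfs_set_work_high_priority(&item->work);
 *	btrfs_queue_work(example_wq, &item->work);
 */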

void btrfs_flush_workqueue(struct btrfs_workqueue *wq)
{
	if (wq->high)
		flush_workqueue(wq->high->normal_wq);

	flush_workqueue(wq->normal->normal_wq);
}