// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mmu_context.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
	IO_WORKER_F_BOUND	= 32,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
	const struct cred *cur_creds;
	const struct cred *saved_creds;
	struct files_struct *restore_files;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	get_work_fn *get_work;
	put_work_fn *put_work;

	struct task_struct *manager;
	struct user_struct *user;
	refcount_t refs;
	struct completion done;

	refcount_t use_refs;
};

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	bool dropped_lock = false;

	if (worker->saved_creds) {
		revert_creds(worker->saved_creds);
		worker->cur_creds = worker->saved_creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		task_unlock(current);
	}

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	return dropped_lock;
}
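
/*
 * Accounting helpers: bounded work is tracked in acct[IO_WQ_ACCT_BOUND],
 * unbounded work (flagged IO_WQ_WORK_UNBOUND) in acct[IO_WQ_ACCT_UNBOUND].
 */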
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}
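
/*
 * Worker teardown: wait for transient references to drop, unaccount the
 * worker, unlink it from the free and all lists, and complete wq->done
 * once the last worker on this node is gone.
 */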
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	unsigned nr_workers;

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);

	spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
	spin_unlock_irq(&wqe->lock);

	/* all workers gone, wq exit can proceed */
	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);

	kfree_rcu(worker, rcu);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;

	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}
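
/*
 * nr_running tracks how many workers of this accounting class are
 * currently runnable; when it drops to zero with work still queued,
 * another worker is woken (or the manager is asked to create one).
 */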
static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}
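
/*
 * Runs in the context of the worker kthread itself, before it enters the
 * io_wqe_worker() loop: mark the task as an io_wq worker and account it
 * as up and running.
 */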
static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}
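
/*
 * Pick the next runnable work item. Unhashed work can always run; hashed
 * work only runs if no other worker currently owns the same hash bucket,
 * which serializes items queued with io_wq_enqueue_hashed().
 */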
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!(work->flags & IO_WQ_WORK_HASHED)) {
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		*hash = work->flags >> IO_WQ_HASH_SHIFT;
		if (!(wqe->hash_map & BIT_ULL(*hash))) {
			wqe->hash_map |= BIT_ULL(*hash);
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}
	}

	return NULL;
}
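
/*
 * Attach the worker to the mm of the new work item, dropping any mm it
 * was previously using. If the mm can't be pinned, the work is flagged
 * for cancellation instead of being run without it.
 */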
static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
{
	if (worker->mm) {
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	if (mmget_not_zero(work->mm)) {
		use_mm(work->mm);
		worker->mm = work->mm;
		/* hang on to this mm */
		return;
	}

	/* failed grabbing mm, ensure work gets cancelled */
	work->flags |= IO_WQ_WORK_CANCEL;
}

static void io_wq_switch_creds(struct io_worker *worker,
			       struct io_wq_work *work)
{
	const struct cred *old_creds = override_creds(work->creds);

	worker->cur_creds = work->creds;
	if (worker->saved_creds)
		put_cred(old_creds); /* creds set by previous switch */
	else
		worker->saved_creds = old_creds;
}
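
/*
 * Main work loop for a worker: grab the next item, switch files/mm/creds
 * to match it, run the work, then clear its hash bit and loop. If running
 * the work produced dependent work, it is handled immediately via the
 * 'next' label without going back through the list.
 */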
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		unsigned hash = -1U;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);
		if (put_work && wq->put_work)
			wq->put_work(old_work);
		if (!work)
			break;
next:
		/* flush any pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);

		spin_lock_irq(&worker->lock);
		worker->cur_work = work;
		spin_unlock_irq(&worker->lock);

		if (work->flags & IO_WQ_WORK_CB)
			work->func(&work);

		if (work->files && current->files != work->files) {
			task_lock(current);
			current->files = work->files;
			task_unlock(current);
		}
		if (work->mm != worker->mm)
			io_wq_switch_mm(worker, work);
		if (worker->cur_creds != work->creds)
			io_wq_switch_creds(worker, work);
		/*
		 * OK to set IO_WQ_WORK_CANCEL even for uncancellable work,
		 * the worker function will do the right thing.
		 */
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
			work->flags |= IO_WQ_WORK_HAS_MM;

		if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
			put_work = work;
			wq->get_work(work);
		}

		old_work = work;
		work->func(&work);

		spin_lock_irq(&worker->lock);
		worker->cur_work = NULL;
		spin_unlock_irq(&worker->lock);

		spin_lock_irq(&wqe->lock);

		if (hash != -1U) {
			wqe->hash_map &= ~BIT_ULL(hash);
			wqe->flags &= ~IO_WQE_FLAG_STALLED;
		}
		if (work && work != old_work) {
			spin_unlock_irq(&wqe->lock);

			if (put_work && wq->put_work) {
				wq->put_work(put_work);
				put_work = NULL;
			}

			/* dependent work not hashed */
			hash = -1U;
			goto next;
		}
	} while (1);
}

static inline void io_worker_spin_for_work(struct io_wqe *wqe)
{
	int i = 0;

	while (++i < 1000) {
		if (io_wqe_run_queue(wqe))
			break;
		if (need_resched())
			break;
		cpu_relax();
	}
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		io_worker_spin_for_work(wqe);
		spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			goto loop;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * create one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	spin_unlock_irq(&wqe->lock);
}
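
/*
 * Create a worker kthread for the given accounting class (bound/unbound),
 * add it to the node's free list, and mark the first bounded worker on a
 * node as the fixed worker that never times out.
 */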
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}

	spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int workers_to_create = num_possible_nodes();
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, workers_to_create);
	for_each_node(node) {
		if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			goto err;
		workers_to_create--;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	return 0;
err:
	set_bit(IO_WQ_BIT_ERROR, &wq->state);
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (refcount_sub_and_test(workers_to_create, &wq->refs))
		complete(&wq->done);
	return 0;
}
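
/*
 * Bounded work can always be queued. Unbound work is allowed if a worker
 * of that class is already running or free, and is otherwise limited by
 * the owning user's process count unless the caller is privileged.
 */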
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return;
	}

	work_flags = work->flags;
	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_add_tail(&work->list, &wqe->work_list);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}
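
/* Queue work on the io_wqe of the local NUMA node. */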
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Enqueue work, hashed by some key. Work items that hash to the same value
 * will not be done in parallel. Used to limit concurrent writes, generally
 * hashed by inode.
 */
void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];
	unsigned bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
	io_wqe_enqueue(wqe, work);
}

static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	work_cancel_fn *cancel;
	void *caller_data;
};

static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
{
	struct io_cb_cancel_data *data = cancel_data;
	unsigned long flags;
	bool ret = false;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    data->cancel(worker->cur_work, data->caller_data)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
					       work_cancel_fn *cancel,
					       void *cancel_data)
{
	struct io_cb_cancel_data data = {
		.cancel = cancel,
		.caller_data = cancel_data,
	};
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (cancel(work, cancel_data)) {
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_cb_work(wqe, cancel, data);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_wq_work *work = data;
	unsigned long flags;
	bool ret = false;

	if (worker->cur_work != work)
		return false;

	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work == work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
					    struct io_wq_work *cwork)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	cwork->flags |= IO_WQ_WORK_CANCEL;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (work == cwork) {
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 */
	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_work(wqe, cwork);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}
{
1014 struct io_wq_work work
;
1015 struct completion done
;
1018 static void io_wq_flush_func(struct io_wq_work
**workptr
)
1020 struct io_wq_work
*work
= *workptr
;
1021 struct io_wq_flush_data
*data
;
1023 data
= container_of(work
, struct io_wq_flush_data
, work
);
1024 complete(&data
->done
);

/*
 * Doesn't wait for previously queued work to finish. When this completes,
 * it just means that previously queued work was started.
 */
void io_wq_flush(struct io_wq *wq)
{
	struct io_wq_flush_data data;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		init_completion(&data.done);
		INIT_IO_WORK(&data.work, io_wq_flush_func);
		data.work.flags |= IO_WQ_WORK_INTERNAL;
		io_wqe_enqueue(wqe, &data.work);
		wait_for_completion(&data.done);
	}
}
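
/*
 * Create an io_wq with one io_wqe per NUMA node, 'bounded' max bounded
 * workers per node, and a manager thread that forks new workers on demand.
 *
 * Illustrative caller flow (sketch, not taken from this file): fill a
 * struct io_wq_data with get_work/put_work callbacks and the owning user,
 * create the pool with io_wq_create(bounded, &data), submit work with
 * io_wq_enqueue() or io_wq_enqueue_hashed(), and drop the pool again with
 * io_wq_destroy().
 */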
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->get_work = data->get_work;
	wq->put_work = data->put_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;

	for_each_node(node) {
		struct io_wqe *wqe;

		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}
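
/*
 * Reuse an existing io_wq when the get_work/put_work callbacks match;
 * takes a use reference that io_wq_destroy() drops.
 */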
bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{
	if (data->get_work != wq->get_work || data->put_work != wq->put_work)
		return false;

	return refcount_inc_not_zero(&wq->use_refs);
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}