// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"
#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE 8

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};
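
/*
 * Parking protocol: any task that needs the SQPOLL thread quiescent (e.g.
 * to edit sqd->ctx_list) bumps park_pending, sets SHOULD_PARK and takes
 * sqd->lock; the thread notices the bit, drops the lock and waits until
 * park_pending falls back to zero. Unpark clears the bit but re-sets it if
 * other parkers are still pending, so the thread stays parked for them.
 */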
void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
}
void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	WARN_ON_ONCE(data_race(sqd->thread) == current);

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
}
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	WARN_ON_ONCE(sqd->thread == current);
	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}
void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}
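
/*
 * Dropping the last reference stops the SQPOLL thread first (waiting for
 * the ->exited completion) and only then frees the io_sq_data, so no ring
 * can still be on ->ctx_list at that point.
 */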
static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	CLASS(fd, f)(p->wq_fd);

	if (fd_empty(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f)))
		return ERR_PTR(-EINVAL);

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd)
		return ERR_PTR(-EINVAL);
	if (sqd->task_tgid != current->tgid)
		return ERR_PTR(-EPERM);

	refcount_inc(&sqd->refs);
	return sqd;
}
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}
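
/*
 * With IORING_SETUP_ATTACH_WQ the new ring reuses the io_sq_data (and thus
 * the SQPOLL thread) of the ring behind p->wq_fd, but only within the same
 * thread group; an -EPERM from the attach attempt falls back to setting up
 * a private sqd and thread instead.
 */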
static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}
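
/*
 * One submission pass for a single ring: submit up to the (possibly capped)
 * number of pending SQEs and reap IOPOLL completions. Returns the number of
 * SQEs submitted; the main loop treats any progress as a reason to keep
 * polling and to push the idle timeout forward.
 */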
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}
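
/*
 * Thread-side half of the parking protocol: drop sqd->lock so the parker
 * can do its work, sleep until all parkers are gone, then report whether a
 * signal was taken or IO_SQ_THREAD_SHOULD_STOP was set, in which case the
 * main loop exits.
 */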
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}
/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}
static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
	struct rusage end;

	getrusage(current, RUSAGE_SELF, &end);
	end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
	end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

	sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}
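
/*
 * io_sq_thread() is the SQPOLL kernel thread itself: it walks every ring
 * attached to this io_sq_data, submits pending SQEs and runs task_work, and
 * only sleeps once nothing has happened for sq_thread_idle jiffies.
 * sqd->work_time above accumulates the thread's system CPU time in
 * microseconds and feeds the SQPOLL statistics shown in the ring's fdinfo.
 */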
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	struct rusage start;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN];
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring)
		goto err_out;

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		getrusage(current, RUSAGE_SELF, &start);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
			if (io_napi(ctx))
				io_napi_sqpoll_busy_poll(ctx);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin) {
				io_sq_update_worktime(sqd, &start);
				timeout = jiffies + sqd->sq_thread_idle;
			}
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}
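
		/*
		 * Idle: advertise IORING_SQ_NEED_WAKEUP so userspace knows it
		 * must call io_uring_enter() with IORING_ENTER_SQ_WAKEUP for
		 * new SQEs to be noticed, then re-check the rings (ordered
		 * against the flag store) before actually scheduling out.
		 */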
		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}
	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	sqd->thread = NULL;
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}
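
/*
 * Backpressure for IORING_ENTER_SQ_WAIT: block the submitter until the
 * SQPOLL thread has consumed enough entries that the SQ ring is no longer
 * full, or until a signal is pending.
 */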
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		CLASS(fd, f)(p->wq_fd);
		if (fd_empty(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f)))
			return -EINVAL;
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;
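
		/*
		 * With IORING_SETUP_SQ_AFF the thread is pinned to
		 * p->sq_thread_cpu, which must be online and permitted by the
		 * caller's cpuset; otherwise the thread may run anywhere and
		 * sq_cpu is picked once it starts running.
		 */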
		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		sqd->thread = tsk;
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}
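
/*
 * For reference, a minimal userspace sketch of the setup this function
 * serves (raw io_uring_setup(2)/io_uring_enter(2) wrappers assumed, error
 * handling and ring mmap omitted):
 *
 *	struct io_uring_params p = {
 *		.flags		= IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF,
 *		.sq_thread_cpu	= 3,
 *		.sq_thread_idle	= 2000,		// idle timeout in msec
 *	};
 *	int ring_fd = io_uring_setup(256, &p);
 *
 *	// After queueing SQEs, io_uring_enter() is only needed once the
 *	// thread has idled and set IORING_SQ_NEED_WAKEUP in the SQ ring
 *	// flags (read with an atomic/acquire load in real code):
 *	if (*sq_flags & IORING_SQ_NEED_WAKEUP)
 *		io_uring_enter(ring_fd, 0, 0, IORING_ENTER_SQ_WAKEUP, NULL);
 */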
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		if (sqd->thread)
			ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}