1 // SPDX-License-Identifier: GPL-2.0
3 * Shared application/kernel submission and completion ring pairs, for
4 * supporting fast/efficient IO.
6 * A note on the read/write ordering memory barriers that are matched between
7 * the application and kernel side.
9 * After the application reads the CQ ring tail, it must use an
10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11 * before writing the tail (using smp_load_acquire to read the tail will
12 * do). It also needs a smp_mb() before updating CQ head (ordering the
13 * entry load(s) with the head store), pairing with an implicit barrier
14 * through a control-dependency in io_get_cqring (smp_store_release to
15 * store head will do). Failure to do so could lead to reading invalid
18 * Likewise, the application must use an appropriate smp_wmb() before
19 * writing the SQ tail (ordering SQ entry stores with the tail store),
20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21 * to store the tail will do). And it needs a barrier ordering the SQ
22 * head load before writing new SQ entries (smp_load_acquire to read
25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27 * updating the SQ tail; a full memory barrier smp_mb() is needed
30 * Also see the examples in the liburing library:
32 * git://git.kernel.dk/liburing
34 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35 * from data shared between the kernel and application. This is done both
36 * for ordering purposes, but also to ensure that once a value is loaded from
37 * data that the application could potentially modify, it remains stable.
39 * Copyright (C) 2018-2019 Jens Axboe
40 * Copyright (c) 2018-2019 Christoph Hellwig
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
50 #include <linux/sched/signal.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
64 #include <net/af_unix.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
74 #define CREATE_TRACE_POINTS
75 #include <trace/events/io_uring.h>
77 #include <uapi/linux/io_uring.h>
82 #define IORING_MAX_ENTRIES 32768
83 #define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)
86 * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
88 #define IORING_FILE_TABLE_SHIFT 9
89 #define IORING_MAX_FILES_TABLE (1U << IORING_FILE_TABLE_SHIFT)
90 #define IORING_FILE_TABLE_MASK (IORING_MAX_FILES_TABLE - 1)
91 #define IORING_MAX_FIXED_FILES (64 * IORING_MAX_FILES_TABLE)
94 u32 head ____cacheline_aligned_in_smp
;
95 u32 tail ____cacheline_aligned_in_smp
;
99 * This data is shared with the application through the mmap at offsets
100 * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
102 * The offsets to the member fields are published through struct
103 * io_sqring_offsets when calling io_uring_setup.
107 * Head and tail offsets into the ring; the offsets need to be
108 * masked to get valid indices.
110 * The kernel controls head of the sq ring and the tail of the cq ring,
111 * and the application controls tail of the sq ring and the head of the
114 struct io_uring sq
, cq
;
116 * Bitmasks to apply to head and tail offsets (constant, equals
119 u32 sq_ring_mask
, cq_ring_mask
;
120 /* Ring sizes (constant, power of 2) */
121 u32 sq_ring_entries
, cq_ring_entries
;
123 * Number of invalid entries dropped by the kernel due to
124 * invalid index stored in array
126 * Written by the kernel, shouldn't be modified by the
127 * application (i.e. get number of "new events" by comparing to
130 * After a new SQ head value was read by the application this
131 * counter includes all submissions that were dropped reaching
132 * the new SQ head (and possibly more).
138 * Written by the kernel, shouldn't be modified by the
141 * The application needs a full memory barrier before checking
142 * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
146 * Number of completion events lost because the queue was full;
147 * this should be avoided by the application by making sure
148 * there are not more requests pending than there is space in
149 * the completion queue.
151 * Written by the kernel, shouldn't be modified by the
152 * application (i.e. get number of "new events" by comparing to
155 * As completion events come in out of order this counter is not
156 * ordered with any other data.
160 * Ring buffer of completion events.
162 * The kernel writes completion events fresh every time they are
163 * produced, so the application is allowed to modify pending
166 struct io_uring_cqe cqes
[] ____cacheline_aligned_in_smp
;
169 struct io_mapped_ubuf
{
172 struct bio_vec
*bvec
;
173 unsigned int nr_bvecs
;
176 struct fixed_file_table
{
182 struct percpu_ref refs
;
183 } ____cacheline_aligned_in_smp
;
189 bool cq_overflow_flushed
;
194 * Ring buffer of indices into array of io_uring_sqe, which is
195 * mmapped by the application using the IORING_OFF_SQES offset.
197 * This indirection could e.g. be used to assign fixed
198 * io_uring_sqe entries to operations and only submit them to
199 * the queue when needed.
201 * The kernel modifies neither the indices array nor the entries
205 unsigned cached_sq_head
;
208 unsigned sq_thread_idle
;
209 unsigned cached_sq_dropped
;
210 atomic_t cached_cq_overflow
;
211 struct io_uring_sqe
*sq_sqes
;
213 struct list_head defer_list
;
214 struct list_head timeout_list
;
215 struct list_head cq_overflow_list
;
217 wait_queue_head_t inflight_wait
;
218 } ____cacheline_aligned_in_smp
;
220 struct io_rings
*rings
;
224 struct task_struct
*sqo_thread
; /* if using sq thread polling */
225 struct mm_struct
*sqo_mm
;
226 wait_queue_head_t sqo_wait
;
229 * If used, fixed file set. Writers must ensure that ->refs is dead,
230 * readers must ensure that ->refs is alive as long as the file* is
231 * used. Only updated through io_uring_register(2).
233 struct fixed_file_table
*file_table
;
234 unsigned nr_user_files
;
236 /* if used, fixed mapped user buffers */
237 unsigned nr_user_bufs
;
238 struct io_mapped_ubuf
*user_bufs
;
240 struct user_struct
*user
;
242 const struct cred
*creds
;
244 /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
245 struct completion
*completions
;
247 /* if all else fails... */
248 struct io_kiocb
*fallback_req
;
250 #if defined(CONFIG_UNIX)
251 struct socket
*ring_sock
;
255 unsigned cached_cq_tail
;
258 atomic_t cq_timeouts
;
259 struct wait_queue_head cq_wait
;
260 struct fasync_struct
*cq_fasync
;
261 struct eventfd_ctx
*cq_ev_fd
;
262 } ____cacheline_aligned_in_smp
;
265 struct mutex uring_lock
;
266 wait_queue_head_t wait
;
267 } ____cacheline_aligned_in_smp
;
270 spinlock_t completion_lock
;
271 bool poll_multi_file
;
273 * ->poll_list is protected by the ctx->uring_lock for
274 * io_uring instances that don't use IORING_SETUP_SQPOLL.
275 * For SQPOLL, only the single threaded io_sq_thread() will
276 * manipulate the list, hence no extra locking is needed there.
278 struct list_head poll_list
;
279 struct hlist_head
*cancel_hash
;
280 unsigned cancel_hash_bits
;
282 spinlock_t inflight_lock
;
283 struct list_head inflight_list
;
284 } ____cacheline_aligned_in_smp
;
288 * First field must be the file pointer in all the
289 * iocb unions! See also 'struct kiocb' in <linux/fs.h>
291 struct io_poll_iocb
{
294 struct wait_queue_head
*head
;
300 struct wait_queue_entry wait
;
303 struct io_timeout_data
{
304 struct io_kiocb
*req
;
305 struct hrtimer timer
;
306 struct timespec64 ts
;
307 enum hrtimer_mode mode
;
313 struct sockaddr __user
*addr
;
314 int __user
*addr_len
;
338 /* NOTE: kiocb has the file as the first member, so don't do it here */
346 struct sockaddr __user
*addr
;
352 struct user_msghdr __user
*msg
;
356 struct io_async_connect
{
357 struct sockaddr_storage address
;
360 struct io_async_msghdr
{
361 struct iovec fast_iov
[UIO_FASTIOV
];
363 struct sockaddr __user
*uaddr
;
368 struct iovec fast_iov
[UIO_FASTIOV
];
374 struct io_async_ctx
{
376 struct io_async_rw rw
;
377 struct io_async_msghdr msg
;
378 struct io_async_connect connect
;
379 struct io_timeout_data timeout
;
384 * NOTE! Each of the iocb union members has the file pointer
385 * as the first entry in their struct definition. So you can
386 * access the file pointer through any of the sub-structs,
387 * or directly as just 'ki_filp' in this struct.
393 struct io_poll_iocb poll
;
394 struct io_accept accept
;
396 struct io_cancel cancel
;
397 struct io_timeout timeout
;
398 struct io_connect connect
;
399 struct io_sr_msg sr_msg
;
402 struct io_async_ctx
*io
;
403 struct file
*ring_file
;
407 bool needs_fixed_file
;
410 struct io_ring_ctx
*ctx
;
412 struct list_head list
;
413 struct hlist_node hash_node
;
415 struct list_head link_list
;
418 #define REQ_F_NOWAIT 1 /* must not punt to workers */
419 #define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */
420 #define REQ_F_FIXED_FILE 4 /* ctx owns file */
421 #define REQ_F_LINK_NEXT 8 /* already grabbed next link */
422 #define REQ_F_IO_DRAIN 16 /* drain existing IO first */
423 #define REQ_F_IO_DRAINED 32 /* drain done */
424 #define REQ_F_LINK 64 /* linked sqes */
425 #define REQ_F_LINK_TIMEOUT 128 /* has linked timeout */
426 #define REQ_F_FAIL_LINK 256 /* fail rest of links */
427 #define REQ_F_DRAIN_LINK 512 /* link should be fully drained */
428 #define REQ_F_TIMEOUT 1024 /* timeout request */
429 #define REQ_F_ISREG 2048 /* regular file */
430 #define REQ_F_MUST_PUNT 4096 /* must be punted even for NONBLOCK */
431 #define REQ_F_TIMEOUT_NOSEQ 8192 /* no timeout sequence */
432 #define REQ_F_INFLIGHT 16384 /* on inflight list */
433 #define REQ_F_COMP_LOCKED 32768 /* completion under lock */
434 #define REQ_F_HARDLINK 65536 /* doesn't sever on completion < 0 */
439 struct list_head inflight_entry
;
441 struct io_wq_work work
;
444 #define IO_PLUG_THRESHOLD 2
445 #define IO_IOPOLL_BATCH 8
447 struct io_submit_state
{
448 struct blk_plug plug
;
451 * io_kiocb alloc cache
453 void *reqs
[IO_IOPOLL_BATCH
];
454 unsigned int free_reqs
;
455 unsigned int cur_req
;
458 * File reference cache
462 unsigned int has_refs
;
463 unsigned int used_refs
;
464 unsigned int ios_left
;
467 static void io_wq_submit_work(struct io_wq_work
**workptr
);
468 static void io_cqring_fill_event(struct io_kiocb
*req
, long res
);
469 static void __io_free_req(struct io_kiocb
*req
);
470 static void io_put_req(struct io_kiocb
*req
);
471 static void io_double_put_req(struct io_kiocb
*req
);
472 static void __io_double_put_req(struct io_kiocb
*req
);
473 static struct io_kiocb
*io_prep_linked_timeout(struct io_kiocb
*req
);
474 static void io_queue_linked_timeout(struct io_kiocb
*req
);
476 static struct kmem_cache
*req_cachep
;
478 static const struct file_operations io_uring_fops
;
480 struct sock
*io_uring_get_socket(struct file
*file
)
482 #if defined(CONFIG_UNIX)
483 if (file
->f_op
== &io_uring_fops
) {
484 struct io_ring_ctx
*ctx
= file
->private_data
;
486 return ctx
->ring_sock
->sk
;
491 EXPORT_SYMBOL(io_uring_get_socket
);
493 static void io_ring_ctx_ref_free(struct percpu_ref
*ref
)
495 struct io_ring_ctx
*ctx
= container_of(ref
, struct io_ring_ctx
, refs
);
497 complete(&ctx
->completions
[0]);
500 static struct io_ring_ctx
*io_ring_ctx_alloc(struct io_uring_params
*p
)
502 struct io_ring_ctx
*ctx
;
505 ctx
= kzalloc(sizeof(*ctx
), GFP_KERNEL
);
509 ctx
->fallback_req
= kmem_cache_alloc(req_cachep
, GFP_KERNEL
);
510 if (!ctx
->fallback_req
)
513 ctx
->completions
= kmalloc(2 * sizeof(struct completion
), GFP_KERNEL
);
514 if (!ctx
->completions
)
518 * Use 5 bits less than the max cq entries, that should give us around
519 * 32 entries per hash list if totally full and uniformly spread.
521 hash_bits
= ilog2(p
->cq_entries
);
525 ctx
->cancel_hash_bits
= hash_bits
;
526 ctx
->cancel_hash
= kmalloc((1U << hash_bits
) * sizeof(struct hlist_head
),
528 if (!ctx
->cancel_hash
)
530 __hash_init(ctx
->cancel_hash
, 1U << hash_bits
);
532 if (percpu_ref_init(&ctx
->refs
, io_ring_ctx_ref_free
,
533 PERCPU_REF_ALLOW_REINIT
, GFP_KERNEL
))
536 ctx
->flags
= p
->flags
;
537 init_waitqueue_head(&ctx
->cq_wait
);
538 INIT_LIST_HEAD(&ctx
->cq_overflow_list
);
539 init_completion(&ctx
->completions
[0]);
540 init_completion(&ctx
->completions
[1]);
541 mutex_init(&ctx
->uring_lock
);
542 init_waitqueue_head(&ctx
->wait
);
543 spin_lock_init(&ctx
->completion_lock
);
544 INIT_LIST_HEAD(&ctx
->poll_list
);
545 INIT_LIST_HEAD(&ctx
->defer_list
);
546 INIT_LIST_HEAD(&ctx
->timeout_list
);
547 init_waitqueue_head(&ctx
->inflight_wait
);
548 spin_lock_init(&ctx
->inflight_lock
);
549 INIT_LIST_HEAD(&ctx
->inflight_list
);
552 if (ctx
->fallback_req
)
553 kmem_cache_free(req_cachep
, ctx
->fallback_req
);
554 kfree(ctx
->completions
);
555 kfree(ctx
->cancel_hash
);
560 static inline bool __req_need_defer(struct io_kiocb
*req
)
562 struct io_ring_ctx
*ctx
= req
->ctx
;
564 return req
->sequence
!= ctx
->cached_cq_tail
+ ctx
->cached_sq_dropped
565 + atomic_read(&ctx
->cached_cq_overflow
);
568 static inline bool req_need_defer(struct io_kiocb
*req
)
570 if ((req
->flags
& (REQ_F_IO_DRAIN
|REQ_F_IO_DRAINED
)) == REQ_F_IO_DRAIN
)
571 return __req_need_defer(req
);
576 static struct io_kiocb
*io_get_deferred_req(struct io_ring_ctx
*ctx
)
578 struct io_kiocb
*req
;
580 req
= list_first_entry_or_null(&ctx
->defer_list
, struct io_kiocb
, list
);
581 if (req
&& !req_need_defer(req
)) {
582 list_del_init(&req
->list
);
589 static struct io_kiocb
*io_get_timeout_req(struct io_ring_ctx
*ctx
)
591 struct io_kiocb
*req
;
593 req
= list_first_entry_or_null(&ctx
->timeout_list
, struct io_kiocb
, list
);
595 if (req
->flags
& REQ_F_TIMEOUT_NOSEQ
)
597 if (!__req_need_defer(req
)) {
598 list_del_init(&req
->list
);
606 static void __io_commit_cqring(struct io_ring_ctx
*ctx
)
608 struct io_rings
*rings
= ctx
->rings
;
610 if (ctx
->cached_cq_tail
!= READ_ONCE(rings
->cq
.tail
)) {
611 /* order cqe stores with ring update */
612 smp_store_release(&rings
->cq
.tail
, ctx
->cached_cq_tail
);
614 if (wq_has_sleeper(&ctx
->cq_wait
)) {
615 wake_up_interruptible(&ctx
->cq_wait
);
616 kill_fasync(&ctx
->cq_fasync
, SIGIO
, POLL_IN
);
621 static inline bool io_req_needs_user(struct io_kiocb
*req
)
623 return !(req
->opcode
== IORING_OP_READ_FIXED
||
624 req
->opcode
== IORING_OP_WRITE_FIXED
);
627 static inline bool io_prep_async_work(struct io_kiocb
*req
,
628 struct io_kiocb
**link
)
630 bool do_hashed
= false;
632 switch (req
->opcode
) {
633 case IORING_OP_WRITEV
:
634 case IORING_OP_WRITE_FIXED
:
635 /* only regular files should be hashed for writes */
636 if (req
->flags
& REQ_F_ISREG
)
639 case IORING_OP_READV
:
640 case IORING_OP_READ_FIXED
:
641 case IORING_OP_SENDMSG
:
642 case IORING_OP_RECVMSG
:
643 case IORING_OP_ACCEPT
:
644 case IORING_OP_POLL_ADD
:
645 case IORING_OP_CONNECT
:
647 * We know REQ_F_ISREG is not set on some of these
648 * opcodes, but this enables us to keep the check in
651 if (!(req
->flags
& REQ_F_ISREG
))
652 req
->work
.flags
|= IO_WQ_WORK_UNBOUND
;
655 if (io_req_needs_user(req
))
656 req
->work
.flags
|= IO_WQ_WORK_NEEDS_USER
;
658 *link
= io_prep_linked_timeout(req
);
662 static inline void io_queue_async_work(struct io_kiocb
*req
)
664 struct io_ring_ctx
*ctx
= req
->ctx
;
665 struct io_kiocb
*link
;
668 do_hashed
= io_prep_async_work(req
, &link
);
670 trace_io_uring_queue_async_work(ctx
, do_hashed
, req
, &req
->work
,
673 io_wq_enqueue(ctx
->io_wq
, &req
->work
);
675 io_wq_enqueue_hashed(ctx
->io_wq
, &req
->work
,
676 file_inode(req
->file
));
680 io_queue_linked_timeout(link
);
683 static void io_kill_timeout(struct io_kiocb
*req
)
687 ret
= hrtimer_try_to_cancel(&req
->io
->timeout
.timer
);
689 atomic_inc(&req
->ctx
->cq_timeouts
);
690 list_del_init(&req
->list
);
691 req
->flags
|= REQ_F_COMP_LOCKED
;
692 io_cqring_fill_event(req
, 0);
697 static void io_kill_timeouts(struct io_ring_ctx
*ctx
)
699 struct io_kiocb
*req
, *tmp
;
701 spin_lock_irq(&ctx
->completion_lock
);
702 list_for_each_entry_safe(req
, tmp
, &ctx
->timeout_list
, list
)
703 io_kill_timeout(req
);
704 spin_unlock_irq(&ctx
->completion_lock
);
707 static void io_commit_cqring(struct io_ring_ctx
*ctx
)
709 struct io_kiocb
*req
;
711 while ((req
= io_get_timeout_req(ctx
)) != NULL
)
712 io_kill_timeout(req
);
714 __io_commit_cqring(ctx
);
716 while ((req
= io_get_deferred_req(ctx
)) != NULL
) {
717 req
->flags
|= REQ_F_IO_DRAINED
;
718 io_queue_async_work(req
);
722 static struct io_uring_cqe
*io_get_cqring(struct io_ring_ctx
*ctx
)
724 struct io_rings
*rings
= ctx
->rings
;
727 tail
= ctx
->cached_cq_tail
;
729 * writes to the cq entry need to come after reading head; the
730 * control dependency is enough as we're using WRITE_ONCE to
733 if (tail
- READ_ONCE(rings
->cq
.head
) == rings
->cq_ring_entries
)
736 ctx
->cached_cq_tail
++;
737 return &rings
->cqes
[tail
& ctx
->cq_mask
];
740 static inline bool io_should_trigger_evfd(struct io_ring_ctx
*ctx
)
744 if (!ctx
->eventfd_async
)
746 return io_wq_current_is_worker() || in_interrupt();
749 static void __io_cqring_ev_posted(struct io_ring_ctx
*ctx
, bool trigger_ev
)
751 if (waitqueue_active(&ctx
->wait
))
753 if (waitqueue_active(&ctx
->sqo_wait
))
754 wake_up(&ctx
->sqo_wait
);
756 eventfd_signal(ctx
->cq_ev_fd
, 1);
759 static void io_cqring_ev_posted(struct io_ring_ctx
*ctx
)
761 __io_cqring_ev_posted(ctx
, io_should_trigger_evfd(ctx
));
764 /* Returns true if there are no backlogged entries after the flush */
765 static bool io_cqring_overflow_flush(struct io_ring_ctx
*ctx
, bool force
)
767 struct io_rings
*rings
= ctx
->rings
;
768 struct io_uring_cqe
*cqe
;
769 struct io_kiocb
*req
;
774 if (list_empty_careful(&ctx
->cq_overflow_list
))
776 if ((ctx
->cached_cq_tail
- READ_ONCE(rings
->cq
.head
) ==
777 rings
->cq_ring_entries
))
781 spin_lock_irqsave(&ctx
->completion_lock
, flags
);
783 /* if force is set, the ring is going away. always drop after that */
785 ctx
->cq_overflow_flushed
= true;
788 while (!list_empty(&ctx
->cq_overflow_list
)) {
789 cqe
= io_get_cqring(ctx
);
793 req
= list_first_entry(&ctx
->cq_overflow_list
, struct io_kiocb
,
795 list_move(&req
->list
, &list
);
797 WRITE_ONCE(cqe
->user_data
, req
->user_data
);
798 WRITE_ONCE(cqe
->res
, req
->result
);
799 WRITE_ONCE(cqe
->flags
, 0);
801 WRITE_ONCE(ctx
->rings
->cq_overflow
,
802 atomic_inc_return(&ctx
->cached_cq_overflow
));
806 io_commit_cqring(ctx
);
807 spin_unlock_irqrestore(&ctx
->completion_lock
, flags
);
808 io_cqring_ev_posted(ctx
);
810 while (!list_empty(&list
)) {
811 req
= list_first_entry(&list
, struct io_kiocb
, list
);
812 list_del(&req
->list
);
819 static void io_cqring_fill_event(struct io_kiocb
*req
, long res
)
821 struct io_ring_ctx
*ctx
= req
->ctx
;
822 struct io_uring_cqe
*cqe
;
824 trace_io_uring_complete(ctx
, req
->user_data
, res
);
827 * If we can't get a cq entry, userspace overflowed the
828 * submission (by quite a lot). Increment the overflow count in
831 cqe
= io_get_cqring(ctx
);
833 WRITE_ONCE(cqe
->user_data
, req
->user_data
);
834 WRITE_ONCE(cqe
->res
, res
);
835 WRITE_ONCE(cqe
->flags
, 0);
836 } else if (ctx
->cq_overflow_flushed
) {
837 WRITE_ONCE(ctx
->rings
->cq_overflow
,
838 atomic_inc_return(&ctx
->cached_cq_overflow
));
840 refcount_inc(&req
->refs
);
842 list_add_tail(&req
->list
, &ctx
->cq_overflow_list
);
846 static void io_cqring_add_event(struct io_kiocb
*req
, long res
)
848 struct io_ring_ctx
*ctx
= req
->ctx
;
851 spin_lock_irqsave(&ctx
->completion_lock
, flags
);
852 io_cqring_fill_event(req
, res
);
853 io_commit_cqring(ctx
);
854 spin_unlock_irqrestore(&ctx
->completion_lock
, flags
);
856 io_cqring_ev_posted(ctx
);
859 static inline bool io_is_fallback_req(struct io_kiocb
*req
)
861 return req
== (struct io_kiocb
*)
862 ((unsigned long) req
->ctx
->fallback_req
& ~1UL);
865 static struct io_kiocb
*io_get_fallback_req(struct io_ring_ctx
*ctx
)
867 struct io_kiocb
*req
;
869 req
= ctx
->fallback_req
;
870 if (!test_and_set_bit_lock(0, (unsigned long *) ctx
->fallback_req
))
876 static struct io_kiocb
*io_get_req(struct io_ring_ctx
*ctx
,
877 struct io_submit_state
*state
)
879 gfp_t gfp
= GFP_KERNEL
| __GFP_NOWARN
;
880 struct io_kiocb
*req
;
882 if (!percpu_ref_tryget(&ctx
->refs
))
886 req
= kmem_cache_alloc(req_cachep
, gfp
);
889 } else if (!state
->free_reqs
) {
893 sz
= min_t(size_t, state
->ios_left
, ARRAY_SIZE(state
->reqs
));
894 ret
= kmem_cache_alloc_bulk(req_cachep
, gfp
, sz
, state
->reqs
);
897 * Bulk alloc is all-or-nothing. If we fail to get a batch,
898 * retry single alloc to be on the safe side.
900 if (unlikely(ret
<= 0)) {
901 state
->reqs
[0] = kmem_cache_alloc(req_cachep
, gfp
);
906 state
->free_reqs
= ret
- 1;
908 req
= state
->reqs
[0];
910 req
= state
->reqs
[state
->cur_req
];
917 req
->ring_file
= NULL
;
921 /* one is dropped after submission, the other at completion */
922 refcount_set(&req
->refs
, 2);
924 INIT_IO_WORK(&req
->work
, io_wq_submit_work
);
927 req
= io_get_fallback_req(ctx
);
930 percpu_ref_put(&ctx
->refs
);
934 static void io_free_req_many(struct io_ring_ctx
*ctx
, void **reqs
, int *nr
)
937 kmem_cache_free_bulk(req_cachep
, *nr
, reqs
);
938 percpu_ref_put_many(&ctx
->refs
, *nr
);
943 static void __io_free_req(struct io_kiocb
*req
)
945 struct io_ring_ctx
*ctx
= req
->ctx
;
949 if (req
->file
&& !(req
->flags
& REQ_F_FIXED_FILE
))
951 if (req
->flags
& REQ_F_INFLIGHT
) {
954 spin_lock_irqsave(&ctx
->inflight_lock
, flags
);
955 list_del(&req
->inflight_entry
);
956 if (waitqueue_active(&ctx
->inflight_wait
))
957 wake_up(&ctx
->inflight_wait
);
958 spin_unlock_irqrestore(&ctx
->inflight_lock
, flags
);
960 percpu_ref_put(&ctx
->refs
);
961 if (likely(!io_is_fallback_req(req
)))
962 kmem_cache_free(req_cachep
, req
);
964 clear_bit_unlock(0, (unsigned long *) ctx
->fallback_req
);
967 static bool io_link_cancel_timeout(struct io_kiocb
*req
)
969 struct io_ring_ctx
*ctx
= req
->ctx
;
972 ret
= hrtimer_try_to_cancel(&req
->io
->timeout
.timer
);
974 io_cqring_fill_event(req
, -ECANCELED
);
975 io_commit_cqring(ctx
);
976 req
->flags
&= ~REQ_F_LINK
;
984 static void io_req_link_next(struct io_kiocb
*req
, struct io_kiocb
**nxtptr
)
986 struct io_ring_ctx
*ctx
= req
->ctx
;
987 bool wake_ev
= false;
989 /* Already got next link */
990 if (req
->flags
& REQ_F_LINK_NEXT
)
994 * The list should never be empty when we are called here. But could
995 * potentially happen if the chain is messed up, check to be on the
998 while (!list_empty(&req
->link_list
)) {
999 struct io_kiocb
*nxt
= list_first_entry(&req
->link_list
,
1000 struct io_kiocb
, link_list
);
1002 if (unlikely((req
->flags
& REQ_F_LINK_TIMEOUT
) &&
1003 (nxt
->flags
& REQ_F_TIMEOUT
))) {
1004 list_del_init(&nxt
->link_list
);
1005 wake_ev
|= io_link_cancel_timeout(nxt
);
1006 req
->flags
&= ~REQ_F_LINK_TIMEOUT
;
1010 list_del_init(&req
->link_list
);
1011 if (!list_empty(&nxt
->link_list
))
1012 nxt
->flags
|= REQ_F_LINK
;
1017 req
->flags
|= REQ_F_LINK_NEXT
;
1019 io_cqring_ev_posted(ctx
);
1023 * Called if REQ_F_LINK is set, and we fail the head request
1025 static void io_fail_links(struct io_kiocb
*req
)
1027 struct io_ring_ctx
*ctx
= req
->ctx
;
1028 unsigned long flags
;
1030 spin_lock_irqsave(&ctx
->completion_lock
, flags
);
1032 while (!list_empty(&req
->link_list
)) {
1033 struct io_kiocb
*link
= list_first_entry(&req
->link_list
,
1034 struct io_kiocb
, link_list
);
1036 list_del_init(&link
->link_list
);
1037 trace_io_uring_fail_link(req
, link
);
1039 if ((req
->flags
& REQ_F_LINK_TIMEOUT
) &&
1040 link
->opcode
== IORING_OP_LINK_TIMEOUT
) {
1041 io_link_cancel_timeout(link
);
1043 io_cqring_fill_event(link
, -ECANCELED
);
1044 __io_double_put_req(link
);
1046 req
->flags
&= ~REQ_F_LINK_TIMEOUT
;
1049 io_commit_cqring(ctx
);
1050 spin_unlock_irqrestore(&ctx
->completion_lock
, flags
);
1051 io_cqring_ev_posted(ctx
);
1054 static void io_req_find_next(struct io_kiocb
*req
, struct io_kiocb
**nxt
)
1056 if (likely(!(req
->flags
& REQ_F_LINK
)))
1060 * If LINK is set, we have dependent requests in this chain. If we
1061 * didn't fail this request, queue the first one up, moving any other
1062 * dependencies to the next request. In case of failure, fail the rest
1065 if (req
->flags
& REQ_F_FAIL_LINK
) {
1067 } else if ((req
->flags
& (REQ_F_LINK_TIMEOUT
| REQ_F_COMP_LOCKED
)) ==
1068 REQ_F_LINK_TIMEOUT
) {
1069 struct io_ring_ctx
*ctx
= req
->ctx
;
1070 unsigned long flags
;
1073 * If this is a timeout link, we could be racing with the
1074 * timeout timer. Grab the completion lock for this case to
1075 * protect against that.
1077 spin_lock_irqsave(&ctx
->completion_lock
, flags
);
1078 io_req_link_next(req
, nxt
);
1079 spin_unlock_irqrestore(&ctx
->completion_lock
, flags
);
1081 io_req_link_next(req
, nxt
);
1085 static void io_free_req(struct io_kiocb
*req
)
1087 struct io_kiocb
*nxt
= NULL
;
1089 io_req_find_next(req
, &nxt
);
1093 io_queue_async_work(nxt
);
1097 * Drop reference to request, return next in chain (if there is one) if this
1098 * was the last reference to this request.
1100 __attribute__((nonnull
))
1101 static void io_put_req_find_next(struct io_kiocb
*req
, struct io_kiocb
**nxtptr
)
1103 if (refcount_dec_and_test(&req
->refs
)) {
1104 io_req_find_next(req
, nxtptr
);
1109 static void io_put_req(struct io_kiocb
*req
)
1111 if (refcount_dec_and_test(&req
->refs
))
1116 * Must only be used if we don't need to care about links, usually from
1117 * within the completion handling itself.
1119 static void __io_double_put_req(struct io_kiocb
*req
)
1121 /* drop both submit and complete references */
1122 if (refcount_sub_and_test(2, &req
->refs
))
1126 static void io_double_put_req(struct io_kiocb
*req
)
1128 /* drop both submit and complete references */
1129 if (refcount_sub_and_test(2, &req
->refs
))
1133 static unsigned io_cqring_events(struct io_ring_ctx
*ctx
, bool noflush
)
1135 struct io_rings
*rings
= ctx
->rings
;
1138 * noflush == true is from the waitqueue handler, just ensure we wake
1139 * up the task, and the next invocation will flush the entries. We
1140 * cannot safely to it from here.
1142 if (noflush
&& !list_empty(&ctx
->cq_overflow_list
))
1145 io_cqring_overflow_flush(ctx
, false);
1147 /* See comment at the top of this file */
1149 return READ_ONCE(rings
->cq
.tail
) - READ_ONCE(rings
->cq
.head
);
1152 static inline unsigned int io_sqring_entries(struct io_ring_ctx
*ctx
)
1154 struct io_rings
*rings
= ctx
->rings
;
1156 /* make sure SQ entry isn't read before tail */
1157 return smp_load_acquire(&rings
->sq
.tail
) - ctx
->cached_sq_head
;
1161 * Find and free completed poll iocbs
1163 static void io_iopoll_complete(struct io_ring_ctx
*ctx
, unsigned int *nr_events
,
1164 struct list_head
*done
)
1166 void *reqs
[IO_IOPOLL_BATCH
];
1167 struct io_kiocb
*req
;
1171 while (!list_empty(done
)) {
1172 req
= list_first_entry(done
, struct io_kiocb
, list
);
1173 list_del(&req
->list
);
1175 io_cqring_fill_event(req
, req
->result
);
1178 if (refcount_dec_and_test(&req
->refs
)) {
1179 /* If we're not using fixed files, we have to pair the
1180 * completion part with the file put. Use regular
1181 * completions for those, only batch free for fixed
1182 * file and non-linked commands.
1184 if (((req
->flags
& (REQ_F_FIXED_FILE
|REQ_F_LINK
)) ==
1185 REQ_F_FIXED_FILE
) && !io_is_fallback_req(req
) &&
1187 reqs
[to_free
++] = req
;
1188 if (to_free
== ARRAY_SIZE(reqs
))
1189 io_free_req_many(ctx
, reqs
, &to_free
);
1196 io_commit_cqring(ctx
);
1197 io_free_req_many(ctx
, reqs
, &to_free
);
1200 static int io_do_iopoll(struct io_ring_ctx
*ctx
, unsigned int *nr_events
,
1203 struct io_kiocb
*req
, *tmp
;
1209 * Only spin for completions if we don't have multiple devices hanging
1210 * off our complete list, and we're under the requested amount.
1212 spin
= !ctx
->poll_multi_file
&& *nr_events
< min
;
1215 list_for_each_entry_safe(req
, tmp
, &ctx
->poll_list
, list
) {
1216 struct kiocb
*kiocb
= &req
->rw
.kiocb
;
1219 * Move completed entries to our local list. If we find a
1220 * request that requires polling, break out and complete
1221 * the done list first, if we have entries there.
1223 if (req
->flags
& REQ_F_IOPOLL_COMPLETED
) {
1224 list_move_tail(&req
->list
, &done
);
1227 if (!list_empty(&done
))
1230 ret
= kiocb
->ki_filp
->f_op
->iopoll(kiocb
, spin
);
1239 if (!list_empty(&done
))
1240 io_iopoll_complete(ctx
, nr_events
, &done
);
1246 * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1247 * non-spinning poll check - we'll still enter the driver poll loop, but only
1248 * as a non-spinning completion check.
1250 static int io_iopoll_getevents(struct io_ring_ctx
*ctx
, unsigned int *nr_events
,
1253 while (!list_empty(&ctx
->poll_list
) && !need_resched()) {
1256 ret
= io_do_iopoll(ctx
, nr_events
, min
);
1259 if (!min
|| *nr_events
>= min
)
1267 * We can't just wait for polled events to come to us, we have to actively
1268 * find and complete them.
1270 static void io_iopoll_reap_events(struct io_ring_ctx
*ctx
)
1272 if (!(ctx
->flags
& IORING_SETUP_IOPOLL
))
1275 mutex_lock(&ctx
->uring_lock
);
1276 while (!list_empty(&ctx
->poll_list
)) {
1277 unsigned int nr_events
= 0;
1279 io_iopoll_getevents(ctx
, &nr_events
, 1);
1282 * Ensure we allow local-to-the-cpu processing to take place,
1283 * in this case we need to ensure that we reap all events.
1287 mutex_unlock(&ctx
->uring_lock
);
1290 static int io_iopoll_check(struct io_ring_ctx
*ctx
, unsigned *nr_events
,
1293 int iters
= 0, ret
= 0;
1296 * We disallow the app entering submit/complete with polling, but we
1297 * still need to lock the ring to prevent racing with polled issue
1298 * that got punted to a workqueue.
1300 mutex_lock(&ctx
->uring_lock
);
1305 * Don't enter poll loop if we already have events pending.
1306 * If we do, we can potentially be spinning for commands that
1307 * already triggered a CQE (eg in error).
1309 if (io_cqring_events(ctx
, false))
1313 * If a submit got punted to a workqueue, we can have the
1314 * application entering polling for a command before it gets
1315 * issued. That app will hold the uring_lock for the duration
1316 * of the poll right here, so we need to take a breather every
1317 * now and then to ensure that the issue has a chance to add
1318 * the poll to the issued list. Otherwise we can spin here
1319 * forever, while the workqueue is stuck trying to acquire the
1322 if (!(++iters
& 7)) {
1323 mutex_unlock(&ctx
->uring_lock
);
1324 mutex_lock(&ctx
->uring_lock
);
1327 if (*nr_events
< min
)
1328 tmin
= min
- *nr_events
;
1330 ret
= io_iopoll_getevents(ctx
, nr_events
, tmin
);
1334 } while (min
&& !*nr_events
&& !need_resched());
1336 mutex_unlock(&ctx
->uring_lock
);
1340 static void kiocb_end_write(struct io_kiocb
*req
)
1343 * Tell lockdep we inherited freeze protection from submission
1346 if (req
->flags
& REQ_F_ISREG
) {
1347 struct inode
*inode
= file_inode(req
->file
);
1349 __sb_writers_acquired(inode
->i_sb
, SB_FREEZE_WRITE
);
1351 file_end_write(req
->file
);
1354 static inline void req_set_fail_links(struct io_kiocb
*req
)
1356 if ((req
->flags
& (REQ_F_LINK
| REQ_F_HARDLINK
)) == REQ_F_LINK
)
1357 req
->flags
|= REQ_F_FAIL_LINK
;
1360 static void io_complete_rw_common(struct kiocb
*kiocb
, long res
)
1362 struct io_kiocb
*req
= container_of(kiocb
, struct io_kiocb
, rw
.kiocb
);
1364 if (kiocb
->ki_flags
& IOCB_WRITE
)
1365 kiocb_end_write(req
);
1367 if (res
!= req
->result
)
1368 req_set_fail_links(req
);
1369 io_cqring_add_event(req
, res
);
1372 static void io_complete_rw(struct kiocb
*kiocb
, long res
, long res2
)
1374 struct io_kiocb
*req
= container_of(kiocb
, struct io_kiocb
, rw
.kiocb
);
1376 io_complete_rw_common(kiocb
, res
);
1380 static struct io_kiocb
*__io_complete_rw(struct kiocb
*kiocb
, long res
)
1382 struct io_kiocb
*req
= container_of(kiocb
, struct io_kiocb
, rw
.kiocb
);
1383 struct io_kiocb
*nxt
= NULL
;
1385 io_complete_rw_common(kiocb
, res
);
1386 io_put_req_find_next(req
, &nxt
);
1391 static void io_complete_rw_iopoll(struct kiocb
*kiocb
, long res
, long res2
)
1393 struct io_kiocb
*req
= container_of(kiocb
, struct io_kiocb
, rw
.kiocb
);
1395 if (kiocb
->ki_flags
& IOCB_WRITE
)
1396 kiocb_end_write(req
);
1398 if (res
!= req
->result
)
1399 req_set_fail_links(req
);
1402 req
->flags
|= REQ_F_IOPOLL_COMPLETED
;
1406 * After the iocb has been issued, it's safe to be found on the poll list.
1407 * Adding the kiocb to the list AFTER submission ensures that we don't
1408 * find it from a io_iopoll_getevents() thread before the issuer is done
1409 * accessing the kiocb cookie.
1411 static void io_iopoll_req_issued(struct io_kiocb
*req
)
1413 struct io_ring_ctx
*ctx
= req
->ctx
;
1416 * Track whether we have multiple files in our lists. This will impact
1417 * how we do polling eventually, not spinning if we're on potentially
1418 * different devices.
1420 if (list_empty(&ctx
->poll_list
)) {
1421 ctx
->poll_multi_file
= false;
1422 } else if (!ctx
->poll_multi_file
) {
1423 struct io_kiocb
*list_req
;
1425 list_req
= list_first_entry(&ctx
->poll_list
, struct io_kiocb
,
1427 if (list_req
->file
!= req
->file
)
1428 ctx
->poll_multi_file
= true;
1432 * For fast devices, IO may have already completed. If it has, add
1433 * it to the front so we find it first.
1435 if (req
->flags
& REQ_F_IOPOLL_COMPLETED
)
1436 list_add(&req
->list
, &ctx
->poll_list
);
1438 list_add_tail(&req
->list
, &ctx
->poll_list
);
1440 if ((ctx
->flags
& IORING_SETUP_SQPOLL
) &&
1441 wq_has_sleeper(&ctx
->sqo_wait
))
1442 wake_up(&ctx
->sqo_wait
);
1445 static void io_file_put(struct io_submit_state
*state
)
1448 int diff
= state
->has_refs
- state
->used_refs
;
1451 fput_many(state
->file
, diff
);
1457 * Get as many references to a file as we have IOs left in this submission,
1458 * assuming most submissions are for one file, or at least that each file
1459 * has more than one submission.
1461 static struct file
*io_file_get(struct io_submit_state
*state
, int fd
)
1467 if (state
->fd
== fd
) {
1474 state
->file
= fget_many(fd
, state
->ios_left
);
1479 state
->has_refs
= state
->ios_left
;
1480 state
->used_refs
= 1;
1486 * If we tracked the file through the SCM inflight mechanism, we could support
1487 * any file. For now, just ensure that anything potentially problematic is done
1490 static bool io_file_supports_async(struct file
*file
)
1492 umode_t mode
= file_inode(file
)->i_mode
;
1494 if (S_ISBLK(mode
) || S_ISCHR(mode
) || S_ISSOCK(mode
))
1496 if (S_ISREG(mode
) && file
->f_op
!= &io_uring_fops
)
1502 static int io_prep_rw(struct io_kiocb
*req
, const struct io_uring_sqe
*sqe
,
1503 bool force_nonblock
)
1505 struct io_ring_ctx
*ctx
= req
->ctx
;
1506 struct kiocb
*kiocb
= &req
->rw
.kiocb
;
1513 if (S_ISREG(file_inode(req
->file
)->i_mode
))
1514 req
->flags
|= REQ_F_ISREG
;
1516 kiocb
->ki_pos
= READ_ONCE(sqe
->off
);
1517 kiocb
->ki_flags
= iocb_flags(kiocb
->ki_filp
);
1518 kiocb
->ki_hint
= ki_hint_validate(file_write_hint(kiocb
->ki_filp
));
1520 ioprio
= READ_ONCE(sqe
->ioprio
);
1522 ret
= ioprio_check_cap(ioprio
);
1526 kiocb
->ki_ioprio
= ioprio
;
1528 kiocb
->ki_ioprio
= get_current_ioprio();
1530 ret
= kiocb_set_rw_flags(kiocb
, READ_ONCE(sqe
->rw_flags
));
1534 /* don't allow async punt if RWF_NOWAIT was requested */
1535 if ((kiocb
->ki_flags
& IOCB_NOWAIT
) ||
1536 (req
->file
->f_flags
& O_NONBLOCK
))
1537 req
->flags
|= REQ_F_NOWAIT
;
1540 kiocb
->ki_flags
|= IOCB_NOWAIT
;
1542 if (ctx
->flags
& IORING_SETUP_IOPOLL
) {
1543 if (!(kiocb
->ki_flags
& IOCB_DIRECT
) ||
1544 !kiocb
->ki_filp
->f_op
->iopoll
)
1547 kiocb
->ki_flags
|= IOCB_HIPRI
;
1548 kiocb
->ki_complete
= io_complete_rw_iopoll
;
1551 if (kiocb
->ki_flags
& IOCB_HIPRI
)
1553 kiocb
->ki_complete
= io_complete_rw
;
1556 req
->rw
.addr
= READ_ONCE(sqe
->addr
);
1557 req
->rw
.len
= READ_ONCE(sqe
->len
);
1558 /* we own ->private, reuse it for the buffer index */
1559 req
->rw
.kiocb
.private = (void *) (unsigned long)
1560 READ_ONCE(sqe
->buf_index
);
1564 static inline void io_rw_done(struct kiocb
*kiocb
, ssize_t ret
)
1570 case -ERESTARTNOINTR
:
1571 case -ERESTARTNOHAND
:
1572 case -ERESTART_RESTARTBLOCK
:
1574 * We can't just restart the syscall, since previously
1575 * submitted sqes may already be in progress. Just fail this
1581 kiocb
->ki_complete(kiocb
, ret
, 0);
1585 static void kiocb_done(struct kiocb
*kiocb
, ssize_t ret
, struct io_kiocb
**nxt
,
1588 if (in_async
&& ret
>= 0 && kiocb
->ki_complete
== io_complete_rw
)
1589 *nxt
= __io_complete_rw(kiocb
, ret
);
1591 io_rw_done(kiocb
, ret
);
1594 static ssize_t
io_import_fixed(struct io_kiocb
*req
, int rw
,
1595 struct iov_iter
*iter
)
1597 struct io_ring_ctx
*ctx
= req
->ctx
;
1598 size_t len
= req
->rw
.len
;
1599 struct io_mapped_ubuf
*imu
;
1600 unsigned index
, buf_index
;
1604 /* attempt to use fixed buffers without having provided iovecs */
1605 if (unlikely(!ctx
->user_bufs
))
1608 buf_index
= (unsigned long) req
->rw
.kiocb
.private;
1609 if (unlikely(buf_index
>= ctx
->nr_user_bufs
))
1612 index
= array_index_nospec(buf_index
, ctx
->nr_user_bufs
);
1613 imu
= &ctx
->user_bufs
[index
];
1614 buf_addr
= req
->rw
.addr
;
1617 if (buf_addr
+ len
< buf_addr
)
1619 /* not inside the mapped region */
1620 if (buf_addr
< imu
->ubuf
|| buf_addr
+ len
> imu
->ubuf
+ imu
->len
)
1624 * May not be a start of buffer, set size appropriately
1625 * and advance us to the beginning.
1627 offset
= buf_addr
- imu
->ubuf
;
1628 iov_iter_bvec(iter
, rw
, imu
->bvec
, imu
->nr_bvecs
, offset
+ len
);
1632 * Don't use iov_iter_advance() here, as it's really slow for
1633 * using the latter parts of a big fixed buffer - it iterates
1634 * over each segment manually. We can cheat a bit here, because
1637 * 1) it's a BVEC iter, we set it up
1638 * 2) all bvecs are PAGE_SIZE in size, except potentially the
1639 * first and last bvec
1641 * So just find our index, and adjust the iterator afterwards.
1642 * If the offset is within the first bvec (or the whole first
1643 * bvec, just use iov_iter_advance(). This makes it easier
1644 * since we can just skip the first segment, which may not
1645 * be PAGE_SIZE aligned.
1647 const struct bio_vec
*bvec
= imu
->bvec
;
1649 if (offset
<= bvec
->bv_len
) {
1650 iov_iter_advance(iter
, offset
);
1652 unsigned long seg_skip
;
1654 /* skip first vec */
1655 offset
-= bvec
->bv_len
;
1656 seg_skip
= 1 + (offset
>> PAGE_SHIFT
);
1658 iter
->bvec
= bvec
+ seg_skip
;
1659 iter
->nr_segs
-= seg_skip
;
1660 iter
->count
-= bvec
->bv_len
+ offset
;
1661 iter
->iov_offset
= offset
& ~PAGE_MASK
;
1668 static ssize_t
io_import_iovec(int rw
, struct io_kiocb
*req
,
1669 struct iovec
**iovec
, struct iov_iter
*iter
)
1671 void __user
*buf
= u64_to_user_ptr(req
->rw
.addr
);
1672 size_t sqe_len
= req
->rw
.len
;
1675 opcode
= req
->opcode
;
1676 if (opcode
== IORING_OP_READ_FIXED
|| opcode
== IORING_OP_WRITE_FIXED
) {
1678 return io_import_fixed(req
, rw
, iter
);
1681 /* buffer index only valid with fixed read/write */
1682 if (req
->rw
.kiocb
.private)
1686 struct io_async_rw
*iorw
= &req
->io
->rw
;
1689 iov_iter_init(iter
, rw
, *iovec
, iorw
->nr_segs
, iorw
->size
);
1690 if (iorw
->iov
== iorw
->fast_iov
)
1698 #ifdef CONFIG_COMPAT
1699 if (req
->ctx
->compat
)
1700 return compat_import_iovec(rw
, buf
, sqe_len
, UIO_FASTIOV
,
1704 return import_iovec(rw
, buf
, sqe_len
, UIO_FASTIOV
, iovec
, iter
);
1708 * For files that don't have ->read_iter() and ->write_iter(), handle them
1709 * by looping over ->read() or ->write() manually.
1711 static ssize_t
loop_rw_iter(int rw
, struct file
*file
, struct kiocb
*kiocb
,
1712 struct iov_iter
*iter
)
1717 * Don't support polled IO through this interface, and we can't
1718 * support non-blocking either. For the latter, this just causes
1719 * the kiocb to be handled from an async context.
1721 if (kiocb
->ki_flags
& IOCB_HIPRI
)
1723 if (kiocb
->ki_flags
& IOCB_NOWAIT
)
1726 while (iov_iter_count(iter
)) {
1730 if (!iov_iter_is_bvec(iter
)) {
1731 iovec
= iov_iter_iovec(iter
);
1733 /* fixed buffers import bvec */
1734 iovec
.iov_base
= kmap(iter
->bvec
->bv_page
)
1736 iovec
.iov_len
= min(iter
->count
,
1737 iter
->bvec
->bv_len
- iter
->iov_offset
);
1741 nr
= file
->f_op
->read(file
, iovec
.iov_base
,
1742 iovec
.iov_len
, &kiocb
->ki_pos
);
1744 nr
= file
->f_op
->write(file
, iovec
.iov_base
,
1745 iovec
.iov_len
, &kiocb
->ki_pos
);
1748 if (iov_iter_is_bvec(iter
))
1749 kunmap(iter
->bvec
->bv_page
);
1757 if (nr
!= iovec
.iov_len
)
1759 iov_iter_advance(iter
, nr
);
1765 static void io_req_map_rw(struct io_kiocb
*req
, ssize_t io_size
,
1766 struct iovec
*iovec
, struct iovec
*fast_iov
,
1767 struct iov_iter
*iter
)
1769 req
->io
->rw
.nr_segs
= iter
->nr_segs
;
1770 req
->io
->rw
.size
= io_size
;
1771 req
->io
->rw
.iov
= iovec
;
1772 if (!req
->io
->rw
.iov
) {
1773 req
->io
->rw
.iov
= req
->io
->rw
.fast_iov
;
1774 memcpy(req
->io
->rw
.iov
, fast_iov
,
1775 sizeof(struct iovec
) * iter
->nr_segs
);
1779 static int io_alloc_async_ctx(struct io_kiocb
*req
)
1781 req
->io
= kmalloc(sizeof(*req
->io
), GFP_KERNEL
);
1782 return req
->io
== NULL
;
1785 static int io_setup_async_rw(struct io_kiocb
*req
, ssize_t io_size
,
1786 struct iovec
*iovec
, struct iovec
*fast_iov
,
1787 struct iov_iter
*iter
)
1789 if (req
->opcode
== IORING_OP_READ_FIXED
||
1790 req
->opcode
== IORING_OP_WRITE_FIXED
)
1793 if (io_alloc_async_ctx(req
))
1796 io_req_map_rw(req
, io_size
, iovec
, fast_iov
, iter
);
1801 static int io_read_prep(struct io_kiocb
*req
, const struct io_uring_sqe
*sqe
,
1802 bool force_nonblock
)
1804 struct io_async_ctx
*io
;
1805 struct iov_iter iter
;
1808 ret
= io_prep_rw(req
, sqe
, force_nonblock
);
1812 if (unlikely(!(req
->file
->f_mode
& FMODE_READ
)))
1819 io
->rw
.iov
= io
->rw
.fast_iov
;
1821 ret
= io_import_iovec(READ
, req
, &io
->rw
.iov
, &iter
);
1826 io_req_map_rw(req
, ret
, io
->rw
.iov
, io
->rw
.fast_iov
, &iter
);
1830 static int io_read(struct io_kiocb
*req
, struct io_kiocb
**nxt
,
1831 bool force_nonblock
)
1833 struct iovec inline_vecs
[UIO_FASTIOV
], *iovec
= inline_vecs
;
1834 struct kiocb
*kiocb
= &req
->rw
.kiocb
;
1835 struct iov_iter iter
;
1837 ssize_t io_size
, ret
;
1839 ret
= io_import_iovec(READ
, req
, &iovec
, &iter
);
1843 /* Ensure we clear previously set non-block flag */
1844 if (!force_nonblock
)
1845 req
->rw
.kiocb
.ki_flags
&= ~IOCB_NOWAIT
;
1849 if (req
->flags
& REQ_F_LINK
)
1850 req
->result
= io_size
;
1853 * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1854 * we know to async punt it even if it was opened O_NONBLOCK
1856 if (force_nonblock
&& !io_file_supports_async(req
->file
)) {
1857 req
->flags
|= REQ_F_MUST_PUNT
;
1861 iov_count
= iov_iter_count(&iter
);
1862 ret
= rw_verify_area(READ
, req
->file
, &kiocb
->ki_pos
, iov_count
);
1866 if (req
->file
->f_op
->read_iter
)
1867 ret2
= call_read_iter(req
->file
, kiocb
, &iter
);
1869 ret2
= loop_rw_iter(READ
, req
->file
, kiocb
, &iter
);
1871 /* Catch -EAGAIN return for forced non-blocking submission */
1872 if (!force_nonblock
|| ret2
!= -EAGAIN
) {
1873 kiocb_done(kiocb
, ret2
, nxt
, req
->in_async
);
1876 ret
= io_setup_async_rw(req
, io_size
, iovec
,
1877 inline_vecs
, &iter
);
1888 static int io_write_prep(struct io_kiocb
*req
, const struct io_uring_sqe
*sqe
,
1889 bool force_nonblock
)
1891 struct io_async_ctx
*io
;
1892 struct iov_iter iter
;
1895 ret
= io_prep_rw(req
, sqe
, force_nonblock
);
1899 if (unlikely(!(req
->file
->f_mode
& FMODE_WRITE
)))
1906 io
->rw
.iov
= io
->rw
.fast_iov
;
1908 ret
= io_import_iovec(WRITE
, req
, &io
->rw
.iov
, &iter
);
1913 io_req_map_rw(req
, ret
, io
->rw
.iov
, io
->rw
.fast_iov
, &iter
);
1917 static int io_write(struct io_kiocb
*req
, struct io_kiocb
**nxt
,
1918 bool force_nonblock
)
1920 struct iovec inline_vecs
[UIO_FASTIOV
], *iovec
= inline_vecs
;
1921 struct kiocb
*kiocb
= &req
->rw
.kiocb
;
1922 struct iov_iter iter
;
1924 ssize_t ret
, io_size
;
1926 ret
= io_import_iovec(WRITE
, req
, &iovec
, &iter
);
1930 /* Ensure we clear previously set non-block flag */
1931 if (!force_nonblock
)
1932 req
->rw
.kiocb
.ki_flags
&= ~IOCB_NOWAIT
;
1936 if (req
->flags
& REQ_F_LINK
)
1937 req
->result
= io_size
;
1940 * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1941 * we know to async punt it even if it was opened O_NONBLOCK
1943 if (force_nonblock
&& !io_file_supports_async(req
->file
)) {
1944 req
->flags
|= REQ_F_MUST_PUNT
;
1948 /* file path doesn't support NOWAIT for non-direct_IO */
1949 if (force_nonblock
&& !(kiocb
->ki_flags
& IOCB_DIRECT
) &&
1950 (req
->flags
& REQ_F_ISREG
))
1953 iov_count
= iov_iter_count(&iter
);
1954 ret
= rw_verify_area(WRITE
, req
->file
, &kiocb
->ki_pos
, iov_count
);
1959 * Open-code file_start_write here to grab freeze protection,
1960 * which will be released by another thread in
1961 * io_complete_rw(). Fool lockdep by telling it the lock got
1962 * released so that it doesn't complain about the held lock when
1963 * we return to userspace.
1965 if (req
->flags
& REQ_F_ISREG
) {
1966 __sb_start_write(file_inode(req
->file
)->i_sb
,
1967 SB_FREEZE_WRITE
, true);
1968 __sb_writers_release(file_inode(req
->file
)->i_sb
,
1971 kiocb
->ki_flags
|= IOCB_WRITE
;
1973 if (req
->file
->f_op
->write_iter
)
1974 ret2
= call_write_iter(req
->file
, kiocb
, &iter
);
1976 ret2
= loop_rw_iter(WRITE
, req
->file
, kiocb
, &iter
);
1978 * Raw bdev writes will -EOPNOTSUPP for IOCB_NOWAIT. Just
1979 * retry them without IOCB_NOWAIT.
1981 if (ret2
== -EOPNOTSUPP
&& (kiocb
->ki_flags
& IOCB_NOWAIT
))
1983 if (!force_nonblock
|| ret2
!= -EAGAIN
) {
1984 kiocb_done(kiocb
, ret2
, nxt
, req
->in_async
);
1987 ret
= io_setup_async_rw(req
, io_size
, iovec
,
1988 inline_vecs
, &iter
);
2000 * IORING_OP_NOP just posts a completion event, nothing else.
2002 static int io_nop(struct io_kiocb
*req
)
2004 struct io_ring_ctx
*ctx
= req
->ctx
;
2006 if (unlikely(ctx
->flags
& IORING_SETUP_IOPOLL
))
2009 io_cqring_add_event(req
, 0);
2014 static int io_prep_fsync(struct io_kiocb
*req
, const struct io_uring_sqe
*sqe
)
2016 struct io_ring_ctx
*ctx
= req
->ctx
;
2021 if (unlikely(ctx
->flags
& IORING_SETUP_IOPOLL
))
2023 if (unlikely(sqe
->addr
|| sqe
->ioprio
|| sqe
->buf_index
))
2026 req
->sync
.flags
= READ_ONCE(sqe
->fsync_flags
);
2027 if (unlikely(req
->sync
.flags
& ~IORING_FSYNC_DATASYNC
))
2030 req
->sync
.off
= READ_ONCE(sqe
->off
);
2031 req
->sync
.len
= READ_ONCE(sqe
->len
);
2035 static bool io_req_cancelled(struct io_kiocb
*req
)
2037 if (req
->work
.flags
& IO_WQ_WORK_CANCEL
) {
2038 req_set_fail_links(req
);
2039 io_cqring_add_event(req
, -ECANCELED
);
2047 static void io_link_work_cb(struct io_wq_work
**workptr
)
2049 struct io_wq_work
*work
= *workptr
;
2050 struct io_kiocb
*link
= work
->data
;
2052 io_queue_linked_timeout(link
);
2053 work
->func
= io_wq_submit_work
;
2056 static void io_wq_assign_next(struct io_wq_work
**workptr
, struct io_kiocb
*nxt
)
2058 struct io_kiocb
*link
;
2060 io_prep_async_work(nxt
, &link
);
2061 *workptr
= &nxt
->work
;
2063 nxt
->work
.flags
|= IO_WQ_WORK_CB
;
2064 nxt
->work
.func
= io_link_work_cb
;
2065 nxt
->work
.data
= link
;
2069 static void io_fsync_finish(struct io_wq_work
**workptr
)
2071 struct io_kiocb
*req
= container_of(*workptr
, struct io_kiocb
, work
);
2072 loff_t end
= req
->sync
.off
+ req
->sync
.len
;
2073 struct io_kiocb
*nxt
= NULL
;
2076 if (io_req_cancelled(req
))
2079 ret
= vfs_fsync_range(req
->file
, req
->sync
.off
,
2080 end
> 0 ? end
: LLONG_MAX
,
2081 req
->sync
.flags
& IORING_FSYNC_DATASYNC
);
2083 req_set_fail_links(req
);
2084 io_cqring_add_event(req
, ret
);
2085 io_put_req_find_next(req
, &nxt
);
2087 io_wq_assign_next(workptr
, nxt
);
2090 static int io_fsync(struct io_kiocb
*req
, struct io_kiocb
**nxt
,
2091 bool force_nonblock
)
2093 struct io_wq_work
*work
, *old_work
;
2095 /* fsync always requires a blocking context */
2096 if (force_nonblock
) {
2098 req
->work
.func
= io_fsync_finish
;
2102 work
= old_work
= &req
->work
;
2103 io_fsync_finish(&work
);
2104 if (work
&& work
!= old_work
)
2105 *nxt
= container_of(work
, struct io_kiocb
, work
);
2109 static int io_prep_sfr(struct io_kiocb
*req
, const struct io_uring_sqe
*sqe
)
2111 struct io_ring_ctx
*ctx
= req
->ctx
;
2116 if (unlikely(ctx
->flags
& IORING_SETUP_IOPOLL
))
2118 if (unlikely(sqe
->addr
|| sqe
->ioprio
|| sqe
->buf_index
))
2121 req
->sync
.off
= READ_ONCE(sqe
->off
);
2122 req
->sync
.len
= READ_ONCE(sqe
->len
);
2123 req
->sync
.flags
= READ_ONCE(sqe
->sync_range_flags
);
2127 static void io_sync_file_range_finish(struct io_wq_work
**workptr
)
2129 struct io_kiocb
*req
= container_of(*workptr
, struct io_kiocb
, work
);
2130 struct io_kiocb
*nxt
= NULL
;
2133 if (io_req_cancelled(req
))
2136 ret
= sync_file_range(req
->file
, req
->sync
.off
, req
->sync
.len
,
2139 req_set_fail_links(req
);
2140 io_cqring_add_event(req
, ret
);
2141 io_put_req_find_next(req
, &nxt
);
2143 io_wq_assign_next(workptr
, nxt
);
2146 static int io_sync_file_range(struct io_kiocb
*req
, struct io_kiocb
**nxt
,
2147 bool force_nonblock
)
2149 struct io_wq_work
*work
, *old_work
;
2151 /* sync_file_range always requires a blocking context */
2152 if (force_nonblock
) {
2154 req
->work
.func
= io_sync_file_range_finish
;
2158 work
= old_work
= &req
->work
;
2159 io_sync_file_range_finish(&work
);
2160 if (work
&& work
!= old_work
)
2161 *nxt
= container_of(work
, struct io_kiocb
, work
);
2165 static int io_sendmsg_prep(struct io_kiocb
*req
, const struct io_uring_sqe
*sqe
)
2167 #if defined(CONFIG_NET)
2168 struct io_sr_msg
*sr
= &req
->sr_msg
;
2169 struct io_async_ctx
*io
= req
->io
;
2171 sr
->msg_flags
= READ_ONCE(sqe
->msg_flags
);
2172 sr
->msg
= u64_to_user_ptr(READ_ONCE(sqe
->addr
));
2174 #ifdef CONFIG_COMPAT
2175 if (req
->ctx
->compat
)
2176 sr
->msg_flags
|= MSG_CMSG_COMPAT
;
2182 io
->msg
.iov
= io
->msg
.fast_iov
;
2183 return sendmsg_copy_msghdr(&io
->msg
.msg
, sr
->msg
, sr
->msg_flags
,
2190 static int io_sendmsg(struct io_kiocb
*req
, struct io_kiocb
**nxt
,
2191 bool force_nonblock
)
2193 #if defined(CONFIG_NET)
2194 struct io_async_msghdr
*kmsg
= NULL
;
2195 struct socket
*sock
;
2198 if (unlikely(req
->ctx
->flags
& IORING_SETUP_IOPOLL
))
2201 sock
= sock_from_file(req
->file
, &ret
);
2203 struct io_async_ctx io
;
2204 struct sockaddr_storage addr
;
2208 kmsg
= &req
->io
->msg
;
2209 kmsg
->msg
.msg_name
= &addr
;
2210 /* if iov is set, it's allocated already */
2212 kmsg
->iov
= kmsg
->fast_iov
;
2213 kmsg
->msg
.msg_iter
.iov
= kmsg
->iov
;
2215 struct io_sr_msg
*sr
= &req
->sr_msg
;
2218 kmsg
->msg
.msg_name
= &addr
;
2220 io
.msg
.iov
= io
.msg
.fast_iov
;
2221 ret
= sendmsg_copy_msghdr(&io
.msg
.msg
, sr
->msg
,
2222 sr
->msg_flags
, &io
.msg
.iov
);
2227 flags
= req
->sr_msg
.msg_flags
;
2228 if (flags
& MSG_DONTWAIT
)
2229 req
->flags
|= REQ_F_NOWAIT
;
2230 else if (force_nonblock
)
2231 flags
|= MSG_DONTWAIT
;
2233 ret
= __sys_sendmsg_sock(sock
, &kmsg
->msg
, flags
);
2234 if (force_nonblock
&& ret
== -EAGAIN
) {
2237 if (io_alloc_async_ctx(req
)) {
2238 if (kmsg
&& kmsg
->iov
!= kmsg
->fast_iov
)
2242 memcpy(&req
->io
->msg
, &io
.msg
, sizeof(io
.msg
));
2245 if (ret
== -ERESTARTSYS
)
2249 if (kmsg
&& kmsg
->iov
!= kmsg
->fast_iov
)
2251 io_cqring_add_event(req
, ret
);
2253 req_set_fail_links(req
);
2254 io_put_req_find_next(req
, nxt
);
2261 static int io_recvmsg_prep(struct io_kiocb
*req
,
2262 const struct io_uring_sqe
*sqe
)
2264 #if defined(CONFIG_NET)
2265 struct io_sr_msg
*sr
= &req
->sr_msg
;
2266 struct io_async_ctx
*io
= req
->io
;
2268 sr
->msg_flags
= READ_ONCE(sqe
->msg_flags
);
2269 sr
->msg
= u64_to_user_ptr(READ_ONCE(sqe
->addr
));
2271 #ifdef CONFIG_COMPAT
2272 if (req
->ctx
->compat
)
2273 sr
->msg_flags
|= MSG_CMSG_COMPAT
;
2279 io
->msg
.iov
= io
->msg
.fast_iov
;
2280 return recvmsg_copy_msghdr(&io
->msg
.msg
, sr
->msg
, sr
->msg_flags
,
2281 &io
->msg
.uaddr
, &io
->msg
.iov
);
2287 static int io_recvmsg(struct io_kiocb
*req
, struct io_kiocb
**nxt
,
2288 bool force_nonblock
)
2290 #if defined(CONFIG_NET)
2291 struct io_async_msghdr
*kmsg
= NULL
;
2292 struct socket
*sock
;
2295 if (unlikely(req
->ctx
->flags
& IORING_SETUP_IOPOLL
))
2298 sock
= sock_from_file(req
->file
, &ret
);
2300 struct io_async_ctx io
;
2301 struct sockaddr_storage addr
;
2305 kmsg
= &req
->io
->msg
;
2306 kmsg
->msg
.msg_name
= &addr
;
2307 /* if iov is set, it's allocated already */
2309 kmsg
->iov
= kmsg
->fast_iov
;
2310 kmsg
->msg
.msg_iter
.iov
= kmsg
->iov
;
2312 struct io_sr_msg
*sr
= &req
->sr_msg
;
2315 kmsg
->msg
.msg_name
= &addr
;
2317 io
.msg
.iov
= io
.msg
.fast_iov
;
2318 ret
= recvmsg_copy_msghdr(&io
.msg
.msg
, sr
->msg
,
2319 sr
->msg_flags
, &io
.msg
.uaddr
,
2325 flags
= req
->sr_msg
.msg_flags
;
2326 if (flags
& MSG_DONTWAIT
)
2327 req
->flags
|= REQ_F_NOWAIT
;
2328 else if (force_nonblock
)
2329 flags
|= MSG_DONTWAIT
;
2331 ret
= __sys_recvmsg_sock(sock
, &kmsg
->msg
, req
->sr_msg
.msg
,
2332 kmsg
->uaddr
, flags
);
2333 if (force_nonblock
&& ret
== -EAGAIN
) {
2336 if (io_alloc_async_ctx(req
)) {
2337 if (kmsg
&& kmsg
->iov
!= kmsg
->fast_iov
)
2341 memcpy(&req
->io
->msg
, &io
.msg
, sizeof(io
.msg
));
2344 if (ret
== -ERESTARTSYS
)
2348 if (kmsg
&& kmsg
->iov
!= kmsg
->fast_iov
)
2350 io_cqring_add_event(req
, ret
);
2352 req_set_fail_links(req
);
2353 io_put_req_find_next(req
, nxt
);
2360 static int io_accept_prep(struct io_kiocb
*req
, const struct io_uring_sqe
*sqe
)
2362 #if defined(CONFIG_NET)
2363 struct io_accept
*accept
= &req
->accept
;
2365 if (unlikely(req
->ctx
->flags
& (IORING_SETUP_IOPOLL
|IORING_SETUP_SQPOLL
)))
2367 if (sqe
->ioprio
|| sqe
->len
|| sqe
->buf_index
)
2370 accept
->addr
= u64_to_user_ptr(READ_ONCE(sqe
->addr
));
2371 accept
->addr_len
= u64_to_user_ptr(READ_ONCE(sqe
->addr2
));
2372 accept
->flags
= READ_ONCE(sqe
->accept_flags
);
2379 #if defined(CONFIG_NET)
2380 static int __io_accept(struct io_kiocb
*req
, struct io_kiocb
**nxt
,
2381 bool force_nonblock
)
2383 struct io_accept
*accept
= &req
->accept
;
2384 unsigned file_flags
;
2387 file_flags
= force_nonblock
? O_NONBLOCK
: 0;
2388 ret
= __sys_accept4_file(req
->file
, file_flags
, accept
->addr
,
2389 accept
->addr_len
, accept
->flags
);
2390 if (ret
== -EAGAIN
&& force_nonblock
)
2392 if (ret
== -ERESTARTSYS
)
2395 req_set_fail_links(req
);
2396 io_cqring_add_event(req
, ret
);
2397 io_put_req_find_next(req
, nxt
);
2401 static void io_accept_finish(struct io_wq_work
**workptr
)
2403 struct io_kiocb
*req
= container_of(*workptr
, struct io_kiocb
, work
);
2404 struct io_kiocb
*nxt
= NULL
;
2406 if (io_req_cancelled(req
))
2408 __io_accept(req
, &nxt
, false);
2410 io_wq_assign_next(workptr
, nxt
);
2414 static int io_accept(struct io_kiocb
*req
, struct io_kiocb
**nxt
,
2415 bool force_nonblock
)
2417 #if defined(CONFIG_NET)
2420 ret
= __io_accept(req
, nxt
, force_nonblock
);
2421 if (ret
== -EAGAIN
&& force_nonblock
) {
2422 req
->work
.func
= io_accept_finish
;
2423 req
->work
.flags
|= IO_WQ_WORK_NEEDS_FILES
;
static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
#if defined(CONFIG_NET)
	struct io_connect *conn = &req->connect;
	struct io_async_ctx *io = req->io;

	if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
	if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)

	conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
	conn->addr_len = READ_ONCE(sqe->addr2);

	return move_addr_to_kernel(conn->addr, conn->addr_len,
				   &io->connect.address);

static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
		      bool force_nonblock)
#if defined(CONFIG_NET)
	struct io_async_ctx __io, *io;
	unsigned file_flags;

	ret = move_addr_to_kernel(req->connect.addr,
				  req->connect.addr_len,
				  &__io.connect.address);

	file_flags = force_nonblock ? O_NONBLOCK : 0;

	ret = __sys_connect_file(req->file, &io->connect.address,
				 req->connect.addr_len, file_flags);
	if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
		if (io_alloc_async_ctx(req)) {
		memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
	if (ret == -ERESTARTSYS)
	req_set_fail_links(req);
	io_cqring_add_event(req, ret);
	io_put_req_find_next(req, nxt);
static void io_poll_remove_one(struct io_kiocb *req)
	struct io_poll_iocb *poll = &req->poll;

	spin_lock(&poll->head->lock);
	WRITE_ONCE(poll->canceled, true);
	if (!list_empty(&poll->wait.entry)) {
		list_del_init(&poll->wait.entry);
		io_queue_async_work(req);
	spin_unlock(&poll->head->lock);
	hash_del(&req->hash_node);

static void io_poll_remove_all(struct io_ring_ctx *ctx)
	struct hlist_node *tmp;
	struct io_kiocb *req;

	spin_lock_irq(&ctx->completion_lock);
	for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
		struct hlist_head *list;

		list = &ctx->cancel_hash[i];
		hlist_for_each_entry_safe(req, tmp, list, hash_node)
			io_poll_remove_one(req);
	spin_unlock_irq(&ctx->completion_lock);

static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
	struct hlist_head *list;
	struct io_kiocb *req;

	list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
	hlist_for_each_entry(req, list, hash_node) {
		if (sqe_addr == req->user_data) {
			io_poll_remove_one(req);
static int io_poll_remove_prep(struct io_kiocb *req,
			       const struct io_uring_sqe *sqe)
	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
	if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||

	req->poll.addr = READ_ONCE(sqe->addr);

/*
 * Find a running poll command that matches one specified in sqe->addr,
 * and remove it if found.
 */
static int io_poll_remove(struct io_kiocb *req)
	struct io_ring_ctx *ctx = req->ctx;

	addr = req->poll.addr;
	spin_lock_irq(&ctx->completion_lock);
	ret = io_poll_cancel(ctx, addr);
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_add_event(req, ret);
	req_set_fail_links(req);

static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
	struct io_ring_ctx *ctx = req->ctx;

	req->poll.done = true;
	io_cqring_fill_event(req, error);
	io_cqring_fill_event(req, mangle_poll(mask));
	io_commit_cqring(ctx);
static void io_poll_complete_work(struct io_wq_work **workptr)
	struct io_wq_work *work = *workptr;
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	struct io_poll_iocb *poll = &req->poll;
	struct poll_table_struct pt = { ._key = poll->events };
	struct io_ring_ctx *ctx = req->ctx;
	struct io_kiocb *nxt = NULL;

	if (work->flags & IO_WQ_WORK_CANCEL) {
		WRITE_ONCE(poll->canceled, true);
	} else if (READ_ONCE(poll->canceled)) {
	if (ret != -ECANCELED)
		mask = vfs_poll(poll->file, &pt) & poll->events;

	/*
	 * Note that ->ki_cancel callers also delete iocb from active_reqs after
	 * calling ->ki_cancel. We need the ctx_lock roundtrip here to
	 * synchronize with them. In the cancellation case the list_del_init
	 * itself is not actually needed, but harmless so we keep it in to
	 * avoid further branches in the fast path.
	 */
	spin_lock_irq(&ctx->completion_lock);
	if (!mask && ret != -ECANCELED) {
		add_wait_queue(poll->head, &poll->wait);
		spin_unlock_irq(&ctx->completion_lock);
	hash_del(&req->hash_node);
	io_poll_complete(req, mask, ret);
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_ev_posted(ctx);

	req_set_fail_links(req);
	io_put_req_find_next(req, &nxt);
	io_wq_assign_next(workptr, nxt);
static void io_poll_trigger_evfd(struct io_wq_work **workptr)
	struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);

	eventfd_signal(req->ctx->cq_ev_fd, 1);

static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
	struct io_poll_iocb *poll = wait->private;
	struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
	struct io_ring_ctx *ctx = req->ctx;
	__poll_t mask = key_to_poll(key);
	unsigned long flags;

	/* for instances that support it check for an event match first: */
	if (mask && !(mask & poll->events))

	list_del_init(&poll->wait.entry);

	/*
	 * Run completion inline if we can. We're using trylock here because
	 * we are violating the completion_lock -> poll wq lock ordering.
	 * If we have a link timeout we're going to need the completion_lock
	 * for finalizing the request, mark us as having grabbed that already.
	 */
	if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
		hash_del(&req->hash_node);
		io_poll_complete(req, mask, 0);
		trigger_ev = io_should_trigger_evfd(ctx);
		if (trigger_ev && eventfd_signal_count()) {
			req->work.func = io_poll_trigger_evfd;
			req->flags |= REQ_F_COMP_LOCKED;
		spin_unlock_irqrestore(&ctx->completion_lock, flags);
		__io_cqring_ev_posted(ctx, trigger_ev);
		io_queue_async_work(req);
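/*
 * Editor's note (not in the original source): io_poll_wake() is invoked by
 * the waker with the waitqueue head lock already held, while the rest of
 * this file takes completion_lock before poll->head->lock. Taking
 * completion_lock unconditionally here could therefore deadlock; the
 * trylock above is the escape hatch, and when it fails the completion is
 * simply punted to the io-wq worker through io_queue_async_work().
 */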
struct io_poll_table {
	struct poll_table_struct pt;
	struct io_kiocb *req;

static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
			       struct poll_table_struct *p)
	struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);

	if (unlikely(pt->req->poll.head)) {
		pt->error = -EINVAL;

	pt->req->poll.head = head;
	add_wait_queue(head, &pt->req->poll.wait);

static void io_poll_req_insert(struct io_kiocb *req)
	struct io_ring_ctx *ctx = req->ctx;
	struct hlist_head *list;

	list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
	hlist_add_head(&req->hash_node, list);

static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
	struct io_poll_iocb *poll = &req->poll;

	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
	if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)

	events = READ_ONCE(sqe->poll_events);
	poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
	struct io_poll_iocb *poll = &req->poll;
	struct io_ring_ctx *ctx = req->ctx;
	struct io_poll_table ipt;
	bool cancel = false;

	INIT_IO_WORK(&req->work, io_poll_complete_work);
	INIT_HLIST_NODE(&req->hash_node);

	poll->canceled = false;

	ipt.pt._qproc = io_poll_queue_proc;
	ipt.pt._key = poll->events;
	ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */

	/* initialized the list so that we can do list_empty checks */
	INIT_LIST_HEAD(&poll->wait.entry);
	init_waitqueue_func_entry(&poll->wait, io_poll_wake);
	poll->wait.private = poll;

	INIT_LIST_HEAD(&req->list);

	mask = vfs_poll(poll->file, &ipt.pt) & poll->events;

	spin_lock_irq(&ctx->completion_lock);
	if (likely(poll->head)) {
		spin_lock(&poll->head->lock);
		if (unlikely(list_empty(&poll->wait.entry))) {
		if (mask || ipt.error)
			list_del_init(&poll->wait.entry);
			WRITE_ONCE(poll->canceled, true);
		else if (!poll->done) /* actually waiting for an event */
			io_poll_req_insert(req);
		spin_unlock(&poll->head->lock);
	if (mask) { /* no async, we'd stolen it */
		io_poll_complete(req, mask, 0);
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_ev_posted(ctx);
	io_put_req_find_next(req, nxt);
static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
	struct io_timeout_data *data = container_of(timer,
						struct io_timeout_data, timer);
	struct io_kiocb *req = data->req;
	struct io_ring_ctx *ctx = req->ctx;
	unsigned long flags;

	atomic_inc(&ctx->cq_timeouts);

	spin_lock_irqsave(&ctx->completion_lock, flags);
	/*
	 * We could be racing with timeout deletion. If the list is empty,
	 * then timeout lookup already found it and will be handling it.
	 */
	if (!list_empty(&req->list)) {
		struct io_kiocb *prev;

		/*
		 * Adjust the reqs sequence before the current one because it
		 * will consume a slot in the cq_ring and the cq_tail
		 * pointer will be increased, otherwise other timeout reqs may
		 * return in advance without waiting for enough wait_nr.
		 */
		list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
		list_del_init(&req->list);

	io_cqring_fill_event(req, -ETIME);
	io_commit_cqring(ctx);
	spin_unlock_irqrestore(&ctx->completion_lock, flags);

	io_cqring_ev_posted(ctx);
	req_set_fail_links(req);
	return HRTIMER_NORESTART;
static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
	struct io_kiocb *req;

	list_for_each_entry(req, &ctx->timeout_list, list) {
		if (user_data == req->user_data) {
			list_del_init(&req->list);

	ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
	req_set_fail_links(req);
	io_cqring_fill_event(req, -ECANCELED);

static int io_timeout_remove_prep(struct io_kiocb *req,
				  const struct io_uring_sqe *sqe)
	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
	if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)

	req->timeout.addr = READ_ONCE(sqe->addr);
	req->timeout.flags = READ_ONCE(sqe->timeout_flags);
	if (req->timeout.flags)

/*
 * Remove or update an existing timeout command
 */
static int io_timeout_remove(struct io_kiocb *req)
	struct io_ring_ctx *ctx = req->ctx;

	spin_lock_irq(&ctx->completion_lock);
	ret = io_timeout_cancel(ctx, req->timeout.addr);

	io_cqring_fill_event(req, ret);
	io_commit_cqring(ctx);
	spin_unlock_irq(&ctx->completion_lock);
	io_cqring_ev_posted(ctx);
	req_set_fail_links(req);
static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
			   bool is_timeout_link)
	struct io_timeout_data *data;

	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
	if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
	if (sqe->off && is_timeout_link)
	flags = READ_ONCE(sqe->timeout_flags);
	if (flags & ~IORING_TIMEOUT_ABS)

	req->timeout.count = READ_ONCE(sqe->off);

	if (!req->io && io_alloc_async_ctx(req))

	data = &req->io->timeout;
	req->flags |= REQ_F_TIMEOUT;

	if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))

	if (flags & IORING_TIMEOUT_ABS)
		data->mode = HRTIMER_MODE_ABS;
		data->mode = HRTIMER_MODE_REL;

	hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
static int io_timeout(struct io_kiocb *req)
	struct io_ring_ctx *ctx = req->ctx;
	struct io_timeout_data *data;
	struct list_head *entry;

	data = &req->io->timeout;

	/*
	 * sqe->off holds how many events that need to occur for this
	 * timeout event to be satisfied. If it isn't set, then this is
	 * a pure timeout request, sequence isn't used.
	 */
	count = req->timeout.count;
		req->flags |= REQ_F_TIMEOUT_NOSEQ;
		spin_lock_irq(&ctx->completion_lock);
		entry = ctx->timeout_list.prev;

	req->sequence = ctx->cached_sq_head + count - 1;
	data->seq_offset = count;

	/*
	 * Insertion sort, ensuring the first entry in the list is always
	 * the one we need first.
	 */
	spin_lock_irq(&ctx->completion_lock);
	list_for_each_prev(entry, &ctx->timeout_list) {
		struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
		unsigned nxt_sq_head;
		long long tmp, tmp_nxt;
		u32 nxt_offset = nxt->io->timeout.seq_offset;

		if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)

		/*
		 * Since cached_sq_head + count - 1 can overflow, use type long
		 */
		tmp = (long long)ctx->cached_sq_head + count - 1;
		nxt_sq_head = nxt->sequence - nxt_offset + 1;
		tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;

		/*
		 * cached_sq_head may overflow, and it will never overflow twice
		 * once there is some timeout req still be valid.
		 */
		if (ctx->cached_sq_head < nxt_sq_head)

	/*
	 * Sequence of reqs after the insert one and itself should
	 * be adjusted because each timeout req consumes a slot.
	 */
	req->sequence -= span;

	list_add(&req->list, entry);
	data->timer.function = io_timeout_fn;
	hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
	spin_unlock_irq(&ctx->completion_lock);
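/*
 * Editor's note (not in the original source): a small worked example of the
 * sequence math above. With cached_sq_head == 100 and a timeout whose
 * count (sqe->off) is 3, req->sequence = 100 + 3 - 1 = 102, i.e. the timeout
 * should fire once the request at position 102 has completed unless the
 * timer expires first. The (long long) casts only guard the comparison
 * against u32 wrap of cached_sq_head.
 */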
static bool io_cancel_cb(struct io_wq_work *work, void *data)
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);

	return req->user_data == (unsigned long) data;

static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
	enum io_wq_cancel cancel_ret;

	cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
	switch (cancel_ret) {
	case IO_WQ_CANCEL_OK:
	case IO_WQ_CANCEL_RUNNING:
	case IO_WQ_CANCEL_NOTFOUND:

static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
				     struct io_kiocb *req, __u64 sqe_addr,
				     struct io_kiocb **nxt, int success_ret)
	unsigned long flags;

	ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
	if (ret != -ENOENT) {
		spin_lock_irqsave(&ctx->completion_lock, flags);

	spin_lock_irqsave(&ctx->completion_lock, flags);
	ret = io_timeout_cancel(ctx, sqe_addr);
	ret = io_poll_cancel(ctx, sqe_addr);

	io_cqring_fill_event(req, ret);
	io_commit_cqring(ctx);
	spin_unlock_irqrestore(&ctx->completion_lock, flags);
	io_cqring_ev_posted(ctx);

	req_set_fail_links(req);
	io_put_req_find_next(req, nxt);
static int io_async_cancel_prep(struct io_kiocb *req,
				const struct io_uring_sqe *sqe)
	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
	if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||

	req->cancel.addr = READ_ONCE(sqe->addr);

static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
	struct io_ring_ctx *ctx = req->ctx;

	io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0);
static int io_req_defer_prep(struct io_kiocb *req,
			     const struct io_uring_sqe *sqe)
	switch (req->opcode) {
	case IORING_OP_READV:
	case IORING_OP_READ_FIXED:
		ret = io_read_prep(req, sqe, true);
	case IORING_OP_WRITEV:
	case IORING_OP_WRITE_FIXED:
		ret = io_write_prep(req, sqe, true);
	case IORING_OP_POLL_ADD:
		ret = io_poll_add_prep(req, sqe);
	case IORING_OP_POLL_REMOVE:
		ret = io_poll_remove_prep(req, sqe);
	case IORING_OP_FSYNC:
		ret = io_prep_fsync(req, sqe);
	case IORING_OP_SYNC_FILE_RANGE:
		ret = io_prep_sfr(req, sqe);
	case IORING_OP_SENDMSG:
		ret = io_sendmsg_prep(req, sqe);
	case IORING_OP_RECVMSG:
		ret = io_recvmsg_prep(req, sqe);
	case IORING_OP_CONNECT:
		ret = io_connect_prep(req, sqe);
	case IORING_OP_TIMEOUT:
		ret = io_timeout_prep(req, sqe, false);
	case IORING_OP_TIMEOUT_REMOVE:
		ret = io_timeout_remove_prep(req, sqe);
	case IORING_OP_ASYNC_CANCEL:
		ret = io_async_cancel_prep(req, sqe);
	case IORING_OP_LINK_TIMEOUT:
		ret = io_timeout_prep(req, sqe, true);
	case IORING_OP_ACCEPT:
		ret = io_accept_prep(req, sqe);
		printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
	struct io_ring_ctx *ctx = req->ctx;

	/* Still need defer if there is pending req in defer list. */
	if (!req_need_defer(req) && list_empty(&ctx->defer_list))

	if (!req->io && io_alloc_async_ctx(req))

	ret = io_req_defer_prep(req, sqe);

	spin_lock_irq(&ctx->completion_lock);
	if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
		spin_unlock_irq(&ctx->completion_lock);

	trace_io_uring_defer(ctx, req, req->user_data);
	list_add_tail(&req->list, &ctx->defer_list);
	spin_unlock_irq(&ctx->completion_lock);
	return -EIOCBQUEUED;
static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
			struct io_kiocb **nxt, bool force_nonblock)
	struct io_ring_ctx *ctx = req->ctx;

	switch (req->opcode) {
	case IORING_OP_READV:
	case IORING_OP_READ_FIXED:
			ret = io_read_prep(req, sqe, force_nonblock);
		ret = io_read(req, nxt, force_nonblock);
	case IORING_OP_WRITEV:
	case IORING_OP_WRITE_FIXED:
			ret = io_write_prep(req, sqe, force_nonblock);
		ret = io_write(req, nxt, force_nonblock);
	case IORING_OP_FSYNC:
			ret = io_prep_fsync(req, sqe);
		ret = io_fsync(req, nxt, force_nonblock);
	case IORING_OP_POLL_ADD:
			ret = io_poll_add_prep(req, sqe);
		ret = io_poll_add(req, nxt);
	case IORING_OP_POLL_REMOVE:
			ret = io_poll_remove_prep(req, sqe);
		ret = io_poll_remove(req);
	case IORING_OP_SYNC_FILE_RANGE:
			ret = io_prep_sfr(req, sqe);
		ret = io_sync_file_range(req, nxt, force_nonblock);
	case IORING_OP_SENDMSG:
			ret = io_sendmsg_prep(req, sqe);
		ret = io_sendmsg(req, nxt, force_nonblock);
	case IORING_OP_RECVMSG:
			ret = io_recvmsg_prep(req, sqe);
		ret = io_recvmsg(req, nxt, force_nonblock);
	case IORING_OP_TIMEOUT:
			ret = io_timeout_prep(req, sqe, false);
		ret = io_timeout(req);
	case IORING_OP_TIMEOUT_REMOVE:
			ret = io_timeout_remove_prep(req, sqe);
		ret = io_timeout_remove(req);
	case IORING_OP_ACCEPT:
			ret = io_accept_prep(req, sqe);
		ret = io_accept(req, nxt, force_nonblock);
	case IORING_OP_CONNECT:
			ret = io_connect_prep(req, sqe);
		ret = io_connect(req, nxt, force_nonblock);
	case IORING_OP_ASYNC_CANCEL:
			ret = io_async_cancel_prep(req, sqe);
		ret = io_async_cancel(req, nxt);

	if (ctx->flags & IORING_SETUP_IOPOLL) {
		const bool in_async = io_wq_current_is_worker();

		if (req->result == -EAGAIN)

		/* workqueue context doesn't hold uring_lock, grab it now */
			mutex_lock(&ctx->uring_lock);

		io_iopoll_req_issued(req);

			mutex_unlock(&ctx->uring_lock);
static void io_wq_submit_work(struct io_wq_work **workptr)
	struct io_wq_work *work = *workptr;
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	struct io_kiocb *nxt = NULL;

	if (work->flags & IO_WQ_WORK_CANCEL)

	req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
	req->in_async = true;
	ret = io_issue_sqe(req, NULL, &nxt, false);
	/*
	 * We can get EAGAIN for polled IO even though we're
	 * forcing a sync submission from here, since we can't
	 * wait for request slots on the block side.
	 */

	/* drop submission reference */
	req_set_fail_links(req);
	io_cqring_add_event(req, ret);

	/* if a dependent link is ready, pass it back */
	io_wq_assign_next(workptr, nxt);
static bool io_req_op_valid(int op)
	return op >= IORING_OP_NOP && op < IORING_OP_LAST;

static int io_req_needs_file(struct io_kiocb *req)
	switch (req->opcode) {
	case IORING_OP_POLL_REMOVE:
	case IORING_OP_TIMEOUT:
	case IORING_OP_TIMEOUT_REMOVE:
	case IORING_OP_ASYNC_CANCEL:
	case IORING_OP_LINK_TIMEOUT:
		if (io_req_op_valid(req->opcode))

static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
	struct fixed_file_table *table;

	table = &ctx->file_table[index >> IORING_FILE_TABLE_SHIFT];
	return table->files[index & IORING_FILE_TABLE_MASK];

static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req,
			   const struct io_uring_sqe *sqe)
	struct io_ring_ctx *ctx = req->ctx;

	flags = READ_ONCE(sqe->flags);
	fd = READ_ONCE(sqe->fd);

	if (flags & IOSQE_IO_DRAIN)
		req->flags |= REQ_F_IO_DRAIN;

	ret = io_req_needs_file(req);

	if (flags & IOSQE_FIXED_FILE) {
		if (unlikely(!ctx->file_table ||
		    (unsigned) fd >= ctx->nr_user_files))
		fd = array_index_nospec(fd, ctx->nr_user_files);
		req->file = io_file_from_index(ctx, fd);
		req->flags |= REQ_F_FIXED_FILE;
		if (req->needs_fixed_file)
		trace_io_uring_file_get(ctx, fd);
		req->file = io_file_get(state, fd);
		if (unlikely(!req->file))
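/*
 * Editor's note (not in the original source): fixed files live in a
 * two-level table, with IORING_FILE_TABLE_SHIFT bits of index per leaf.
 * For example, registered index 517 resolves as:
 *
 *	table = &ctx->file_table[517 >> IORING_FILE_TABLE_SHIFT];  // table 1
 *	file  = table->files[517 & IORING_FILE_TABLE_MASK];        // slot 5
 */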
static int io_grab_files(struct io_kiocb *req)
	struct io_ring_ctx *ctx = req->ctx;

	spin_lock_irq(&ctx->inflight_lock);
	/*
	 * We use the f_ops->flush() handler to ensure that we can flush
	 * out work accessing these files if the fd is closed. Check if
	 * the fd has changed since we started down this path, and disallow
	 * this operation if it has.
	 */
	if (fcheck(req->ring_fd) == req->ring_file) {
		list_add(&req->inflight_entry, &ctx->inflight_list);
		req->flags |= REQ_F_INFLIGHT;
		req->work.files = current->files;
	spin_unlock_irq(&ctx->inflight_lock);
static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
	struct io_timeout_data *data = container_of(timer,
						struct io_timeout_data, timer);
	struct io_kiocb *req = data->req;
	struct io_ring_ctx *ctx = req->ctx;
	struct io_kiocb *prev = NULL;
	unsigned long flags;

	spin_lock_irqsave(&ctx->completion_lock, flags);

	/*
	 * We don't expect the list to be empty, that will only happen if we
	 * race with the completion of the linked work.
	 */
	if (!list_empty(&req->link_list)) {
		prev = list_entry(req->link_list.prev, struct io_kiocb,
		if (refcount_inc_not_zero(&prev->refs)) {
			list_del_init(&req->link_list);
			prev->flags &= ~REQ_F_LINK_TIMEOUT;

	spin_unlock_irqrestore(&ctx->completion_lock, flags);

	req_set_fail_links(prev);
	io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
	io_cqring_add_event(req, -ETIME);
	return HRTIMER_NORESTART;
static void io_queue_linked_timeout(struct io_kiocb *req)
	struct io_ring_ctx *ctx = req->ctx;

	/*
	 * If the list is now empty, then our linked request finished before
	 * we got a chance to setup the timer
	 */
	spin_lock_irq(&ctx->completion_lock);
	if (!list_empty(&req->link_list)) {
		struct io_timeout_data *data = &req->io->timeout;

		data->timer.function = io_link_timeout_fn;
		hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
	spin_unlock_irq(&ctx->completion_lock);

	/* drop submission reference */

static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
	struct io_kiocb *nxt;

	if (!(req->flags & REQ_F_LINK))

	nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
	if (!nxt || nxt->opcode != IORING_OP_LINK_TIMEOUT)

	req->flags |= REQ_F_LINK_TIMEOUT;
static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
	struct io_kiocb *linked_timeout;
	struct io_kiocb *nxt = NULL;

	linked_timeout = io_prep_linked_timeout(req);

	ret = io_issue_sqe(req, sqe, &nxt, true);

	/*
	 * We async punt it if the file wasn't marked NOWAIT, or if the file
	 * doesn't support non-blocking read/write attempts
	 */
	if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
	    (req->flags & REQ_F_MUST_PUNT))) {
		if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
			ret = io_grab_files(req);
		/*
		 * Queued up for async execution, worker will release
		 * submit reference when the iocb is actually submitted.
		 */
		io_queue_async_work(req);

	/* drop submission reference */
	io_put_req_find_next(req, &nxt);

	if (linked_timeout) {
			io_queue_linked_timeout(linked_timeout);
			io_put_req(linked_timeout);

	/* and drop final reference, if we failed */
	io_cqring_add_event(req, ret);
	req_set_fail_links(req);
static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
	if (unlikely(req->ctx->drain_next)) {
		req->flags |= REQ_F_IO_DRAIN;
		req->ctx->drain_next = false;
	req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);

	ret = io_req_defer(req, sqe);
	if (ret != -EIOCBQUEUED) {
		io_cqring_add_event(req, ret);
		req_set_fail_links(req);
		io_double_put_req(req);
	__io_queue_sqe(req, sqe);

static inline void io_queue_link_head(struct io_kiocb *req)
	if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
		io_cqring_add_event(req, -ECANCELED);
		io_double_put_req(req);
	io_queue_sqe(req, NULL);
#define SQE_VALID_FLAGS	(IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \

static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
			  struct io_submit_state *state, struct io_kiocb **link)
	struct io_ring_ctx *ctx = req->ctx;

	/* enforce forwards compatibility on users */
	if (unlikely(sqe->flags & ~SQE_VALID_FLAGS)) {

	ret = io_req_set_file(state, req, sqe);
	if (unlikely(ret)) {
		io_cqring_add_event(req, ret);
		io_double_put_req(req);

	/*
	 * If we already have a head request, queue this one for async
	 * submittal once the head completes. If we don't have a head but
	 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
	 * submitted sync once the chain is complete. If none of those
	 * conditions are true (normal request), then just queue it.
	 * (See the userspace linking sketch below.)
	 */
		struct io_kiocb *prev = *link;

		if (sqe->flags & IOSQE_IO_DRAIN)
			(*link)->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;

		if (sqe->flags & IOSQE_IO_HARDLINK)
			req->flags |= REQ_F_HARDLINK;

		if (io_alloc_async_ctx(req)) {

		ret = io_req_defer_prep(req, sqe);
			/* fail even hard links since we don't submit */
			prev->flags |= REQ_F_FAIL_LINK;
		trace_io_uring_link(ctx, req, prev);
		list_add_tail(&req->link_list, &prev->link_list);
	} else if (sqe->flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
		req->flags |= REQ_F_LINK;
		if (sqe->flags & IOSQE_IO_HARDLINK)
			req->flags |= REQ_F_HARDLINK;
		INIT_LIST_HEAD(&req->link_list);

		if (io_alloc_async_ctx(req)) {

		ret = io_req_defer_prep(req, sqe);
			req->flags |= REQ_F_FAIL_LINK;
		io_queue_sqe(req, sqe);
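/*
 * Editor's note (not in the original source): from userspace a chain is
 * built by setting IOSQE_IO_LINK on every SQE except the last one. A rough
 * sketch using liburing helpers (io_uring_get_sqe()/io_uring_prep_*() are
 * assumed to be available; error handling omitted):
 *
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_readv(sqe, fd, &iov, 1, 0);
 *	sqe->flags |= IOSQE_IO_LINK;		// next SQE depends on this one
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_fsync(sqe, fd, 0);	// runs only if the read succeeds
 *	io_uring_submit(&ring);
 */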
/*
 * Batched submission is done, ensure local IO is flushed out.
 */
static void io_submit_state_end(struct io_submit_state *state)
	blk_finish_plug(&state->plug);
	if (state->free_reqs)
		kmem_cache_free_bulk(req_cachep, state->free_reqs,
				     &state->reqs[state->cur_req]);

/*
 * Start submission side cache.
 */
static void io_submit_state_start(struct io_submit_state *state,
				  unsigned int max_ios)
	blk_start_plug(&state->plug);
	state->free_reqs = 0;
	state->ios_left = max_ios;
static void io_commit_sqring(struct io_ring_ctx *ctx)
	struct io_rings *rings = ctx->rings;

	if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
		/*
		 * Ensure any loads from the SQEs are done at this point,
		 * since once we write the new head, the application could
		 * write new data to them.
		 */
		smp_store_release(&rings->sq.head, ctx->cached_sq_head);

/*
 * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory
 * that is mapped by userspace. This means that care needs to be taken to
 * ensure that reads are stable, as we cannot rely on userspace always
 * being a good citizen. If members of the sqe are validated and then later
 * used, it's important that those reads are done through READ_ONCE() to
 * prevent a re-load down the line.
 */
static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req,
			  const struct io_uring_sqe **sqe_ptr)
	struct io_rings *rings = ctx->rings;
	u32 *sq_array = ctx->sq_array;

	/*
	 * The cached sq head (or cq tail) serves two purposes:
	 *
	 * 1) allows us to batch the cost of updating the user visible
	 * 2) allows the kernel side to track the head on its own, even
	 *    though the application is the one updating it.
	 */
	head = ctx->cached_sq_head;
	/* make sure SQ entry isn't read before tail */
	if (unlikely(head == smp_load_acquire(&rings->sq.tail)))

	head = READ_ONCE(sq_array[head & ctx->sq_mask]);
	if (likely(head < ctx->sq_entries)) {
		/*
		 * All io need record the previous position, if LINK vs DRAIN,
		 * it can be used to mark the position of the first IO in the
		 */
		req->sequence = ctx->cached_sq_head;
		*sqe_ptr = &ctx->sq_sqes[head];
		req->opcode = READ_ONCE((*sqe_ptr)->opcode);
		req->user_data = READ_ONCE((*sqe_ptr)->user_data);
		ctx->cached_sq_head++;

	/* drop invalid entries */
	ctx->cached_sq_head++;
	ctx->cached_sq_dropped++;
	WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
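/*
 * Editor's note (not in the original source): the userspace producer side
 * that io_get_sqring() consumes, sketched with plain atomics. The field
 * names (sq.tail, sq.ring_mask, sq.array, sq.sqes) stand for the mmap'ed
 * ring members and are illustrative only:
 *
 *	unsigned tail = *sq.tail;			// only written by the app
 *	unsigned idx  = tail & *sq.ring_mask;
 *	sq.sqes[idx]  = prepared_sqe;			// fill the SQE slot
 *	sq.array[idx] = idx;				// publish its index
 *	__atomic_store_n(sq.tail, tail + 1, __ATOMIC_RELEASE);
 *
 * The release store pairs with the acquire load of rings->sq.tail above.
 */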
static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
			  struct file *ring_file, int ring_fd,
			  struct mm_struct **mm, bool async)
	struct io_submit_state state, *statep = NULL;
	struct io_kiocb *link = NULL;
	int i, submitted = 0;
	bool mm_fault = false;

	/* if we have a backlog and couldn't flush it all, return BUSY */
	if (!list_empty(&ctx->cq_overflow_list) &&
	    !io_cqring_overflow_flush(ctx, false))

	if (nr > IO_PLUG_THRESHOLD) {
		io_submit_state_start(&state, nr);

	for (i = 0; i < nr; i++) {
		const struct io_uring_sqe *sqe;
		struct io_kiocb *req;
		unsigned int sqe_flags;

		req = io_get_req(ctx, statep);
		if (unlikely(!req)) {
				submitted = -EAGAIN;
		if (!io_get_sqring(ctx, req, &sqe)) {

		if (io_req_needs_user(req) && !*mm) {
			mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
				use_mm(ctx->sqo_mm);

		sqe_flags = sqe->flags;

		req->ring_file = ring_file;
		req->ring_fd = ring_fd;
		req->has_user = *mm != NULL;
		req->in_async = async;
		req->needs_fixed_file = async;
		trace_io_uring_submit_sqe(ctx, req->user_data, true, async);
		if (!io_submit_sqe(req, sqe, statep, &link))
		/*
		 * If previous wasn't linked and we have a linked command,
		 * that's the end of the chain. Submit the previous link.
		 */
		if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) && link) {
			io_queue_link_head(link);

		io_queue_link_head(link);
		io_submit_state_end(&state);

	 /* Commit SQ ring head once we've consumed and submitted all SQEs */
	io_commit_sqring(ctx);
static int io_sq_thread(void *data)
	struct io_ring_ctx *ctx = data;
	struct mm_struct *cur_mm = NULL;
	const struct cred *old_cred;
	mm_segment_t old_fs;
	unsigned long timeout;

	complete(&ctx->completions[1]);

	old_cred = override_creds(ctx->creds);

	timeout = jiffies + ctx->sq_thread_idle;
	while (!kthread_should_park()) {
		unsigned int to_submit;

		if (!list_empty(&ctx->poll_list)) {
			unsigned nr_events = 0;

			mutex_lock(&ctx->uring_lock);
			if (!list_empty(&ctx->poll_list))
				io_iopoll_getevents(ctx, &nr_events, 0);
				timeout = jiffies + ctx->sq_thread_idle;
			mutex_unlock(&ctx->uring_lock);

		to_submit = io_sqring_entries(ctx);

		/*
		 * If submit got -EBUSY, flag us as needing the application
		 * to enter the kernel to reap and flush events.
		 */
		if (!to_submit || ret == -EBUSY) {
			/*
			 * Drop cur_mm before scheduling, we can't hold it for
			 * long periods (or over schedule()). Do this before
			 * adding ourselves to the waitqueue, as the unuse/drop
			 */

			/*
			 * We're polling. If we're within the defined idle
			 * period, then let us spin without work before going
			 * to sleep. The exception is if we got EBUSY doing
			 * more IO, we should wait for the application to
			 * reap events and wake us up.
			 */
			if (!list_empty(&ctx->poll_list) ||
			    (!time_after(jiffies, timeout) && ret != -EBUSY &&
			    !percpu_ref_is_dying(&ctx->refs))) {

			prepare_to_wait(&ctx->sqo_wait, &wait,
					TASK_INTERRUPTIBLE);

			/*
			 * While doing polled IO, before going to sleep, we need
			 * to check if there are new reqs added to poll_list, it
			 * is because reqs may have been punted to io worker and
			 * will be added to poll_list later, hence check the
			 */
			if ((ctx->flags & IORING_SETUP_IOPOLL) &&
			    !list_empty_careful(&ctx->poll_list)) {
				finish_wait(&ctx->sqo_wait, &wait);

			/* Tell userspace we may need a wakeup call */
			ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
			/* make sure to read SQ tail after writing flags */

			to_submit = io_sqring_entries(ctx);
			if (!to_submit || ret == -EBUSY) {
				if (kthread_should_park()) {
					finish_wait(&ctx->sqo_wait, &wait);
				if (signal_pending(current))
					flush_signals(current);
				finish_wait(&ctx->sqo_wait, &wait);

				ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
			finish_wait(&ctx->sqo_wait, &wait);

			ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;

		to_submit = min(to_submit, ctx->sq_entries);
		mutex_lock(&ctx->uring_lock);
		ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
		mutex_unlock(&ctx->uring_lock);
		timeout = jiffies + ctx->sq_thread_idle;

	revert_creds(old_cred);
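/*
 * Editor's note (not in the original source): the userspace counterpart of
 * the IORING_SQ_NEED_WAKEUP handshake above, sketched with illustrative
 * field names. After publishing new SQEs the application checks the flag
 * and only then calls io_uring_enter() to kick the SQ thread:
 *
 *	if (*sq.flags & IORING_SQ_NEED_WAKEUP)
 *		io_uring_enter(ring_fd, to_submit, 0,
 *			       IORING_ENTER_SQ_WAKEUP, NULL);
 */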
struct io_wait_queue {
	struct wait_queue_entry wq;
	struct io_ring_ctx *ctx;
	unsigned nr_timeouts;

static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
	struct io_ring_ctx *ctx = iowq->ctx;

	/*
	 * Wake up if we have enough events, or if a timeout occurred since we
	 * started waiting. For timeouts, we always want to return to userspace,
	 * regardless of event count.
	 */
	return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
			atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;

static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
			    int wake_flags, void *key)
	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,

	/* use noflush == true, as we can't safely rely on locking context */
	if (!io_should_wake(iowq, true))

	return autoremove_wake_function(curr, mode, wake_flags, key);

/*
 * Wait until events become available, if we don't already have some. The
 * application must reap them itself, as they reside on the shared cq ring.
 */
static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
			  const sigset_t __user *sig, size_t sigsz)
	struct io_wait_queue iowq = {
			.func		= io_wake_function,
			.entry		= LIST_HEAD_INIT(iowq.wq.entry),
		.to_wait	= min_events,
	struct io_rings *rings = ctx->rings;

	if (io_cqring_events(ctx, false) >= min_events)

#ifdef CONFIG_COMPAT
		if (in_compat_syscall())
			ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
			ret = set_user_sigmask(sig, sigsz);

	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
	trace_io_uring_cqring_wait(ctx, min_events);
		prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
					  TASK_INTERRUPTIBLE);
		if (io_should_wake(&iowq, false))
		if (signal_pending(current)) {
	finish_wait(&ctx->wait, &iowq.wq);

	restore_saved_sigmask_unless(ret == -EINTR);

	return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
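/*
 * Editor's note (not in the original source): the application-side reap loop
 * that io_cqring_wait() pairs with, sketched with illustrative names for the
 * mmap'ed CQ ring fields:
 *
 *	unsigned head = *cq.head;
 *	while (head != __atomic_load_n(cq.tail, __ATOMIC_ACQUIRE)) {
 *		struct io_uring_cqe *cqe = &cq.cqes[head & *cq.ring_mask];
 *		handle(cqe->user_data, cqe->res);
 *		head++;
 *	}
 *	__atomic_store_n(cq.head, head, __ATOMIC_RELEASE);
 */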
static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
#if defined(CONFIG_UNIX)
	if (ctx->ring_sock) {
		struct sock *sock = ctx->ring_sock->sk;
		struct sk_buff *skb;

		while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)

	for (i = 0; i < ctx->nr_user_files; i++) {
		file = io_file_from_index(ctx, i);

static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
	unsigned nr_tables, i;

	if (!ctx->file_table)

	__io_sqe_files_unregister(ctx);
	nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
	for (i = 0; i < nr_tables; i++)
		kfree(ctx->file_table[i].files);
	kfree(ctx->file_table);
	ctx->file_table = NULL;
	ctx->nr_user_files = 0;

static void io_sq_thread_stop(struct io_ring_ctx *ctx)
	if (ctx->sqo_thread) {
		wait_for_completion(&ctx->completions[1]);
		/*
		 * The park is a bit of a work-around, without it we get
		 * warning spews on shutdown with SQPOLL set and affinity
		 * set to a single CPU.
		 */
		kthread_park(ctx->sqo_thread);
		kthread_stop(ctx->sqo_thread);
		ctx->sqo_thread = NULL;

static void io_finish_async(struct io_ring_ctx *ctx)
	io_sq_thread_stop(ctx);
	io_wq_destroy(ctx->io_wq);
#if defined(CONFIG_UNIX)
static void io_destruct_skb(struct sk_buff *skb)
	struct io_ring_ctx *ctx = skb->sk->sk_user_data;

	io_wq_flush(ctx->io_wq);
	unix_destruct_scm(skb);

/*
 * Ensure the UNIX gc is aware of our file set, so we are certain that
 * the io_uring can be safely unregistered on process exit, even if we have
 * loops in the file referencing.
 */
static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
	struct sock *sk = ctx->ring_sock->sk;
	struct scm_fp_list *fpl;
	struct sk_buff *skb;

	if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
		unsigned long inflight = ctx->user->unix_inflight + nr;

		if (inflight > task_rlimit(current, RLIMIT_NOFILE))

	fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);

	skb = alloc_skb(0, GFP_KERNEL);

	fpl->user = get_uid(ctx->user);
	for (i = 0; i < nr; i++) {
		struct file *file = io_file_from_index(ctx, i + offset);

		fpl->fp[nr_files] = get_file(file);
		unix_inflight(fpl->user, fpl->fp[nr_files]);

	fpl->max = SCM_MAX_FD;
	fpl->count = nr_files;
	UNIXCB(skb).fp = fpl;
	skb->destructor = io_destruct_skb;
	refcount_add(skb->truesize, &sk->sk_wmem_alloc);
	skb_queue_head(&sk->sk_receive_queue, skb);

	for (i = 0; i < nr_files; i++)
/*
 * If UNIX sockets are enabled, fd passing can cause a reference cycle which
 * causes regular reference counting to break down. We rely on the UNIX
 * garbage collection to take care of this problem for us.
 */
static int io_sqe_files_scm(struct io_ring_ctx *ctx)
	unsigned left, total;

	left = ctx->nr_user_files;
		unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);

		ret = __io_sqe_files_scm(ctx, this_files, total);
		total += this_files;

		while (total < ctx->nr_user_files) {
			struct file *file = io_file_from_index(ctx, total);

static int io_sqe_files_scm(struct io_ring_ctx *ctx)

static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
	for (i = 0; i < nr_tables; i++) {
		struct fixed_file_table *table = &ctx->file_table[i];
		unsigned this_files;

		this_files = min(nr_files, IORING_MAX_FILES_TABLE);
		table->files = kcalloc(this_files, sizeof(struct file *),
		nr_files -= this_files;

	for (i = 0; i < nr_tables; i++) {
		struct fixed_file_table *table = &ctx->file_table[i];
		kfree(table->files);
static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
	__s32 __user *fds = (__s32 __user *) arg;

	if (ctx->file_table)
	if (nr_args > IORING_MAX_FIXED_FILES)

	nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
	ctx->file_table = kcalloc(nr_tables, sizeof(struct fixed_file_table),
	if (!ctx->file_table)

	if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
		kfree(ctx->file_table);
		ctx->file_table = NULL;

	for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
		struct fixed_file_table *table;

		if (copy_from_user(&fd, &fds[i], sizeof(fd)))
		/* allow sparse sets */

		table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
		index = i & IORING_FILE_TABLE_MASK;
		table->files[index] = fget(fd);

		if (!table->files[index])
		/*
		 * Don't allow io_uring instances to be registered. If UNIX
		 * isn't enabled, then this causes a reference cycle and this
		 * instance can never get freed. If UNIX is enabled we'll
		 * handle it just fine, but there's still no point in allowing
		 * a ring fd as it doesn't support regular read/write anyway.
		 */
		if (table->files[index]->f_op == &io_uring_fops) {
			fput(table->files[index]);

		for (i = 0; i < ctx->nr_user_files; i++) {
			file = io_file_from_index(ctx, i);

		for (i = 0; i < nr_tables; i++)
			kfree(ctx->file_table[i].files);

		kfree(ctx->file_table);
		ctx->file_table = NULL;
		ctx->nr_user_files = 0;

	ret = io_sqe_files_scm(ctx);
		io_sqe_files_unregister(ctx);
static void io_sqe_file_unregister(struct io_ring_ctx *ctx, int index)
#if defined(CONFIG_UNIX)
	struct file *file = io_file_from_index(ctx, index);
	struct sock *sock = ctx->ring_sock->sk;
	struct sk_buff_head list, *head = &sock->sk_receive_queue;
	struct sk_buff *skb;

	__skb_queue_head_init(&list);

	/*
	 * Find the skb that holds this file in its SCM_RIGHTS. When found,
	 * remove this entry and rearrange the file array.
	 */
	skb = skb_dequeue(head);
		struct scm_fp_list *fp;

		fp = UNIXCB(skb).fp;
		for (i = 0; i < fp->count; i++) {
			if (fp->fp[i] != file)

			unix_notinflight(fp->user, fp->fp[i]);
			left = fp->count - 1 - i;
				memmove(&fp->fp[i], &fp->fp[i + 1],
					left * sizeof(struct file *));

			__skb_queue_tail(&list, skb);

			__skb_queue_tail(&list, skb);
		skb = skb_dequeue(head);

	if (skb_peek(&list)) {
		spin_lock_irq(&head->lock);
		while ((skb = __skb_dequeue(&list)) != NULL)
			__skb_queue_tail(head, skb);
		spin_unlock_irq(&head->lock);

	fput(io_file_from_index(ctx, index));
static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
#if defined(CONFIG_UNIX)
	struct sock *sock = ctx->ring_sock->sk;
	struct sk_buff_head *head = &sock->sk_receive_queue;
	struct sk_buff *skb;

	/*
	 * See if we can merge this file into an existing skb SCM_RIGHTS
	 * file set. If there's no room, fall back to allocating a new skb
	 * and filling it in.
	 */
	spin_lock_irq(&head->lock);
	skb = skb_peek(head);
		struct scm_fp_list *fpl = UNIXCB(skb).fp;

		if (fpl->count < SCM_MAX_FD) {
			__skb_unlink(skb, head);
			spin_unlock_irq(&head->lock);
			fpl->fp[fpl->count] = get_file(file);
			unix_inflight(fpl->user, fpl->fp[fpl->count]);

			spin_lock_irq(&head->lock);
			__skb_queue_head(head, skb);
	spin_unlock_irq(&head->lock);

	return __io_sqe_files_scm(ctx, 1, index);
static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
	struct io_uring_files_update up;

	if (!ctx->file_table)
	if (copy_from_user(&up, arg, sizeof(up)))
	if (check_add_overflow(up.offset, nr_args, &done))
	if (done > ctx->nr_user_files)

	fds = u64_to_user_ptr(up.fds);
		struct fixed_file_table *table;

		if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
		i = array_index_nospec(up.offset, ctx->nr_user_files);
		table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
		index = i & IORING_FILE_TABLE_MASK;
		if (table->files[index]) {
			io_sqe_file_unregister(ctx, i);
			table->files[index] = NULL;

			/*
			 * Don't allow io_uring instances to be registered. If
			 * UNIX isn't enabled, then this causes a reference
			 * cycle and this instance can never get freed. If UNIX
			 * is enabled we'll handle it just fine, but there's
			 * still no point in allowing a ring fd as it doesn't
			 * support regular read/write anyway.
			 */
			if (file->f_op == &io_uring_fops) {
			table->files[index] = file;
			err = io_sqe_file_register(ctx, file, i);

	return done ? done : err;

static void io_put_work(struct io_wq_work *work)
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);

static void io_get_work(struct io_wq_work *work)
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);

	refcount_inc(&req->refs);
static int io_sq_offload_start(struct io_ring_ctx *ctx,
			       struct io_uring_params *p)
	struct io_wq_data data;
	unsigned concurrency;

	init_waitqueue_head(&ctx->sqo_wait);
	mmgrab(current->mm);
	ctx->sqo_mm = current->mm;

	if (ctx->flags & IORING_SETUP_SQPOLL) {
		if (!capable(CAP_SYS_ADMIN))

		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			int cpu = p->sq_thread_cpu;

			if (cpu >= nr_cpu_ids)
			if (!cpu_online(cpu))

			ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
			ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
		if (IS_ERR(ctx->sqo_thread)) {
			ret = PTR_ERR(ctx->sqo_thread);
			ctx->sqo_thread = NULL;
		wake_up_process(ctx->sqo_thread);
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */

	data.mm = ctx->sqo_mm;
	data.user = ctx->user;
	data.creds = ctx->creds;
	data.get_work = io_get_work;
	data.put_work = io_put_work;

	/* Do QD, or 4 * CPUS, whatever is smallest */
	concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
	ctx->io_wq = io_wq_create(concurrency, &data);
	if (IS_ERR(ctx->io_wq)) {
		ret = PTR_ERR(ctx->io_wq);

	io_finish_async(ctx);
	mmdrop(ctx->sqo_mm);
static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
	atomic_long_sub(nr_pages, &user->locked_vm);

static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
	unsigned long page_limit, cur_pages, new_pages;

	/* Don't allow more pages than we can safely lock */
	page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;

		cur_pages = atomic_long_read(&user->locked_vm);
		new_pages = cur_pages + nr_pages;
		if (new_pages > page_limit)
	} while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
					new_pages) != cur_pages);

static void io_mem_free(void *ptr)
	page = virt_to_head_page(ptr);
	if (put_page_testzero(page))
		free_compound_page(page);

static void *io_mem_alloc(size_t size)
	gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |

	return (void *) __get_free_pages(gfp_flags, get_order(size));

static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
	struct io_rings *rings;
	size_t off, sq_array_size;

	off = struct_size(rings, cqes, cq_entries);
	if (off == SIZE_MAX)

	off = ALIGN(off, SMP_CACHE_BYTES);

	sq_array_size = array_size(sizeof(u32), sq_entries);
	if (sq_array_size == SIZE_MAX)

	if (check_add_overflow(off, sq_array_size, &off))

static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
	pages = (size_t)1 << get_order(
		rings_size(sq_entries, cq_entries, NULL));
	pages += (size_t)1 << get_order(
		array_size(sizeof(struct io_uring_sqe), sq_entries));
static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
	if (!ctx->user_bufs)

	for (i = 0; i < ctx->nr_user_bufs; i++) {
		struct io_mapped_ubuf *imu = &ctx->user_bufs[i];

		for (j = 0; j < imu->nr_bvecs; j++)
			put_user_page(imu->bvec[j].bv_page);

		if (ctx->account_mem)
			io_unaccount_mem(ctx->user, imu->nr_bvecs);

	kfree(ctx->user_bufs);
	ctx->user_bufs = NULL;
	ctx->nr_user_bufs = 0;

static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
		       void __user *arg, unsigned index)
	struct iovec __user *src;

#ifdef CONFIG_COMPAT
		struct compat_iovec __user *ciovs;
		struct compat_iovec ciov;

		ciovs = (struct compat_iovec __user *) arg;
		if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))

		dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
		dst->iov_len = ciov.iov_len;

	src = (struct iovec __user *) arg;
	if (copy_from_user(dst, &src[index], sizeof(*dst)))
static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
	struct vm_area_struct **vmas = NULL;
	struct page **pages = NULL;
	int i, j, got_pages = 0;

	if (!nr_args || nr_args > UIO_MAXIOV)

	ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
	if (!ctx->user_bufs)

	for (i = 0; i < nr_args; i++) {
		struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
		unsigned long off, start, end, ubuf;

		ret = io_copy_iov(ctx, &iov, arg, i);

		/*
		 * Don't impose further limits on the size and buffer
		 * constraints here, we'll -EINVAL later when IO is
		 * submitted if they are wrong.
		 */
		if (!iov.iov_base || !iov.iov_len)

		/* arbitrary limit, but we need something */
		if (iov.iov_len > SZ_1G)

		ubuf = (unsigned long) iov.iov_base;
		end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
		start = ubuf >> PAGE_SHIFT;
		nr_pages = end - start;

		if (ctx->account_mem) {
			ret = io_account_mem(ctx->user, nr_pages);

		if (!pages || nr_pages > got_pages) {
			pages = kvmalloc_array(nr_pages, sizeof(struct page *),
			vmas = kvmalloc_array(nr_pages,
					sizeof(struct vm_area_struct *),
			if (!pages || !vmas) {
				if (ctx->account_mem)
					io_unaccount_mem(ctx->user, nr_pages);
			got_pages = nr_pages;

		imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
			if (ctx->account_mem)
				io_unaccount_mem(ctx->user, nr_pages);

		down_read(&current->mm->mmap_sem);
		pret = get_user_pages(ubuf, nr_pages,
				      FOLL_WRITE | FOLL_LONGTERM,
		if (pret == nr_pages) {
			/* don't support file backed memory */
			for (j = 0; j < nr_pages; j++) {
				struct vm_area_struct *vma = vmas[j];

				    !is_file_hugepages(vma->vm_file)) {
			ret = pret < 0 ? pret : -EFAULT;
		up_read(&current->mm->mmap_sem);
			/*
			 * if we did partial map, or found file backed vmas,
			 * release any pages we did get
			 */
				put_user_pages(pages, pret);
			if (ctx->account_mem)
				io_unaccount_mem(ctx->user, nr_pages);

		off = ubuf & ~PAGE_MASK;
		for (j = 0; j < nr_pages; j++) {
			vec_len = min_t(size_t, size, PAGE_SIZE - off);
			imu->bvec[j].bv_page = pages[j];
			imu->bvec[j].bv_len = vec_len;
			imu->bvec[j].bv_offset = off;

		/* store original address for later verification */
		imu->len = iov.iov_len;
		imu->nr_bvecs = nr_pages;

		ctx->nr_user_bufs++;

	io_sqe_buffer_unregister(ctx);
*ctx
, void __user
*arg
)
4910 __s32 __user
*fds
= arg
;
4916 if (copy_from_user(&fd
, fds
, sizeof(*fds
)))
4919 ctx
->cq_ev_fd
= eventfd_ctx_fdget(fd
);
4920 if (IS_ERR(ctx
->cq_ev_fd
)) {
4921 int ret
= PTR_ERR(ctx
->cq_ev_fd
);
4922 ctx
->cq_ev_fd
= NULL
;
4929 static int io_eventfd_unregister(struct io_ring_ctx
*ctx
)
4931 if (ctx
->cq_ev_fd
) {
4932 eventfd_ctx_put(ctx
->cq_ev_fd
);
4933 ctx
->cq_ev_fd
= NULL
;
static void io_ring_ctx_free(struct io_ring_ctx *ctx)
	io_finish_async(ctx);
		mmdrop(ctx->sqo_mm);

	io_iopoll_reap_events(ctx);
	io_sqe_buffer_unregister(ctx);
	io_sqe_files_unregister(ctx);
	io_eventfd_unregister(ctx);

#if defined(CONFIG_UNIX)
	if (ctx->ring_sock) {
		ctx->ring_sock->file = NULL; /* so that iput() is called */
		sock_release(ctx->ring_sock);

	io_mem_free(ctx->rings);
	io_mem_free(ctx->sq_sqes);

	percpu_ref_exit(&ctx->refs);
	if (ctx->account_mem)
		io_unaccount_mem(ctx->user,
				ring_pages(ctx->sq_entries, ctx->cq_entries));
	free_uid(ctx->user);
	put_cred(ctx->creds);
	kfree(ctx->completions);
	kfree(ctx->cancel_hash);
	kmem_cache_free(req_cachep, ctx->fallback_req);
static __poll_t io_uring_poll(struct file *file, poll_table *wait)
	struct io_ring_ctx *ctx = file->private_data;

	poll_wait(file, &ctx->cq_wait, wait);
	/*
	 * synchronizes with barrier from wq_has_sleeper call in
	 * io_commit_cqring
	 */
	if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
	    ctx->rings->sq_ring_entries)
		mask |= EPOLLOUT | EPOLLWRNORM;
	if (io_cqring_events(ctx, false))
		mask |= EPOLLIN | EPOLLRDNORM;

static int io_uring_fasync(int fd, struct file *file, int on)
	struct io_ring_ctx *ctx = file->private_data;

	return fasync_helper(fd, file, on, &ctx->cq_fasync);
static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
	mutex_lock(&ctx->uring_lock);
	percpu_ref_kill(&ctx->refs);
	mutex_unlock(&ctx->uring_lock);

	/*
	 * Wait for sq thread to idle, if we have one. It won't spin on new
	 * work after we've killed the ctx ref above. This is important to do
	 * before we cancel existing commands, as the thread could otherwise
	 * be queueing new work post that. If that's work we need to cancel,
	 * it could cause shutdown to hang.
	 */
	while (ctx->sqo_thread && !wq_has_sleeper(&ctx->sqo_wait))

	io_kill_timeouts(ctx);
	io_poll_remove_all(ctx);

	io_wq_cancel_all(ctx->io_wq);

	io_iopoll_reap_events(ctx);
	/* if we failed setting up the ctx, we might not have any rings */
		io_cqring_overflow_flush(ctx, true);
	wait_for_completion(&ctx->completions[0]);
	io_ring_ctx_free(ctx);

static int io_uring_release(struct inode *inode, struct file *file)
	struct io_ring_ctx *ctx = file->private_data;

	file->private_data = NULL;
	io_ring_ctx_wait_and_kill(ctx);
static void io_uring_cancel_files(struct io_ring_ctx *ctx,
				  struct files_struct *files)
	struct io_kiocb *req;

	while (!list_empty_careful(&ctx->inflight_list)) {
		struct io_kiocb *cancel_req = NULL;

		spin_lock_irq(&ctx->inflight_lock);
		list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
			if (req->work.files != files)
			/* req is being completed, ignore */
			if (!refcount_inc_not_zero(&req->refs))

			prepare_to_wait(&ctx->inflight_wait, &wait,
					TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&ctx->inflight_lock);

		/* We need to keep going until we don't find a matching req */

		io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
		io_put_req(cancel_req);
	finish_wait(&ctx->inflight_wait, &wait);

static int io_uring_flush(struct file *file, void *data)
	struct io_ring_ctx *ctx = file->private_data;

	io_uring_cancel_files(ctx, data);
static void *io_uring_validate_mmap_request(struct file *file,
					    loff_t pgoff, size_t sz)
	struct io_ring_ctx *ctx = file->private_data;
	loff_t offset = pgoff << PAGE_SHIFT;

	case IORING_OFF_SQ_RING:
	case IORING_OFF_CQ_RING:
	case IORING_OFF_SQES:
		return ERR_PTR(-EINVAL);

	page = virt_to_head_page(ptr);
	if (sz > page_size(page))
		return ERR_PTR(-EINVAL);

static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
	size_t sz = vma->vm_end - vma->vm_start;

	ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
		return PTR_ERR(ptr);

	pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
	return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);

#else /* !CONFIG_MMU */

static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
	return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;

static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
	return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;

static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
	unsigned long addr, unsigned long len,
	unsigned long pgoff, unsigned long flags)
	ptr = io_uring_validate_mmap_request(file, pgoff, len);
		return PTR_ERR(ptr);

	return (unsigned long) ptr;

#endif /* !CONFIG_MMU */
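/*
 * Editor's note (not in the original source): the matching userspace mmap
 * calls, using the offsets validated above (illustrative sketch, error
 * handling and size computation omitted):
 *
 *	sq_ring = mmap(NULL, sq_ring_sz, PROT_READ | PROT_WRITE,
 *		       MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQ_RING);
 *	sqes    = mmap(NULL, sqes_sz, PROT_READ | PROT_WRITE,
 *		       MAP_SHARED | MAP_POPULATE, ring_fd, IORING_OFF_SQES);
 */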

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
		u32, min_complete, u32, flags, const sigset_t __user *, sig,
		size_t, sigsz)
{
	struct io_ring_ctx *ctx;
	long ret = -EBADF;
	int submitted = 0;
	struct fd f;

	if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
		return -EINVAL;

	f = fdget(fd);
	if (!f.file)
		return -EBADF;

	ret = -EOPNOTSUPP;
	if (f.file->f_op != &io_uring_fops)
		goto out_fput;

	ret = -ENXIO;
	ctx = f.file->private_data;
	if (!percpu_ref_tryget(&ctx->refs))
		goto out_fput;

	/*
	 * For SQ polling, the thread will do all submissions and completions.
	 * Just return the requested submit count, and wake the thread if
	 * we were asked to.
	 */
	ret = 0;
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		if (!list_empty_careful(&ctx->cq_overflow_list))
			io_cqring_overflow_flush(ctx, false);
		if (flags & IORING_ENTER_SQ_WAKEUP)
			wake_up(&ctx->sqo_wait);
		submitted = to_submit;
	} else if (to_submit) {
		struct mm_struct *cur_mm;

		to_submit = min(to_submit, ctx->sq_entries);
		mutex_lock(&ctx->uring_lock);
		/* already have mm, so io_submit_sqes() won't try to grab it */
		cur_mm = ctx->sqo_mm;
		submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
					   &cur_mm, false);
		mutex_unlock(&ctx->uring_lock);

		if (submitted != to_submit)
			goto out;
	}
	if (flags & IORING_ENTER_GETEVENTS) {
		unsigned nr_events = 0;

		min_complete = min(min_complete, ctx->cq_entries);

		if (ctx->flags & IORING_SETUP_IOPOLL) {
			ret = io_iopoll_check(ctx, &nr_events, min_complete);
		} else {
			ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
		}
	}

out:
	percpu_ref_put(&ctx->refs);
out_fput:
	fdput(f);
	return submitted ? submitted : ret;
}
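
/*
 * Illustrative userspace sketch (not part of this file; raw syscall usage,
 * error handling omitted): submit one already-prepared SQE and wait for its
 * completion. The "ring" members are whatever the application built from the
 * offsets published by io_uring_setup(); ring_fd is the setup return value.
 *
 *	unsigned tail = *ring.sq_tail;
 *	ring.sq_array[tail & *ring.sq_mask] = 0;	// sqes[0] holds the SQE
 *	__atomic_store_n(ring.sq_tail, tail + 1, __ATOMIC_RELEASE);
 *	syscall(__NR_io_uring_enter, ring_fd, 1, 1,
 *		IORING_ENTER_GETEVENTS, NULL, 0);
 */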

static const struct file_operations io_uring_fops = {
	.release	= io_uring_release,
	.flush		= io_uring_flush,
	.mmap		= io_uring_mmap,
#ifndef CONFIG_MMU
	.get_unmapped_area = io_uring_nommu_get_unmapped_area,
	.mmap_capabilities = io_uring_nommu_mmap_capabilities,
#endif
	.poll		= io_uring_poll,
	.fasync		= io_uring_fasync,
};

static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
				  struct io_uring_params *p)
{
	struct io_rings *rings;
	size_t size, sq_array_offset;

	size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
	if (size == SIZE_MAX)
		return -EOVERFLOW;

	rings = io_mem_alloc(size);
	if (!rings)
		return -ENOMEM;

	ctx->rings = rings;
	ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
	rings->sq_ring_mask = p->sq_entries - 1;
	rings->cq_ring_mask = p->cq_entries - 1;
	rings->sq_ring_entries = p->sq_entries;
	rings->cq_ring_entries = p->cq_entries;
	ctx->sq_mask = rings->sq_ring_mask;
	ctx->cq_mask = rings->cq_ring_mask;
	ctx->sq_entries = rings->sq_ring_entries;
	ctx->cq_entries = rings->cq_ring_entries;

	size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
	if (size == SIZE_MAX) {
		io_mem_free(ctx->rings);
		ctx->rings = NULL;
		return -EOVERFLOW;
	}

	ctx->sq_sqes = io_mem_alloc(size);
	if (!ctx->sq_sqes) {
		io_mem_free(ctx->rings);
		ctx->rings = NULL;
		return -ENOMEM;
	}

	return 0;
}
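
/*
 * Layout note: the rings allocation above holds struct io_rings (whose tail
 * is the CQE array) followed by the SQ index array at sq_array_offset; the
 * SQE array is a separate allocation. Both are exposed to userspace via
 * mmap() of the ring fd (see io_uring_validate_mmap_request()).
 */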

/*
 * Allocate an anonymous fd, this is what constitutes the application
 * visible backing of an io_uring instance. The application mmaps this
 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
 * we have to tie this fd to a socket for file garbage collection purposes.
 */
static int io_uring_get_fd(struct io_ring_ctx *ctx)
{
	struct file *file;
	int ret;

#if defined(CONFIG_UNIX)
	ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
				&ctx->ring_sock);
	if (ret)
		return ret;
#endif

	ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
	if (ret < 0)
		goto err;

	file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
					O_RDWR | O_CLOEXEC);
	if (IS_ERR(file)) {
		put_unused_fd(ret);
		ret = PTR_ERR(file);
		goto err;
	}

#if defined(CONFIG_UNIX)
	ctx->ring_sock->file = file;
	ctx->ring_sock->sk->sk_user_data = ctx;
#endif
	fd_install(ret, file);
	return ret;
err:
#if defined(CONFIG_UNIX)
	sock_release(ctx->ring_sock);
	ctx->ring_sock = NULL;
#endif
	return ret;
}

static int io_uring_create(unsigned entries, struct io_uring_params *p)
{
	struct user_struct *user = NULL;
	struct io_ring_ctx *ctx;
	bool account_mem;
	int ret;

	if (!entries || entries > IORING_MAX_ENTRIES)
		return -EINVAL;

	/*
	 * Use twice as many entries for the CQ ring. It's possible for the
	 * application to drive a higher depth than the size of the SQ ring,
	 * since the sqes are only used at submission time. This allows for
	 * some flexibility in overcommitting a bit. If the application has
	 * set IORING_SETUP_CQSIZE, it will have passed in the desired number
	 * of CQ ring entries manually.
	 */
	p->sq_entries = roundup_pow_of_two(entries);
	if (p->flags & IORING_SETUP_CQSIZE) {
		/*
		 * If IORING_SETUP_CQSIZE is set, we do the same roundup
		 * to a power-of-two, if it isn't already. We do NOT impose
		 * any cq vs sq ring sizing.
		 */
		if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES)
			return -EINVAL;
		p->cq_entries = roundup_pow_of_two(p->cq_entries);
	} else {
		p->cq_entries = 2 * p->sq_entries;
	}
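
	/*
	 * Worked example of the sizing above: entries == 100 yields
	 * sq_entries == 128 and, without IORING_SETUP_CQSIZE, cq_entries ==
	 * 256. With IORING_SETUP_CQSIZE and p->cq_entries == 200 passed in,
	 * the CQ ring is rounded up to 256 entries instead.
	 */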

	user = get_uid(current_user());
	account_mem = !capable(CAP_IPC_LOCK);

	if (account_mem) {
		ret = io_account_mem(user,
				ring_pages(p->sq_entries, p->cq_entries));
		if (ret) {
			free_uid(user);
			return ret;
		}
	}

	ctx = io_ring_ctx_alloc(p);
	if (!ctx) {
		if (account_mem)
			io_unaccount_mem(user, ring_pages(p->sq_entries,
								p->cq_entries));
		free_uid(user);
		return -ENOMEM;
	}
	ctx->compat = in_compat_syscall();
	ctx->account_mem = account_mem;
	ctx->user = user;
	ctx->creds = get_current_cred();

	ret = io_allocate_scq_urings(ctx, p);
	if (ret)
		goto err;

	ret = io_sq_offload_start(ctx, p);
	if (ret)
		goto err;

	memset(&p->sq_off, 0, sizeof(p->sq_off));
	p->sq_off.head = offsetof(struct io_rings, sq.head);
	p->sq_off.tail = offsetof(struct io_rings, sq.tail);
	p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
	p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
	p->sq_off.flags = offsetof(struct io_rings, sq_flags);
	p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
	p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;

	memset(&p->cq_off, 0, sizeof(p->cq_off));
	p->cq_off.head = offsetof(struct io_rings, cq.head);
	p->cq_off.tail = offsetof(struct io_rings, cq.tail);
	p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
	p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
	p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
	p->cq_off.cqes = offsetof(struct io_rings, cqes);

	/*
	 * Install ring fd as the very last thing, so we don't risk someone
	 * having closed it before we finish setup
	 */
	ret = io_uring_get_fd(ctx);
	if (ret < 0)
		goto err;

	p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
			IORING_FEAT_SUBMIT_STABLE;
	trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
	return ret;
err:
	io_ring_ctx_wait_and_kill(ctx);
	return ret;
}

/*
 * Sets up an io_uring context and returns the fd. The application asks for a
 * ring size; we return the actual sq/cq ring sizes (among other things) in
 * the params structure passed in.
 */
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
	struct io_uring_params p;
	long ret;
	int i;

	if (copy_from_user(&p, params, sizeof(p)))
		return -EFAULT;
	for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
		if (p.resv[i])
			return -EINVAL;
	}

	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
		return -EINVAL;

	ret = io_uring_create(entries, &p);
	if (ret < 0)
		return ret;

	if (copy_to_user(params, &p, sizeof(p)))
		return -EFAULT;

	return ret;
}

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
		struct io_uring_params __user *, params)
{
	return io_uring_setup(entries, params);
}
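
/*
 * Illustrative userspace sketch (not part of this file; raw syscalls,
 * error handling omitted): create a ring and map the SQ ring, CQ ring and
 * SQE array using the offsets returned in struct io_uring_params.
 *
 *	struct io_uring_params p = { 0 };
 *	int fd = syscall(__NR_io_uring_setup, 8, &p);
 *	size_t sq_sz = p.sq_off.array + p.sq_entries * sizeof(__u32);
 *	size_t cq_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
 *	void *sq = mmap(NULL, sq_sz, PROT_READ | PROT_WRITE,
 *			MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
 *	void *cq = mmap(NULL, cq_sz, PROT_READ | PROT_WRITE,
 *			MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
 *	void *sqes = mmap(NULL, p.sq_entries * sizeof(struct io_uring_sqe),
 *			  PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *			  fd, IORING_OFF_SQES);
 *
 * With IORING_FEAT_SINGLE_MMAP set in p.features, the SQ and CQ rings may
 * instead be covered by a single mapping of max(sq_sz, cq_sz) bytes.
 */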

static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
			       void __user *arg, unsigned nr_args)
	__releases(ctx->uring_lock)
	__acquires(ctx->uring_lock)
{
	int ret;

	/*
	 * We're inside the ring mutex, if the ref is already dying, then
	 * someone else killed the ctx or is already going through
	 * io_uring_register().
	 */
	if (percpu_ref_is_dying(&ctx->refs))
		return -ENXIO;

	percpu_ref_kill(&ctx->refs);

	/*
	 * Drop uring mutex before waiting for references to exit. If another
	 * thread is currently inside io_uring_enter() it might need to grab
	 * the uring_lock to make progress. If we hold it here across the drain
	 * wait, then we can deadlock. It's safe to drop the mutex here, since
	 * no new references will come in after we've killed the percpu ref.
	 */
	mutex_unlock(&ctx->uring_lock);
	wait_for_completion(&ctx->completions[0]);
	mutex_lock(&ctx->uring_lock);

	switch (opcode) {
	case IORING_REGISTER_BUFFERS:
		ret = io_sqe_buffer_register(ctx, arg, nr_args);
		break;
	case IORING_UNREGISTER_BUFFERS:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_sqe_buffer_unregister(ctx);
		break;
	case IORING_REGISTER_FILES:
		ret = io_sqe_files_register(ctx, arg, nr_args);
		break;
	case IORING_UNREGISTER_FILES:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_sqe_files_unregister(ctx);
		break;
	case IORING_REGISTER_FILES_UPDATE:
		ret = io_sqe_files_update(ctx, arg, nr_args);
		break;
	case IORING_REGISTER_EVENTFD:
	case IORING_REGISTER_EVENTFD_ASYNC:
		ret = -EINVAL;
		if (nr_args != 1)
			break;
		ret = io_eventfd_register(ctx, arg);
		if (ret)
			break;
		if (opcode == IORING_REGISTER_EVENTFD_ASYNC)
			ctx->eventfd_async = 1;
		else
			ctx->eventfd_async = 0;
		break;
	case IORING_UNREGISTER_EVENTFD:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_eventfd_unregister(ctx);
		break;
	default:
		ret = -EINVAL;
		break;
	}

	/* bring the ctx back to life */
	reinit_completion(&ctx->completions[0]);
	percpu_ref_reinit(&ctx->refs);
	return ret;
}

SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
		void __user *, arg, unsigned int, nr_args)
{
	struct io_ring_ctx *ctx;
	long ret = -EBADF;
	struct fd f;

	f = fdget(fd);
	if (!f.file)
		return -EBADF;

	ret = -EOPNOTSUPP;
	if (f.file->f_op != &io_uring_fops)
		goto out_fput;

	ctx = f.file->private_data;

	mutex_lock(&ctx->uring_lock);
	ret = __io_uring_register(ctx, opcode, arg, nr_args);
	mutex_unlock(&ctx->uring_lock);
	trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
							ctx->cq_ev_fd != NULL, ret);
out_fput:
	fdput(f);
	return ret;
}
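
/*
 * Illustrative userspace sketch (not part of this file; raw syscall, error
 * handling omitted): register a fixed buffer so SQEs can reference it by
 * index with IORING_OP_READ_FIXED/IORING_OP_WRITE_FIXED. buf, buf_len and
 * ring_fd are the application's own.
 *
 *	struct iovec iov = {
 *		.iov_base = buf,
 *		.iov_len  = buf_len,
 *	};
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_BUFFERS, &iov, 1);
 */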

static int __init io_uring_init(void)
{
	req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
	return 0;
}
__initcall(io_uring_init);