fs/io_uring.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared application/kernel submission and completion ring pairs, for
 * supporting fast/efficient IO.
 *
 * A note on the read/write ordering memory barriers that are matched between
 * the application and kernel side.
 *
 * After the application reads the CQ ring tail, it must use an
 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
 * before writing the tail (using smp_load_acquire to read the tail will
 * do). It also needs an smp_mb() before updating CQ head (ordering the
 * entry load(s) with the head store), pairing with an implicit barrier
 * through a control-dependency in io_get_cqring (smp_store_release to
 * store head will do). Failure to do so could lead to reading invalid
 * CQ entries.
 *
 * Likewise, the application must use an appropriate smp_wmb() before
 * writing the SQ tail (ordering SQ entry stores with the tail store),
 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
 * to store the tail will do). And it needs a barrier ordering the SQ
 * head load before writing new SQ entries (smp_load_acquire to read
 * head will do).
 *
 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
 * updating the SQ tail; a full memory barrier smp_mb() is needed
 * between.
 *
 * Also see the examples in the liburing library:
 *
 *	git://git.kernel.dk/liburing
 *
 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
 * from data shared between the kernel and application. This is done both
 * for ordering purposes and to ensure that once a value is loaded from
 * data that the application could potentially modify, it remains stable.
 *
 * Copyright (C) 2018-2019 Jens Axboe
 * Copyright (c) 2018-2019 Christoph Hellwig
 */
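/*
 * Illustrative sketch (not part of this file): the userspace side of the
 * CQ barrier pairing described above, roughly what liburing does. The
 * pointer names (cq_head, cq_tail, cq_mask, cqes) are hypothetical
 * stand-ins for the fields the application obtains via mmap.
 *
 *	unsigned head = *cq_head;			// app owns CQ head
 *	unsigned tail = smp_load_acquire(cq_tail);	// pairs with kernel tail store
 *	while (head != tail) {
 *		struct io_uring_cqe *cqe = &cqes[head & *cq_mask];
 *		consume(cqe);				// CQE loads happen here
 *		head++;
 *	}
 *	smp_store_release(cq_head, head);		// orders CQE loads before head store
 */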
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
#include <linux/compat.h>
#include <linux/refcount.h>
#include <linux/uio.h>

#include <linux/sched/signal.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/fdtable.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/mmu_context.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/blkdev.h>
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
#include <net/af_unix.h>
#include <net/scm.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
#include <linux/sizes.h>
#include <linux/hugetlb.h>
#include <linux/highmem.h>

#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "internal.h"
#include "io-wq.h"
#define IORING_MAX_ENTRIES	32768
#define IORING_MAX_CQ_ENTRIES	(2 * IORING_MAX_ENTRIES)

/*
 * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
 */
#define IORING_FILE_TABLE_SHIFT	9
#define IORING_MAX_FILES_TABLE	(1U << IORING_FILE_TABLE_SHIFT)
#define IORING_FILE_TABLE_MASK	(IORING_MAX_FILES_TABLE - 1)
#define IORING_MAX_FIXED_FILES	(64 * IORING_MAX_FILES_TABLE)
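/*
 * Illustrative note (not kernel code): a registered (fixed) file index
 * splits into a table and a slot using the constants above, e.g. for
 * index i:
 *
 *	table = i >> IORING_FILE_TABLE_SHIFT;	// which 512-entry table
 *	slot  = i &  IORING_FILE_TABLE_MASK;	// offset within that table
 *
 * so up to 64 tables of 512 entries gives IORING_MAX_FIXED_FILES (32768).
 */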
struct io_uring {
	u32 head ____cacheline_aligned_in_smp;
	u32 tail ____cacheline_aligned_in_smp;
};

/*
 * This data is shared with the application through the mmap at offsets
 * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
 *
 * The offsets to the member fields are published through struct
 * io_sqring_offsets when calling io_uring_setup.
 */
struct io_rings {
	/*
	 * Head and tail offsets into the ring; the offsets need to be
	 * masked to get valid indices.
	 *
	 * The kernel controls head of the sq ring and the tail of the cq ring,
	 * and the application controls tail of the sq ring and the head of the
	 * cq ring.
	 */
	struct io_uring		sq, cq;
	/*
	 * Bitmasks to apply to head and tail offsets (constant, equals
	 * ring_entries - 1)
	 */
	u32			sq_ring_mask, cq_ring_mask;
	/* Ring sizes (constant, power of 2) */
	u32			sq_ring_entries, cq_ring_entries;
	/*
	 * Number of invalid entries dropped by the kernel due to
	 * invalid index stored in array
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application (i.e. get number of "new events" by comparing to
	 * cached value).
	 *
	 * After a new SQ head value was read by the application this
	 * counter includes all submissions that were dropped reaching
	 * the new SQ head (and possibly more).
	 */
	u32			sq_dropped;
	/*
	 * Runtime flags
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application.
	 *
	 * The application needs a full memory barrier before checking
	 * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
	 */
	u32			sq_flags;
	/*
	 * Number of completion events lost because the queue was full;
	 * this should be avoided by the application by making sure
	 * there are not more requests pending than there is space in
	 * the completion queue.
	 *
	 * Written by the kernel, shouldn't be modified by the
	 * application (i.e. get number of "new events" by comparing to
	 * cached value).
	 *
	 * As completion events come in out of order this counter is not
	 * ordered with any other data.
	 */
	u32			cq_overflow;
	/*
	 * Ring buffer of completion events.
	 *
	 * The kernel writes completion events fresh every time they are
	 * produced, so the application is allowed to modify pending
	 * entries.
	 */
	struct io_uring_cqe	cqes[] ____cacheline_aligned_in_smp;
};

struct io_mapped_ubuf {
	u64		ubuf;
	size_t		len;
	struct bio_vec	*bvec;
	unsigned int	nr_bvecs;
};

struct fixed_file_table {
	struct file	**files;
};
180 struct io_ring_ctx {
181 struct {
182 struct percpu_ref refs;
183 } ____cacheline_aligned_in_smp;
185 struct {
186 unsigned int flags;
187 bool compat;
188 bool account_mem;
189 bool cq_overflow_flushed;
190 bool drain_next;
191 bool eventfd_async;
194 * Ring buffer of indices into array of io_uring_sqe, which is
195 * mmapped by the application using the IORING_OFF_SQES offset.
197 * This indirection could e.g. be used to assign fixed
198 * io_uring_sqe entries to operations and only submit them to
199 * the queue when needed.
201 * The kernel modifies neither the indices array nor the entries
202 * array.
204 u32 *sq_array;
205 unsigned cached_sq_head;
206 unsigned sq_entries;
207 unsigned sq_mask;
208 unsigned sq_thread_idle;
209 unsigned cached_sq_dropped;
210 atomic_t cached_cq_overflow;
211 struct io_uring_sqe *sq_sqes;
213 struct list_head defer_list;
214 struct list_head timeout_list;
215 struct list_head cq_overflow_list;
217 wait_queue_head_t inflight_wait;
218 } ____cacheline_aligned_in_smp;
220 struct io_rings *rings;
222 /* IO offload */
223 struct io_wq *io_wq;
224 struct task_struct *sqo_thread; /* if using sq thread polling */
225 struct mm_struct *sqo_mm;
226 wait_queue_head_t sqo_wait;
229 * If used, fixed file set. Writers must ensure that ->refs is dead,
230 * readers must ensure that ->refs is alive as long as the file* is
231 * used. Only updated through io_uring_register(2).
233 struct fixed_file_table *file_table;
234 unsigned nr_user_files;
236 /* if used, fixed mapped user buffers */
237 unsigned nr_user_bufs;
238 struct io_mapped_ubuf *user_bufs;
240 struct user_struct *user;
242 const struct cred *creds;
244 /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
245 struct completion *completions;
247 /* if all else fails... */
248 struct io_kiocb *fallback_req;
250 #if defined(CONFIG_UNIX)
251 struct socket *ring_sock;
252 #endif
254 struct {
255 unsigned cached_cq_tail;
256 unsigned cq_entries;
257 unsigned cq_mask;
258 atomic_t cq_timeouts;
259 struct wait_queue_head cq_wait;
260 struct fasync_struct *cq_fasync;
261 struct eventfd_ctx *cq_ev_fd;
262 } ____cacheline_aligned_in_smp;
264 struct {
265 struct mutex uring_lock;
266 wait_queue_head_t wait;
267 } ____cacheline_aligned_in_smp;
269 struct {
270 spinlock_t completion_lock;
271 bool poll_multi_file;
273 * ->poll_list is protected by the ctx->uring_lock for
274 * io_uring instances that don't use IORING_SETUP_SQPOLL.
275 * For SQPOLL, only the single threaded io_sq_thread() will
276 * manipulate the list, hence no extra locking is needed there.
278 struct list_head poll_list;
279 struct hlist_head *cancel_hash;
280 unsigned cancel_hash_bits;
282 spinlock_t inflight_lock;
283 struct list_head inflight_list;
284 } ____cacheline_aligned_in_smp;
288 * First field must be the file pointer in all the
289 * iocb unions! See also 'struct kiocb' in <linux/fs.h>
291 struct io_poll_iocb {
292 struct file *file;
293 union {
294 struct wait_queue_head *head;
295 u64 addr;
297 __poll_t events;
298 bool done;
299 bool canceled;
300 struct wait_queue_entry wait;
303 struct io_timeout_data {
304 struct io_kiocb *req;
305 struct hrtimer timer;
306 struct timespec64 ts;
307 enum hrtimer_mode mode;
308 u32 seq_offset;
311 struct io_accept {
312 struct file *file;
313 struct sockaddr __user *addr;
314 int __user *addr_len;
315 int flags;
318 struct io_sync {
319 struct file *file;
320 loff_t len;
321 loff_t off;
322 int flags;
325 struct io_cancel {
326 struct file *file;
327 u64 addr;
330 struct io_timeout {
331 struct file *file;
332 u64 addr;
333 int flags;
334 unsigned count;
337 struct io_rw {
338 /* NOTE: kiocb has the file as the first member, so don't do it here */
339 struct kiocb kiocb;
340 u64 addr;
341 u64 len;
344 struct io_connect {
345 struct file *file;
346 struct sockaddr __user *addr;
347 int addr_len;
350 struct io_sr_msg {
351 struct file *file;
352 struct user_msghdr __user *msg;
353 int msg_flags;
356 struct io_async_connect {
357 struct sockaddr_storage address;
360 struct io_async_msghdr {
361 struct iovec fast_iov[UIO_FASTIOV];
362 struct iovec *iov;
363 struct sockaddr __user *uaddr;
364 struct msghdr msg;
367 struct io_async_rw {
368 struct iovec fast_iov[UIO_FASTIOV];
369 struct iovec *iov;
370 ssize_t nr_segs;
371 ssize_t size;
374 struct io_async_ctx {
375 union {
376 struct io_async_rw rw;
377 struct io_async_msghdr msg;
378 struct io_async_connect connect;
379 struct io_timeout_data timeout;
384 * NOTE! Each of the iocb union members has the file pointer
385 * as the first entry in their struct definition. So you can
386 * access the file pointer through any of the sub-structs,
387 * or directly as just 'ki_filp' in this struct.
389 struct io_kiocb {
390 union {
391 struct file *file;
392 struct io_rw rw;
393 struct io_poll_iocb poll;
394 struct io_accept accept;
395 struct io_sync sync;
396 struct io_cancel cancel;
397 struct io_timeout timeout;
398 struct io_connect connect;
399 struct io_sr_msg sr_msg;
402 struct io_async_ctx *io;
403 struct file *ring_file;
404 int ring_fd;
405 bool has_user;
406 bool in_async;
407 bool needs_fixed_file;
408 u8 opcode;
410 struct io_ring_ctx *ctx;
411 union {
412 struct list_head list;
413 struct hlist_node hash_node;
415 struct list_head link_list;
416 unsigned int flags;
417 refcount_t refs;
418 #define REQ_F_NOWAIT 1 /* must not punt to workers */
419 #define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */
420 #define REQ_F_FIXED_FILE 4 /* ctx owns file */
421 #define REQ_F_LINK_NEXT 8 /* already grabbed next link */
422 #define REQ_F_IO_DRAIN 16 /* drain existing IO first */
423 #define REQ_F_IO_DRAINED 32 /* drain done */
424 #define REQ_F_LINK 64 /* linked sqes */
425 #define REQ_F_LINK_TIMEOUT 128 /* has linked timeout */
426 #define REQ_F_FAIL_LINK 256 /* fail rest of links */
427 #define REQ_F_DRAIN_LINK 512 /* link should be fully drained */
428 #define REQ_F_TIMEOUT 1024 /* timeout request */
429 #define REQ_F_ISREG 2048 /* regular file */
430 #define REQ_F_MUST_PUNT 4096 /* must be punted even for NONBLOCK */
431 #define REQ_F_TIMEOUT_NOSEQ 8192 /* no timeout sequence */
432 #define REQ_F_INFLIGHT 16384 /* on inflight list */
433 #define REQ_F_COMP_LOCKED 32768 /* completion under lock */
434 #define REQ_F_HARDLINK 65536 /* doesn't sever on completion < 0 */
435 u64 user_data;
436 u32 result;
437 u32 sequence;
439 struct list_head inflight_entry;
441 struct io_wq_work work;
444 #define IO_PLUG_THRESHOLD 2
445 #define IO_IOPOLL_BATCH 8
447 struct io_submit_state {
448 struct blk_plug plug;
451 * io_kiocb alloc cache
453 void *reqs[IO_IOPOLL_BATCH];
454 unsigned int free_reqs;
455 unsigned int cur_req;
458 * File reference cache
460 struct file *file;
461 unsigned int fd;
462 unsigned int has_refs;
463 unsigned int used_refs;
464 unsigned int ios_left;
467 static void io_wq_submit_work(struct io_wq_work **workptr);
468 static void io_cqring_fill_event(struct io_kiocb *req, long res);
469 static void __io_free_req(struct io_kiocb *req);
470 static void io_put_req(struct io_kiocb *req);
471 static void io_double_put_req(struct io_kiocb *req);
472 static void __io_double_put_req(struct io_kiocb *req);
473 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
474 static void io_queue_linked_timeout(struct io_kiocb *req);
476 static struct kmem_cache *req_cachep;
478 static const struct file_operations io_uring_fops;
480 struct sock *io_uring_get_socket(struct file *file)
482 #if defined(CONFIG_UNIX)
483 if (file->f_op == &io_uring_fops) {
484 struct io_ring_ctx *ctx = file->private_data;
486 return ctx->ring_sock->sk;
488 #endif
489 return NULL;
491 EXPORT_SYMBOL(io_uring_get_socket);
493 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
495 struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
497 complete(&ctx->completions[0]);
500 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
502 struct io_ring_ctx *ctx;
503 int hash_bits;
505 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
506 if (!ctx)
507 return NULL;
509 ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
510 if (!ctx->fallback_req)
511 goto err;
513 ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
514 if (!ctx->completions)
515 goto err;
518 * Use 5 bits less than the max cq entries, that should give us around
519 * 32 entries per hash list if totally full and uniformly spread.
521 hash_bits = ilog2(p->cq_entries);
522 hash_bits -= 5;
523 if (hash_bits <= 0)
524 hash_bits = 1;
525 ctx->cancel_hash_bits = hash_bits;
526 ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
527 GFP_KERNEL);
528 if (!ctx->cancel_hash)
529 goto err;
530 __hash_init(ctx->cancel_hash, 1U << hash_bits);
532 if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
533 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
534 goto err;
536 ctx->flags = p->flags;
537 init_waitqueue_head(&ctx->cq_wait);
538 INIT_LIST_HEAD(&ctx->cq_overflow_list);
539 init_completion(&ctx->completions[0]);
540 init_completion(&ctx->completions[1]);
541 mutex_init(&ctx->uring_lock);
542 init_waitqueue_head(&ctx->wait);
543 spin_lock_init(&ctx->completion_lock);
544 INIT_LIST_HEAD(&ctx->poll_list);
545 INIT_LIST_HEAD(&ctx->defer_list);
546 INIT_LIST_HEAD(&ctx->timeout_list);
547 init_waitqueue_head(&ctx->inflight_wait);
548 spin_lock_init(&ctx->inflight_lock);
549 INIT_LIST_HEAD(&ctx->inflight_list);
550 return ctx;
551 err:
552 if (ctx->fallback_req)
553 kmem_cache_free(req_cachep, ctx->fallback_req);
554 kfree(ctx->completions);
555 kfree(ctx->cancel_hash);
556 kfree(ctx);
557 return NULL;
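/*
 * Worked example for the cancel_hash sizing in io_ring_ctx_alloc()
 * (illustrative only): with p->cq_entries == 4096, ilog2(4096) = 12,
 * minus 5 gives hash_bits = 7, i.e. 128 buckets. A completely full CQ
 * then averages 4096 / 128 = 32 pending poll requests per hash list,
 * matching the "around 32 entries per hash list" comment above.
 */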
560 static inline bool __req_need_defer(struct io_kiocb *req)
562 struct io_ring_ctx *ctx = req->ctx;
564 return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
565 + atomic_read(&ctx->cached_cq_overflow);
568 static inline bool req_need_defer(struct io_kiocb *req)
570 if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
571 return __req_need_defer(req);
573 return false;
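/*
 * Illustrative note: a request with REQ_F_IO_DRAIN set records the
 * submission sequence it was queued at, and __req_need_defer() keeps it
 * deferred while
 *
 *	req->sequence != cached_cq_tail + cached_sq_dropped + cached_cq_overflow
 *
 * i.e. while completions (plus dropped/overflowed slots) have not yet
 * caught up with everything submitted before it.
 */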
576 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
578 struct io_kiocb *req;
580 req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
581 if (req && !req_need_defer(req)) {
582 list_del_init(&req->list);
583 return req;
586 return NULL;
589 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
591 struct io_kiocb *req;
593 req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
594 if (req) {
595 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
596 return NULL;
597 if (!__req_need_defer(req)) {
598 list_del_init(&req->list);
599 return req;
603 return NULL;
606 static void __io_commit_cqring(struct io_ring_ctx *ctx)
608 struct io_rings *rings = ctx->rings;
610 if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
611 /* order cqe stores with ring update */
612 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
614 if (wq_has_sleeper(&ctx->cq_wait)) {
615 wake_up_interruptible(&ctx->cq_wait);
616 kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
621 static inline bool io_req_needs_user(struct io_kiocb *req)
623 return !(req->opcode == IORING_OP_READ_FIXED ||
624 req->opcode == IORING_OP_WRITE_FIXED);
627 static inline bool io_prep_async_work(struct io_kiocb *req,
628 struct io_kiocb **link)
630 bool do_hashed = false;
632 switch (req->opcode) {
633 case IORING_OP_WRITEV:
634 case IORING_OP_WRITE_FIXED:
635 /* only regular files should be hashed for writes */
636 if (req->flags & REQ_F_ISREG)
637 do_hashed = true;
638 /* fall-through */
639 case IORING_OP_READV:
640 case IORING_OP_READ_FIXED:
641 case IORING_OP_SENDMSG:
642 case IORING_OP_RECVMSG:
643 case IORING_OP_ACCEPT:
644 case IORING_OP_POLL_ADD:
645 case IORING_OP_CONNECT:
647 * We know REQ_F_ISREG is not set on some of these
648 * opcodes, but this enables us to keep the check in
649 * just one place.
651 if (!(req->flags & REQ_F_ISREG))
652 req->work.flags |= IO_WQ_WORK_UNBOUND;
653 break;
655 if (io_req_needs_user(req))
656 req->work.flags |= IO_WQ_WORK_NEEDS_USER;
658 *link = io_prep_linked_timeout(req);
659 return do_hashed;
662 static inline void io_queue_async_work(struct io_kiocb *req)
664 struct io_ring_ctx *ctx = req->ctx;
665 struct io_kiocb *link;
666 bool do_hashed;
668 do_hashed = io_prep_async_work(req, &link);
670 trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
671 req->flags);
672 if (!do_hashed) {
673 io_wq_enqueue(ctx->io_wq, &req->work);
674 } else {
675 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
676 file_inode(req->file));
679 if (link)
680 io_queue_linked_timeout(link);
683 static void io_kill_timeout(struct io_kiocb *req)
685 int ret;
687 ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
688 if (ret != -1) {
689 atomic_inc(&req->ctx->cq_timeouts);
690 list_del_init(&req->list);
691 req->flags |= REQ_F_COMP_LOCKED;
692 io_cqring_fill_event(req, 0);
693 io_put_req(req);
697 static void io_kill_timeouts(struct io_ring_ctx *ctx)
699 struct io_kiocb *req, *tmp;
701 spin_lock_irq(&ctx->completion_lock);
702 list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
703 io_kill_timeout(req);
704 spin_unlock_irq(&ctx->completion_lock);
707 static void io_commit_cqring(struct io_ring_ctx *ctx)
709 struct io_kiocb *req;
711 while ((req = io_get_timeout_req(ctx)) != NULL)
712 io_kill_timeout(req);
714 __io_commit_cqring(ctx);
716 while ((req = io_get_deferred_req(ctx)) != NULL) {
717 req->flags |= REQ_F_IO_DRAINED;
718 io_queue_async_work(req);
722 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
724 struct io_rings *rings = ctx->rings;
725 unsigned tail;
727 tail = ctx->cached_cq_tail;
729 * writes to the cq entry need to come after reading head; the
730 * control dependency is enough as we're using WRITE_ONCE to
731 * fill the cq entry
733 if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
734 return NULL;
736 ctx->cached_cq_tail++;
737 return &rings->cqes[tail & ctx->cq_mask];
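/*
 * Illustrative note on the ring-full check in io_get_cqring(): head and
 * tail are free-running u32 counters, so with a power-of-2 ring size the
 * ring is full exactly when
 *
 *	tail - head == cq_ring_entries
 *
 * even across u32 wraparound, e.g. head = 0xfffffffe and tail = 0x00000002
 * gives tail - head == 4 used entries.
 */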
740 static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx)
742 if (!ctx->cq_ev_fd)
743 return false;
744 if (!ctx->eventfd_async)
745 return true;
746 return io_wq_current_is_worker() || in_interrupt();
749 static void __io_cqring_ev_posted(struct io_ring_ctx *ctx, bool trigger_ev)
751 if (waitqueue_active(&ctx->wait))
752 wake_up(&ctx->wait);
753 if (waitqueue_active(&ctx->sqo_wait))
754 wake_up(&ctx->sqo_wait);
755 if (trigger_ev)
756 eventfd_signal(ctx->cq_ev_fd, 1);
759 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
761 __io_cqring_ev_posted(ctx, io_should_trigger_evfd(ctx));
764 /* Returns true if there are no backlogged entries after the flush */
765 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
767 struct io_rings *rings = ctx->rings;
768 struct io_uring_cqe *cqe;
769 struct io_kiocb *req;
770 unsigned long flags;
771 LIST_HEAD(list);
773 if (!force) {
774 if (list_empty_careful(&ctx->cq_overflow_list))
775 return true;
776 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
777 rings->cq_ring_entries))
778 return false;
781 spin_lock_irqsave(&ctx->completion_lock, flags);
783 /* if force is set, the ring is going away. always drop after that */
784 if (force)
785 ctx->cq_overflow_flushed = true;
787 cqe = NULL;
788 while (!list_empty(&ctx->cq_overflow_list)) {
789 cqe = io_get_cqring(ctx);
790 if (!cqe && !force)
791 break;
793 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
794 list);
795 list_move(&req->list, &list);
796 if (cqe) {
797 WRITE_ONCE(cqe->user_data, req->user_data);
798 WRITE_ONCE(cqe->res, req->result);
799 WRITE_ONCE(cqe->flags, 0);
800 } else {
801 WRITE_ONCE(ctx->rings->cq_overflow,
802 atomic_inc_return(&ctx->cached_cq_overflow));
806 io_commit_cqring(ctx);
807 spin_unlock_irqrestore(&ctx->completion_lock, flags);
808 io_cqring_ev_posted(ctx);
810 while (!list_empty(&list)) {
811 req = list_first_entry(&list, struct io_kiocb, list);
812 list_del(&req->list);
813 io_put_req(req);
816 return cqe != NULL;
819 static void io_cqring_fill_event(struct io_kiocb *req, long res)
821 struct io_ring_ctx *ctx = req->ctx;
822 struct io_uring_cqe *cqe;
824 trace_io_uring_complete(ctx, req->user_data, res);
827 * If we can't get a cq entry, userspace overflowed the
828 * submission (by quite a lot). Increment the overflow count in
829 * the ring.
831 cqe = io_get_cqring(ctx);
832 if (likely(cqe)) {
833 WRITE_ONCE(cqe->user_data, req->user_data);
834 WRITE_ONCE(cqe->res, res);
835 WRITE_ONCE(cqe->flags, 0);
836 } else if (ctx->cq_overflow_flushed) {
837 WRITE_ONCE(ctx->rings->cq_overflow,
838 atomic_inc_return(&ctx->cached_cq_overflow));
839 } else {
840 refcount_inc(&req->refs);
841 req->result = res;
842 list_add_tail(&req->list, &ctx->cq_overflow_list);
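/*
 * Summary of the three outcomes in io_cqring_fill_event() (descriptive
 * only): if a CQE slot is free the event is posted directly; if the ring
 * is full and the ctx is being torn down (cq_overflow_flushed) the event
 * is dropped and cq_overflow is bumped; otherwise the request is stashed
 * on cq_overflow_list and replayed later by io_cqring_overflow_flush().
 */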
846 static void io_cqring_add_event(struct io_kiocb *req, long res)
848 struct io_ring_ctx *ctx = req->ctx;
849 unsigned long flags;
851 spin_lock_irqsave(&ctx->completion_lock, flags);
852 io_cqring_fill_event(req, res);
853 io_commit_cqring(ctx);
854 spin_unlock_irqrestore(&ctx->completion_lock, flags);
856 io_cqring_ev_posted(ctx);
859 static inline bool io_is_fallback_req(struct io_kiocb *req)
861 return req == (struct io_kiocb *)
862 ((unsigned long) req->ctx->fallback_req & ~1UL);
865 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
867 struct io_kiocb *req;
869 req = ctx->fallback_req;
870 if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
871 return req;
873 return NULL;
876 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
877 struct io_submit_state *state)
879 gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
880 struct io_kiocb *req;
882 if (!percpu_ref_tryget(&ctx->refs))
883 return NULL;
885 if (!state) {
886 req = kmem_cache_alloc(req_cachep, gfp);
887 if (unlikely(!req))
888 goto fallback;
889 } else if (!state->free_reqs) {
890 size_t sz;
891 int ret;
893 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
894 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
897 * Bulk alloc is all-or-nothing. If we fail to get a batch,
898 * retry single alloc to be on the safe side.
900 if (unlikely(ret <= 0)) {
901 state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
902 if (!state->reqs[0])
903 goto fallback;
904 ret = 1;
906 state->free_reqs = ret - 1;
907 state->cur_req = 1;
908 req = state->reqs[0];
909 } else {
910 req = state->reqs[state->cur_req];
911 state->free_reqs--;
912 state->cur_req++;
915 got_it:
916 req->io = NULL;
917 req->ring_file = NULL;
918 req->file = NULL;
919 req->ctx = ctx;
920 req->flags = 0;
921 /* one is dropped after submission, the other at completion */
922 refcount_set(&req->refs, 2);
923 req->result = 0;
924 INIT_IO_WORK(&req->work, io_wq_submit_work);
925 return req;
926 fallback:
927 req = io_get_fallback_req(ctx);
928 if (req)
929 goto got_it;
930 percpu_ref_put(&ctx->refs);
931 return NULL;
934 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
936 if (*nr) {
937 kmem_cache_free_bulk(req_cachep, *nr, reqs);
938 percpu_ref_put_many(&ctx->refs, *nr);
939 *nr = 0;
943 static void __io_free_req(struct io_kiocb *req)
945 struct io_ring_ctx *ctx = req->ctx;
947 if (req->io)
948 kfree(req->io);
949 if (req->file && !(req->flags & REQ_F_FIXED_FILE))
950 fput(req->file);
951 if (req->flags & REQ_F_INFLIGHT) {
952 unsigned long flags;
954 spin_lock_irqsave(&ctx->inflight_lock, flags);
955 list_del(&req->inflight_entry);
956 if (waitqueue_active(&ctx->inflight_wait))
957 wake_up(&ctx->inflight_wait);
958 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
960 percpu_ref_put(&ctx->refs);
961 if (likely(!io_is_fallback_req(req)))
962 kmem_cache_free(req_cachep, req);
963 else
964 clear_bit_unlock(0, (unsigned long *) ctx->fallback_req);
967 static bool io_link_cancel_timeout(struct io_kiocb *req)
969 struct io_ring_ctx *ctx = req->ctx;
970 int ret;
972 ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
973 if (ret != -1) {
974 io_cqring_fill_event(req, -ECANCELED);
975 io_commit_cqring(ctx);
976 req->flags &= ~REQ_F_LINK;
977 io_put_req(req);
978 return true;
981 return false;
984 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
986 struct io_ring_ctx *ctx = req->ctx;
987 bool wake_ev = false;
989 /* Already got next link */
990 if (req->flags & REQ_F_LINK_NEXT)
991 return;
994 * The list should never be empty when we are called here. But could
995 * potentially happen if the chain is messed up, check to be on the
996 * safe side.
998 while (!list_empty(&req->link_list)) {
999 struct io_kiocb *nxt = list_first_entry(&req->link_list,
1000 struct io_kiocb, link_list);
1002 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
1003 (nxt->flags & REQ_F_TIMEOUT))) {
1004 list_del_init(&nxt->link_list);
1005 wake_ev |= io_link_cancel_timeout(nxt);
1006 req->flags &= ~REQ_F_LINK_TIMEOUT;
1007 continue;
1010 list_del_init(&req->link_list);
1011 if (!list_empty(&nxt->link_list))
1012 nxt->flags |= REQ_F_LINK;
1013 *nxtptr = nxt;
1014 break;
1017 req->flags |= REQ_F_LINK_NEXT;
1018 if (wake_ev)
1019 io_cqring_ev_posted(ctx);
1023 * Called if REQ_F_LINK is set, and we fail the head request
1025 static void io_fail_links(struct io_kiocb *req)
1027 struct io_ring_ctx *ctx = req->ctx;
1028 unsigned long flags;
1030 spin_lock_irqsave(&ctx->completion_lock, flags);
1032 while (!list_empty(&req->link_list)) {
1033 struct io_kiocb *link = list_first_entry(&req->link_list,
1034 struct io_kiocb, link_list);
1036 list_del_init(&link->link_list);
1037 trace_io_uring_fail_link(req, link);
1039 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
1040 link->opcode == IORING_OP_LINK_TIMEOUT) {
1041 io_link_cancel_timeout(link);
1042 } else {
1043 io_cqring_fill_event(link, -ECANCELED);
1044 __io_double_put_req(link);
1046 req->flags &= ~REQ_F_LINK_TIMEOUT;
1049 io_commit_cqring(ctx);
1050 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1051 io_cqring_ev_posted(ctx);
1054 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
1056 if (likely(!(req->flags & REQ_F_LINK)))
1057 return;
1060 * If LINK is set, we have dependent requests in this chain. If we
1061 * didn't fail this request, queue the first one up, moving any other
1062 * dependencies to the next request. In case of failure, fail the rest
1063 * of the chain.
1065 if (req->flags & REQ_F_FAIL_LINK) {
1066 io_fail_links(req);
1067 } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1068 REQ_F_LINK_TIMEOUT) {
1069 struct io_ring_ctx *ctx = req->ctx;
1070 unsigned long flags;
1073 * If this is a timeout link, we could be racing with the
1074 * timeout timer. Grab the completion lock for this case to
1075 * protect against that.
1077 spin_lock_irqsave(&ctx->completion_lock, flags);
1078 io_req_link_next(req, nxt);
1079 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1080 } else {
1081 io_req_link_next(req, nxt);
1085 static void io_free_req(struct io_kiocb *req)
1087 struct io_kiocb *nxt = NULL;
1089 io_req_find_next(req, &nxt);
1090 __io_free_req(req);
1092 if (nxt)
1093 io_queue_async_work(nxt);
1097 * Drop reference to request, return next in chain (if there is one) if this
1098 * was the last reference to this request.
1100 __attribute__((nonnull))
1101 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1103 if (refcount_dec_and_test(&req->refs)) {
1104 io_req_find_next(req, nxtptr);
1105 __io_free_req(req);
1109 static void io_put_req(struct io_kiocb *req)
1111 if (refcount_dec_and_test(&req->refs))
1112 io_free_req(req);
1116 * Must only be used if we don't need to care about links, usually from
1117 * within the completion handling itself.
1119 static void __io_double_put_req(struct io_kiocb *req)
1121 /* drop both submit and complete references */
1122 if (refcount_sub_and_test(2, &req->refs))
1123 __io_free_req(req);
1126 static void io_double_put_req(struct io_kiocb *req)
1128 /* drop both submit and complete references */
1129 if (refcount_sub_and_test(2, &req->refs))
1130 io_free_req(req);
1133 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1135 struct io_rings *rings = ctx->rings;
1138 * noflush == true is from the waitqueue handler, just ensure we wake
1139 * up the task, and the next invocation will flush the entries. We
1140 * cannot safely do it from here.
1142 if (noflush && !list_empty(&ctx->cq_overflow_list))
1143 return -1U;
1145 io_cqring_overflow_flush(ctx, false);
1147 /* See comment at the top of this file */
1148 smp_rmb();
1149 return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
1152 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1154 struct io_rings *rings = ctx->rings;
1156 /* make sure SQ entry isn't read before tail */
1157 return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
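/*
 * Illustrative note: like the CQ side, SQ occupancy is computed with
 * free-running u32 counters, so
 *
 *	to_submit = smp_load_acquire(&sq.tail) - cached_sq_head
 *
 * yields the number of new SQEs the application has published; the
 * acquire pairs with the application's store-release (or smp_wmb())
 * performed after it fills in the entries.
 */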
1161 * Find and free completed poll iocbs
1163 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1164 struct list_head *done)
1166 void *reqs[IO_IOPOLL_BATCH];
1167 struct io_kiocb *req;
1168 int to_free;
1170 to_free = 0;
1171 while (!list_empty(done)) {
1172 req = list_first_entry(done, struct io_kiocb, list);
1173 list_del(&req->list);
1175 io_cqring_fill_event(req, req->result);
1176 (*nr_events)++;
1178 if (refcount_dec_and_test(&req->refs)) {
1179 /* If we're not using fixed files, we have to pair the
1180 * completion part with the file put. Use regular
1181 * completions for those, only batch free for fixed
1182 * file and non-linked commands.
1184 if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
1185 REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
1186 !req->io) {
1187 reqs[to_free++] = req;
1188 if (to_free == ARRAY_SIZE(reqs))
1189 io_free_req_many(ctx, reqs, &to_free);
1190 } else {
1191 io_free_req(req);
1196 io_commit_cqring(ctx);
1197 io_free_req_many(ctx, reqs, &to_free);
1200 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1201 long min)
1203 struct io_kiocb *req, *tmp;
1204 LIST_HEAD(done);
1205 bool spin;
1206 int ret;
1209 * Only spin for completions if we don't have multiple devices hanging
1210 * off our complete list, and we're under the requested amount.
1212 spin = !ctx->poll_multi_file && *nr_events < min;
1214 ret = 0;
1215 list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1216 struct kiocb *kiocb = &req->rw.kiocb;
1219 * Move completed entries to our local list. If we find a
1220 * request that requires polling, break out and complete
1221 * the done list first, if we have entries there.
1223 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1224 list_move_tail(&req->list, &done);
1225 continue;
1227 if (!list_empty(&done))
1228 break;
1230 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1231 if (ret < 0)
1232 break;
1234 if (ret && spin)
1235 spin = false;
1236 ret = 0;
1239 if (!list_empty(&done))
1240 io_iopoll_complete(ctx, nr_events, &done);
1242 return ret;
1246 * Poll for a minimum of 'min' events. Note that if min == 0 we consider that a
1247 * non-spinning poll check - we'll still enter the driver poll loop, but only
1248 * as a non-spinning completion check.
1250 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1251 long min)
1253 while (!list_empty(&ctx->poll_list) && !need_resched()) {
1254 int ret;
1256 ret = io_do_iopoll(ctx, nr_events, min);
1257 if (ret < 0)
1258 return ret;
1259 if (!min || *nr_events >= min)
1260 return 0;
1263 return 1;
1267 * We can't just wait for polled events to come to us, we have to actively
1268 * find and complete them.
1270 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1272 if (!(ctx->flags & IORING_SETUP_IOPOLL))
1273 return;
1275 mutex_lock(&ctx->uring_lock);
1276 while (!list_empty(&ctx->poll_list)) {
1277 unsigned int nr_events = 0;
1279 io_iopoll_getevents(ctx, &nr_events, 1);
1282 * Ensure we allow local-to-the-cpu processing to take place,
1283 * in this case we need to ensure that we reap all events.
1285 cond_resched();
1287 mutex_unlock(&ctx->uring_lock);
1290 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1291 long min)
1293 int iters = 0, ret = 0;
1296 * We disallow the app entering submit/complete with polling, but we
1297 * still need to lock the ring to prevent racing with polled issue
1298 * that got punted to a workqueue.
1300 mutex_lock(&ctx->uring_lock);
1301 do {
1302 int tmin = 0;
1305 * Don't enter poll loop if we already have events pending.
1306 * If we do, we can potentially be spinning for commands that
1307 * already triggered a CQE (eg in error).
1309 if (io_cqring_events(ctx, false))
1310 break;
1313 * If a submit got punted to a workqueue, we can have the
1314 * application entering polling for a command before it gets
1315 * issued. That app will hold the uring_lock for the duration
1316 * of the poll right here, so we need to take a breather every
1317 * now and then to ensure that the issue has a chance to add
1318 * the poll to the issued list. Otherwise we can spin here
1319 * forever, while the workqueue is stuck trying to acquire the
1320 * very same mutex.
1322 if (!(++iters & 7)) {
1323 mutex_unlock(&ctx->uring_lock);
1324 mutex_lock(&ctx->uring_lock);
1327 if (*nr_events < min)
1328 tmin = min - *nr_events;
1330 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1331 if (ret <= 0)
1332 break;
1333 ret = 0;
1334 } while (min && !*nr_events && !need_resched());
1336 mutex_unlock(&ctx->uring_lock);
1337 return ret;
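/*
 * Note on the loop in io_iopoll_check() (descriptive only): the
 * "if (!(++iters & 7))" branch drops and re-takes uring_lock every 8th
 * iteration, which is the "breather" the comment describes; it gives a
 * punted submission a window to grab the mutex and add its kiocb to
 * poll_list.
 */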
1340 static void kiocb_end_write(struct io_kiocb *req)
1343 * Tell lockdep we inherited freeze protection from submission
1344 * thread.
1346 if (req->flags & REQ_F_ISREG) {
1347 struct inode *inode = file_inode(req->file);
1349 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1351 file_end_write(req->file);
1354 static inline void req_set_fail_links(struct io_kiocb *req)
1356 if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1357 req->flags |= REQ_F_FAIL_LINK;
1360 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1362 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1364 if (kiocb->ki_flags & IOCB_WRITE)
1365 kiocb_end_write(req);
1367 if (res != req->result)
1368 req_set_fail_links(req);
1369 io_cqring_add_event(req, res);
1372 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1374 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1376 io_complete_rw_common(kiocb, res);
1377 io_put_req(req);
1380 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1382 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1383 struct io_kiocb *nxt = NULL;
1385 io_complete_rw_common(kiocb, res);
1386 io_put_req_find_next(req, &nxt);
1388 return nxt;
1391 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1393 struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
1395 if (kiocb->ki_flags & IOCB_WRITE)
1396 kiocb_end_write(req);
1398 if (res != req->result)
1399 req_set_fail_links(req);
1400 req->result = res;
1401 if (res != -EAGAIN)
1402 req->flags |= REQ_F_IOPOLL_COMPLETED;
1406 * After the iocb has been issued, it's safe to be found on the poll list.
1407 * Adding the kiocb to the list AFTER submission ensures that we don't
1408 * find it from a io_iopoll_getevents() thread before the issuer is done
1409 * accessing the kiocb cookie.
1411 static void io_iopoll_req_issued(struct io_kiocb *req)
1413 struct io_ring_ctx *ctx = req->ctx;
1416 * Track whether we have multiple files in our lists. This will impact
1417 * how we do polling eventually, not spinning if we're on potentially
1418 * different devices.
1420 if (list_empty(&ctx->poll_list)) {
1421 ctx->poll_multi_file = false;
1422 } else if (!ctx->poll_multi_file) {
1423 struct io_kiocb *list_req;
1425 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1426 list);
1427 if (list_req->file != req->file)
1428 ctx->poll_multi_file = true;
1432 * For fast devices, IO may have already completed. If it has, add
1433 * it to the front so we find it first.
1435 if (req->flags & REQ_F_IOPOLL_COMPLETED)
1436 list_add(&req->list, &ctx->poll_list);
1437 else
1438 list_add_tail(&req->list, &ctx->poll_list);
1440 if ((ctx->flags & IORING_SETUP_SQPOLL) &&
1441 wq_has_sleeper(&ctx->sqo_wait))
1442 wake_up(&ctx->sqo_wait);
1445 static void io_file_put(struct io_submit_state *state)
1447 if (state->file) {
1448 int diff = state->has_refs - state->used_refs;
1450 if (diff)
1451 fput_many(state->file, diff);
1452 state->file = NULL;
1457 * Get as many references to a file as we have IOs left in this submission,
1458 * assuming most submissions are for one file, or at least that each file
1459 * has more than one submission.
1461 static struct file *io_file_get(struct io_submit_state *state, int fd)
1463 if (!state)
1464 return fget(fd);
1466 if (state->file) {
1467 if (state->fd == fd) {
1468 state->used_refs++;
1469 state->ios_left--;
1470 return state->file;
1472 io_file_put(state);
1474 state->file = fget_many(fd, state->ios_left);
1475 if (!state->file)
1476 return NULL;
1478 state->fd = fd;
1479 state->has_refs = state->ios_left;
1480 state->used_refs = 1;
1481 state->ios_left--;
1482 return state->file;
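/*
 * Illustrative example of the file reference cache above: if a submission
 * batch has 8 SQEs all targeting fd 5, the first io_file_get() does one
 * fget_many(5, 8), taking 8 references in a single atomic operation; the
 * next 7 lookups just bump used_refs. io_file_put() later drops the
 * has_refs - used_refs references that were never consumed.
 */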
1486 * If we tracked the file through the SCM inflight mechanism, we could support
1487 * any file. For now, just ensure that anything potentially problematic is done
1488 * inline.
1490 static bool io_file_supports_async(struct file *file)
1492 umode_t mode = file_inode(file)->i_mode;
1494 if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
1495 return true;
1496 if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1497 return true;
1499 return false;
1502 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1503 bool force_nonblock)
1505 struct io_ring_ctx *ctx = req->ctx;
1506 struct kiocb *kiocb = &req->rw.kiocb;
1507 unsigned ioprio;
1508 int ret;
1510 if (!req->file)
1511 return -EBADF;
1513 if (S_ISREG(file_inode(req->file)->i_mode))
1514 req->flags |= REQ_F_ISREG;
1516 kiocb->ki_pos = READ_ONCE(sqe->off);
1517 kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1518 kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1520 ioprio = READ_ONCE(sqe->ioprio);
1521 if (ioprio) {
1522 ret = ioprio_check_cap(ioprio);
1523 if (ret)
1524 return ret;
1526 kiocb->ki_ioprio = ioprio;
1527 } else
1528 kiocb->ki_ioprio = get_current_ioprio();
1530 ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1531 if (unlikely(ret))
1532 return ret;
1534 /* don't allow async punt if RWF_NOWAIT was requested */
1535 if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1536 (req->file->f_flags & O_NONBLOCK))
1537 req->flags |= REQ_F_NOWAIT;
1539 if (force_nonblock)
1540 kiocb->ki_flags |= IOCB_NOWAIT;
1542 if (ctx->flags & IORING_SETUP_IOPOLL) {
1543 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1544 !kiocb->ki_filp->f_op->iopoll)
1545 return -EOPNOTSUPP;
1547 kiocb->ki_flags |= IOCB_HIPRI;
1548 kiocb->ki_complete = io_complete_rw_iopoll;
1549 req->result = 0;
1550 } else {
1551 if (kiocb->ki_flags & IOCB_HIPRI)
1552 return -EINVAL;
1553 kiocb->ki_complete = io_complete_rw;
1556 req->rw.addr = READ_ONCE(sqe->addr);
1557 req->rw.len = READ_ONCE(sqe->len);
1558 /* we own ->private, reuse it for the buffer index */
1559 req->rw.kiocb.private = (void *) (unsigned long)
1560 READ_ONCE(sqe->buf_index);
1561 return 0;
1564 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1566 switch (ret) {
1567 case -EIOCBQUEUED:
1568 break;
1569 case -ERESTARTSYS:
1570 case -ERESTARTNOINTR:
1571 case -ERESTARTNOHAND:
1572 case -ERESTART_RESTARTBLOCK:
1574 * We can't just restart the syscall, since previously
1575 * submitted sqes may already be in progress. Just fail this
1576 * IO with EINTR.
1578 ret = -EINTR;
1579 /* fall through */
1580 default:
1581 kiocb->ki_complete(kiocb, ret, 0);
1585 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1586 bool in_async)
1588 if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1589 *nxt = __io_complete_rw(kiocb, ret);
1590 else
1591 io_rw_done(kiocb, ret);
1594 static ssize_t io_import_fixed(struct io_kiocb *req, int rw,
1595 struct iov_iter *iter)
1597 struct io_ring_ctx *ctx = req->ctx;
1598 size_t len = req->rw.len;
1599 struct io_mapped_ubuf *imu;
1600 unsigned index, buf_index;
1601 size_t offset;
1602 u64 buf_addr;
1604 /* attempt to use fixed buffers without having provided iovecs */
1605 if (unlikely(!ctx->user_bufs))
1606 return -EFAULT;
1608 buf_index = (unsigned long) req->rw.kiocb.private;
1609 if (unlikely(buf_index >= ctx->nr_user_bufs))
1610 return -EFAULT;
1612 index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1613 imu = &ctx->user_bufs[index];
1614 buf_addr = req->rw.addr;
1616 /* overflow */
1617 if (buf_addr + len < buf_addr)
1618 return -EFAULT;
1619 /* not inside the mapped region */
1620 if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1621 return -EFAULT;
1624 * May not be a start of buffer, set size appropriately
1625 * and advance us to the beginning.
1627 offset = buf_addr - imu->ubuf;
1628 iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1630 if (offset) {
1632 * Don't use iov_iter_advance() here, as it's really slow for
1633 * using the latter parts of a big fixed buffer - it iterates
1634 * over each segment manually. We can cheat a bit here, because
1635 * we know that:
1637 * 1) it's a BVEC iter, we set it up
1638 * 2) all bvecs are PAGE_SIZE in size, except potentially the
1639 * first and last bvec
1641 * So just find our index, and adjust the iterator afterwards.
1642 * If the offset is within the first bvec (or the whole first
1643 * bvec), just use iov_iter_advance(). This makes it easier
1644 * since we can just skip the first segment, which may not
1645 * be PAGE_SIZE aligned.
1647 const struct bio_vec *bvec = imu->bvec;
1649 if (offset <= bvec->bv_len) {
1650 iov_iter_advance(iter, offset);
1651 } else {
1652 unsigned long seg_skip;
1654 /* skip first vec */
1655 offset -= bvec->bv_len;
1656 seg_skip = 1 + (offset >> PAGE_SHIFT);
1658 iter->bvec = bvec + seg_skip;
1659 iter->nr_segs -= seg_skip;
1660 iter->count -= bvec->bv_len + offset;
1661 iter->iov_offset = offset & ~PAGE_MASK;
1665 return len;
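/*
 * Worked example for the fast-forward in io_import_fixed() (illustrative
 * only): with 4K pages, a fixed buffer whose first bvec holds 1024 bytes,
 * and buf_addr 10000 bytes into the buffer:
 *
 *	offset = 10000 - 1024 = 8976 after skipping the first bvec,
 *	seg_skip = 1 + (8976 >> 12) = 3 bvecs skipped in total,
 *	iov_offset = 8976 & ~PAGE_MASK = 784 bytes into bvec 3,
 *
 * which lands on the same byte as iterating 10000 bytes forward, without
 * walking every segment.
 */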
1668 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1669 struct iovec **iovec, struct iov_iter *iter)
1671 void __user *buf = u64_to_user_ptr(req->rw.addr);
1672 size_t sqe_len = req->rw.len;
1673 u8 opcode;
1675 opcode = req->opcode;
1676 if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1677 *iovec = NULL;
1678 return io_import_fixed(req, rw, iter);
1681 /* buffer index only valid with fixed read/write */
1682 if (req->rw.kiocb.private)
1683 return -EINVAL;
1685 if (req->io) {
1686 struct io_async_rw *iorw = &req->io->rw;
1688 *iovec = iorw->iov;
1689 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1690 if (iorw->iov == iorw->fast_iov)
1691 *iovec = NULL;
1692 return iorw->size;
1695 if (!req->has_user)
1696 return -EFAULT;
1698 #ifdef CONFIG_COMPAT
1699 if (req->ctx->compat)
1700 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1701 iovec, iter);
1702 #endif
1704 return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1708 * For files that don't have ->read_iter() and ->write_iter(), handle them
1709 * by looping over ->read() or ->write() manually.
1711 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1712 struct iov_iter *iter)
1714 ssize_t ret = 0;
1717 * Don't support polled IO through this interface, and we can't
1718 * support non-blocking either. For the latter, this just causes
1719 * the kiocb to be handled from an async context.
1721 if (kiocb->ki_flags & IOCB_HIPRI)
1722 return -EOPNOTSUPP;
1723 if (kiocb->ki_flags & IOCB_NOWAIT)
1724 return -EAGAIN;
1726 while (iov_iter_count(iter)) {
1727 struct iovec iovec;
1728 ssize_t nr;
1730 if (!iov_iter_is_bvec(iter)) {
1731 iovec = iov_iter_iovec(iter);
1732 } else {
1733 /* fixed buffers import bvec */
1734 iovec.iov_base = kmap(iter->bvec->bv_page)
1735 + iter->iov_offset;
1736 iovec.iov_len = min(iter->count,
1737 iter->bvec->bv_len - iter->iov_offset);
1740 if (rw == READ) {
1741 nr = file->f_op->read(file, iovec.iov_base,
1742 iovec.iov_len, &kiocb->ki_pos);
1743 } else {
1744 nr = file->f_op->write(file, iovec.iov_base,
1745 iovec.iov_len, &kiocb->ki_pos);
1748 if (iov_iter_is_bvec(iter))
1749 kunmap(iter->bvec->bv_page);
1751 if (nr < 0) {
1752 if (!ret)
1753 ret = nr;
1754 break;
1756 ret += nr;
1757 if (nr != iovec.iov_len)
1758 break;
1759 iov_iter_advance(iter, nr);
1762 return ret;
1765 static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
1766 struct iovec *iovec, struct iovec *fast_iov,
1767 struct iov_iter *iter)
1769 req->io->rw.nr_segs = iter->nr_segs;
1770 req->io->rw.size = io_size;
1771 req->io->rw.iov = iovec;
1772 if (!req->io->rw.iov) {
1773 req->io->rw.iov = req->io->rw.fast_iov;
1774 memcpy(req->io->rw.iov, fast_iov,
1775 sizeof(struct iovec) * iter->nr_segs);
1779 static int io_alloc_async_ctx(struct io_kiocb *req)
1781 req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
1782 return req->io == NULL;
1785 static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
1786 struct iovec *iovec, struct iovec *fast_iov,
1787 struct iov_iter *iter)
1789 if (req->opcode == IORING_OP_READ_FIXED ||
1790 req->opcode == IORING_OP_WRITE_FIXED)
1791 return 0;
1792 if (!req->io) {
1793 if (io_alloc_async_ctx(req))
1794 return -ENOMEM;
1796 io_req_map_rw(req, io_size, iovec, fast_iov, iter);
1798 return 0;
1801 static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1802 bool force_nonblock)
1804 struct io_async_ctx *io;
1805 struct iov_iter iter;
1806 ssize_t ret;
1808 ret = io_prep_rw(req, sqe, force_nonblock);
1809 if (ret)
1810 return ret;
1812 if (unlikely(!(req->file->f_mode & FMODE_READ)))
1813 return -EBADF;
1815 if (!req->io)
1816 return 0;
1818 io = req->io;
1819 io->rw.iov = io->rw.fast_iov;
1820 req->io = NULL;
1821 ret = io_import_iovec(READ, req, &io->rw.iov, &iter);
1822 req->io = io;
1823 if (ret < 0)
1824 return ret;
1826 io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
1827 return 0;
1830 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
1831 bool force_nonblock)
1833 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1834 struct kiocb *kiocb = &req->rw.kiocb;
1835 struct iov_iter iter;
1836 size_t iov_count;
1837 ssize_t io_size, ret;
1839 ret = io_import_iovec(READ, req, &iovec, &iter);
1840 if (ret < 0)
1841 return ret;
1843 /* Ensure we clear previously set non-block flag */
1844 if (!force_nonblock)
1845 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
1847 req->result = 0;
1848 io_size = ret;
1849 if (req->flags & REQ_F_LINK)
1850 req->result = io_size;
1853 * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1854 * we know to async punt it even if it was opened O_NONBLOCK
1856 if (force_nonblock && !io_file_supports_async(req->file)) {
1857 req->flags |= REQ_F_MUST_PUNT;
1858 goto copy_iov;
1861 iov_count = iov_iter_count(&iter);
1862 ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
1863 if (!ret) {
1864 ssize_t ret2;
1866 if (req->file->f_op->read_iter)
1867 ret2 = call_read_iter(req->file, kiocb, &iter);
1868 else
1869 ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
1871 /* Catch -EAGAIN return for forced non-blocking submission */
1872 if (!force_nonblock || ret2 != -EAGAIN) {
1873 kiocb_done(kiocb, ret2, nxt, req->in_async);
1874 } else {
1875 copy_iov:
1876 ret = io_setup_async_rw(req, io_size, iovec,
1877 inline_vecs, &iter);
1878 if (ret)
1879 goto out_free;
1880 return -EAGAIN;
1883 out_free:
1884 kfree(iovec);
1885 return ret;
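/*
 * Descriptive note on the flow in io_read(): for a forced non-blocking
 * attempt that returns -EAGAIN (or a file that cannot do async at all),
 * the iovec is copied into req->io via io_setup_async_rw() so the
 * on-stack inline_vecs can be abandoned, and -EAGAIN punts the request to
 * the io-wq worker, which retries it from the preserved state.
 */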
1888 static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1889 bool force_nonblock)
1891 struct io_async_ctx *io;
1892 struct iov_iter iter;
1893 ssize_t ret;
1895 ret = io_prep_rw(req, sqe, force_nonblock);
1896 if (ret)
1897 return ret;
1899 if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
1900 return -EBADF;
1902 if (!req->io)
1903 return 0;
1905 io = req->io;
1906 io->rw.iov = io->rw.fast_iov;
1907 req->io = NULL;
1908 ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter);
1909 req->io = io;
1910 if (ret < 0)
1911 return ret;
1913 io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
1914 return 0;
1917 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
1918 bool force_nonblock)
1920 struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1921 struct kiocb *kiocb = &req->rw.kiocb;
1922 struct iov_iter iter;
1923 size_t iov_count;
1924 ssize_t ret, io_size;
1926 ret = io_import_iovec(WRITE, req, &iovec, &iter);
1927 if (ret < 0)
1928 return ret;
1930 /* Ensure we clear previously set non-block flag */
1931 if (!force_nonblock)
1932 req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
1934 req->result = 0;
1935 io_size = ret;
1936 if (req->flags & REQ_F_LINK)
1937 req->result = io_size;
1940 * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1941 * we know to async punt it even if it was opened O_NONBLOCK
1943 if (force_nonblock && !io_file_supports_async(req->file)) {
1944 req->flags |= REQ_F_MUST_PUNT;
1945 goto copy_iov;
1948 /* file path doesn't support NOWAIT for non-direct_IO */
1949 if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
1950 (req->flags & REQ_F_ISREG))
1951 goto copy_iov;
1953 iov_count = iov_iter_count(&iter);
1954 ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
1955 if (!ret) {
1956 ssize_t ret2;
1959 * Open-code file_start_write here to grab freeze protection,
1960 * which will be released by another thread in
1961 * io_complete_rw(). Fool lockdep by telling it the lock got
1962 * released so that it doesn't complain about the held lock when
1963 * we return to userspace.
1965 if (req->flags & REQ_F_ISREG) {
1966 __sb_start_write(file_inode(req->file)->i_sb,
1967 SB_FREEZE_WRITE, true);
1968 __sb_writers_release(file_inode(req->file)->i_sb,
1969 SB_FREEZE_WRITE);
1971 kiocb->ki_flags |= IOCB_WRITE;
1973 if (req->file->f_op->write_iter)
1974 ret2 = call_write_iter(req->file, kiocb, &iter);
1975 else
1976 ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
1978 * Raw bdev writes will -EOPNOTSUPP for IOCB_NOWAIT. Just
1979 * retry them without IOCB_NOWAIT.
1981 if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
1982 ret2 = -EAGAIN;
1983 if (!force_nonblock || ret2 != -EAGAIN) {
1984 kiocb_done(kiocb, ret2, nxt, req->in_async);
1985 } else {
1986 copy_iov:
1987 ret = io_setup_async_rw(req, io_size, iovec,
1988 inline_vecs, &iter);
1989 if (ret)
1990 goto out_free;
1991 return -EAGAIN;
1994 out_free:
1995 kfree(iovec);
1996 return ret;
2000 * IORING_OP_NOP just posts a completion event, nothing else.
2002 static int io_nop(struct io_kiocb *req)
2004 struct io_ring_ctx *ctx = req->ctx;
2006 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2007 return -EINVAL;
2009 io_cqring_add_event(req, 0);
2010 io_put_req(req);
2011 return 0;
2014 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2016 struct io_ring_ctx *ctx = req->ctx;
2018 if (!req->file)
2019 return -EBADF;
2021 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2022 return -EINVAL;
2023 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2024 return -EINVAL;
2026 req->sync.flags = READ_ONCE(sqe->fsync_flags);
2027 if (unlikely(req->sync.flags & ~IORING_FSYNC_DATASYNC))
2028 return -EINVAL;
2030 req->sync.off = READ_ONCE(sqe->off);
2031 req->sync.len = READ_ONCE(sqe->len);
2032 return 0;
2035 static bool io_req_cancelled(struct io_kiocb *req)
2037 if (req->work.flags & IO_WQ_WORK_CANCEL) {
2038 req_set_fail_links(req);
2039 io_cqring_add_event(req, -ECANCELED);
2040 io_put_req(req);
2041 return true;
2044 return false;
2047 static void io_link_work_cb(struct io_wq_work **workptr)
2049 struct io_wq_work *work = *workptr;
2050 struct io_kiocb *link = work->data;
2052 io_queue_linked_timeout(link);
2053 work->func = io_wq_submit_work;
2056 static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt)
2058 struct io_kiocb *link;
2060 io_prep_async_work(nxt, &link);
2061 *workptr = &nxt->work;
2062 if (link) {
2063 nxt->work.flags |= IO_WQ_WORK_CB;
2064 nxt->work.func = io_link_work_cb;
2065 nxt->work.data = link;
2069 static void io_fsync_finish(struct io_wq_work **workptr)
2071 struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2072 loff_t end = req->sync.off + req->sync.len;
2073 struct io_kiocb *nxt = NULL;
2074 int ret;
2076 if (io_req_cancelled(req))
2077 return;
2079 ret = vfs_fsync_range(req->file, req->sync.off,
2080 end > 0 ? end : LLONG_MAX,
2081 req->sync.flags & IORING_FSYNC_DATASYNC);
2082 if (ret < 0)
2083 req_set_fail_links(req);
2084 io_cqring_add_event(req, ret);
2085 io_put_req_find_next(req, &nxt);
2086 if (nxt)
2087 io_wq_assign_next(workptr, nxt);
2090 static int io_fsync(struct io_kiocb *req, struct io_kiocb **nxt,
2091 bool force_nonblock)
2093 struct io_wq_work *work, *old_work;
2095 /* fsync always requires a blocking context */
2096 if (force_nonblock) {
2097 io_put_req(req);
2098 req->work.func = io_fsync_finish;
2099 return -EAGAIN;
2102 work = old_work = &req->work;
2103 io_fsync_finish(&work);
2104 if (work && work != old_work)
2105 *nxt = container_of(work, struct io_kiocb, work);
2106 return 0;
2109 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2111 struct io_ring_ctx *ctx = req->ctx;
2113 if (!req->file)
2114 return -EBADF;
2116 if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2117 return -EINVAL;
2118 if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
2119 return -EINVAL;
2121 req->sync.off = READ_ONCE(sqe->off);
2122 req->sync.len = READ_ONCE(sqe->len);
2123 req->sync.flags = READ_ONCE(sqe->sync_range_flags);
2124 return 0;
2127 static void io_sync_file_range_finish(struct io_wq_work **workptr)
2129 struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2130 struct io_kiocb *nxt = NULL;
2131 int ret;
2133 if (io_req_cancelled(req))
2134 return;
2136 ret = sync_file_range(req->file, req->sync.off, req->sync.len,
2137 req->sync.flags);
2138 if (ret < 0)
2139 req_set_fail_links(req);
2140 io_cqring_add_event(req, ret);
2141 io_put_req_find_next(req, &nxt);
2142 if (nxt)
2143 io_wq_assign_next(workptr, nxt);
2146 static int io_sync_file_range(struct io_kiocb *req, struct io_kiocb **nxt,
2147 bool force_nonblock)
2149 struct io_wq_work *work, *old_work;
2151 /* sync_file_range always requires a blocking context */
2152 if (force_nonblock) {
2153 io_put_req(req);
2154 req->work.func = io_sync_file_range_finish;
2155 return -EAGAIN;
2158 work = old_work = &req->work;
2159 io_sync_file_range_finish(&work);
2160 if (work && work != old_work)
2161 *nxt = container_of(work, struct io_kiocb, work);
2162 return 0;
2165 static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2167 #if defined(CONFIG_NET)
2168 struct io_sr_msg *sr = &req->sr_msg;
2169 struct io_async_ctx *io = req->io;
2171 sr->msg_flags = READ_ONCE(sqe->msg_flags);
2172 sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2174 #ifdef CONFIG_COMPAT
2175 if (req->ctx->compat)
2176 sr->msg_flags |= MSG_CMSG_COMPAT;
2177 #endif
2179 if (!io)
2180 return 0;
2182 io->msg.iov = io->msg.fast_iov;
2183 return sendmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2184 &io->msg.iov);
2185 #else
2186 return -EOPNOTSUPP;
2187 #endif
2190 static int io_sendmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2191 bool force_nonblock)
2193 #if defined(CONFIG_NET)
2194 struct io_async_msghdr *kmsg = NULL;
2195 struct socket *sock;
2196 int ret;
2198 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2199 return -EINVAL;
2201 sock = sock_from_file(req->file, &ret);
2202 if (sock) {
2203 struct io_async_ctx io;
2204 struct sockaddr_storage addr;
2205 unsigned flags;
2207 if (req->io) {
2208 kmsg = &req->io->msg;
2209 kmsg->msg.msg_name = &addr;
2210 /* if iov is set, it's allocated already */
2211 if (!kmsg->iov)
2212 kmsg->iov = kmsg->fast_iov;
2213 kmsg->msg.msg_iter.iov = kmsg->iov;
2214 } else {
2215 struct io_sr_msg *sr = &req->sr_msg;
2217 kmsg = &io.msg;
2218 kmsg->msg.msg_name = &addr;
2220 io.msg.iov = io.msg.fast_iov;
2221 ret = sendmsg_copy_msghdr(&io.msg.msg, sr->msg,
2222 sr->msg_flags, &io.msg.iov);
2223 if (ret)
2224 return ret;
2227 flags = req->sr_msg.msg_flags;
2228 if (flags & MSG_DONTWAIT)
2229 req->flags |= REQ_F_NOWAIT;
2230 else if (force_nonblock)
2231 flags |= MSG_DONTWAIT;
2233 ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
2234 if (force_nonblock && ret == -EAGAIN) {
2235 if (req->io)
2236 return -EAGAIN;
2237 if (io_alloc_async_ctx(req)) {
2238 if (kmsg && kmsg->iov != kmsg->fast_iov)
2239 kfree(kmsg->iov);
2240 return -ENOMEM;
2242 memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2243 return -EAGAIN;
2245 if (ret == -ERESTARTSYS)
2246 ret = -EINTR;
2249 if (kmsg && kmsg->iov != kmsg->fast_iov)
2250 kfree(kmsg->iov);
2251 io_cqring_add_event(req, ret);
2252 if (ret < 0)
2253 req_set_fail_links(req);
2254 io_put_req_find_next(req, nxt);
2255 return 0;
2256 #else
2257 return -EOPNOTSUPP;
2258 #endif
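/*
 * Illustrative sketch (not from this file): the -EAGAIN path above copies the
 * on-stack msghdr state into a per-request async context before punting, so
 * the retry from the io-wq worker no longer depends on stack memory that is
 * about to disappear. A minimal stand-alone model of that idea, with
 * hypothetical names, might look like this:
 */
#include <stdlib.h>
#include <string.h>

struct prep_state { char scratch[64]; };         /* stands in for io_async_msghdr */
struct request    { struct prep_state *async; }; /* stands in for io_kiocb */

/* Stash the prepared state, then report "try again later". */
static int punt_for_retry(struct request *req, const struct prep_state *onstack)
{
	if (!req->async) {
		req->async = malloc(sizeof(*req->async));
		if (!req->async)
			return -2;                       /* "-ENOMEM" */
		memcpy(req->async, onstack, sizeof(*onstack));
	}
	return -1;                                       /* "-EAGAIN": retry later */
}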
2261 static int io_recvmsg_prep(struct io_kiocb *req,
2262 const struct io_uring_sqe *sqe)
2264 #if defined(CONFIG_NET)
2265 struct io_sr_msg *sr = &req->sr_msg;
2266 struct io_async_ctx *io = req->io;
2268 sr->msg_flags = READ_ONCE(sqe->msg_flags);
2269 sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
2271 #ifdef CONFIG_COMPAT
2272 if (req->ctx->compat)
2273 sr->msg_flags |= MSG_CMSG_COMPAT;
2274 #endif
2276 if (!io)
2277 return 0;
2279 io->msg.iov = io->msg.fast_iov;
2280 return recvmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
2281 &io->msg.uaddr, &io->msg.iov);
2282 #else
2283 return -EOPNOTSUPP;
2284 #endif
2287 static int io_recvmsg(struct io_kiocb *req, struct io_kiocb **nxt,
2288 bool force_nonblock)
2290 #if defined(CONFIG_NET)
2291 struct io_async_msghdr *kmsg = NULL;
2292 struct socket *sock;
2293 int ret;
2295 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2296 return -EINVAL;
2298 sock = sock_from_file(req->file, &ret);
2299 if (sock) {
2300 struct io_async_ctx io;
2301 struct sockaddr_storage addr;
2302 unsigned flags;
2304 if (req->io) {
2305 kmsg = &req->io->msg;
2306 kmsg->msg.msg_name = &addr;
2307 /* if iov is set, it's allocated already */
2308 if (!kmsg->iov)
2309 kmsg->iov = kmsg->fast_iov;
2310 kmsg->msg.msg_iter.iov = kmsg->iov;
2311 } else {
2312 struct io_sr_msg *sr = &req->sr_msg;
2314 kmsg = &io.msg;
2315 kmsg->msg.msg_name = &addr;
2317 io.msg.iov = io.msg.fast_iov;
2318 ret = recvmsg_copy_msghdr(&io.msg.msg, sr->msg,
2319 sr->msg_flags, &io.msg.uaddr,
2320 &io.msg.iov);
2321 if (ret)
2322 return ret;
2325 flags = req->sr_msg.msg_flags;
2326 if (flags & MSG_DONTWAIT)
2327 req->flags |= REQ_F_NOWAIT;
2328 else if (force_nonblock)
2329 flags |= MSG_DONTWAIT;
2331 ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg,
2332 kmsg->uaddr, flags);
2333 if (force_nonblock && ret == -EAGAIN) {
2334 if (req->io)
2335 return -EAGAIN;
2336 if (io_alloc_async_ctx(req)) {
2337 if (kmsg && kmsg->iov != kmsg->fast_iov)
2338 kfree(kmsg->iov);
2339 return -ENOMEM;
2341 memcpy(&req->io->msg, &io.msg, sizeof(io.msg));
2342 return -EAGAIN;
2344 if (ret == -ERESTARTSYS)
2345 ret = -EINTR;
2348 if (kmsg && kmsg->iov != kmsg->fast_iov)
2349 kfree(kmsg->iov);
2350 io_cqring_add_event(req, ret);
2351 if (ret < 0)
2352 req_set_fail_links(req);
2353 io_put_req_find_next(req, nxt);
2354 return 0;
2355 #else
2356 return -EOPNOTSUPP;
2357 #endif
2360 static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2362 #if defined(CONFIG_NET)
2363 struct io_accept *accept = &req->accept;
2365 if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2366 return -EINVAL;
2367 if (sqe->ioprio || sqe->len || sqe->buf_index)
2368 return -EINVAL;
2370 accept->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
2371 accept->addr_len = u64_to_user_ptr(READ_ONCE(sqe->addr2));
2372 accept->flags = READ_ONCE(sqe->accept_flags);
2373 return 0;
2374 #else
2375 return -EOPNOTSUPP;
2376 #endif
2379 #if defined(CONFIG_NET)
2380 static int __io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2381 bool force_nonblock)
2383 struct io_accept *accept = &req->accept;
2384 unsigned file_flags;
2385 int ret;
2387 file_flags = force_nonblock ? O_NONBLOCK : 0;
2388 ret = __sys_accept4_file(req->file, file_flags, accept->addr,
2389 accept->addr_len, accept->flags);
2390 if (ret == -EAGAIN && force_nonblock)
2391 return -EAGAIN;
2392 if (ret == -ERESTARTSYS)
2393 ret = -EINTR;
2394 if (ret < 0)
2395 req_set_fail_links(req);
2396 io_cqring_add_event(req, ret);
2397 io_put_req_find_next(req, nxt);
2398 return 0;
2401 static void io_accept_finish(struct io_wq_work **workptr)
2403 struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2404 struct io_kiocb *nxt = NULL;
2406 if (io_req_cancelled(req))
2407 return;
2408 __io_accept(req, &nxt, false);
2409 if (nxt)
2410 io_wq_assign_next(workptr, nxt);
2412 #endif
2414 static int io_accept(struct io_kiocb *req, struct io_kiocb **nxt,
2415 bool force_nonblock)
2417 #if defined(CONFIG_NET)
2418 int ret;
2420 ret = __io_accept(req, nxt, force_nonblock);
2421 if (ret == -EAGAIN && force_nonblock) {
2422 req->work.func = io_accept_finish;
2423 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2424 io_put_req(req);
2425 return -EAGAIN;
2427 return 0;
2428 #else
2429 return -EOPNOTSUPP;
2430 #endif
2433 static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2435 #if defined(CONFIG_NET)
2436 struct io_connect *conn = &req->connect;
2437 struct io_async_ctx *io = req->io;
2439 if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2440 return -EINVAL;
2441 if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
2442 return -EINVAL;
2444 conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
2445 conn->addr_len = READ_ONCE(sqe->addr2);
2447 if (!io)
2448 return 0;
2450 return move_addr_to_kernel(conn->addr, conn->addr_len,
2451 &io->connect.address);
2452 #else
2453 return -EOPNOTSUPP;
2454 #endif
2457 static int io_connect(struct io_kiocb *req, struct io_kiocb **nxt,
2458 bool force_nonblock)
2460 #if defined(CONFIG_NET)
2461 struct io_async_ctx __io, *io;
2462 unsigned file_flags;
2463 int ret;
2465 if (req->io) {
2466 io = req->io;
2467 } else {
2468 ret = move_addr_to_kernel(req->connect.addr,
2469 req->connect.addr_len,
2470 &__io.connect.address);
2471 if (ret)
2472 goto out;
2473 io = &__io;
2476 file_flags = force_nonblock ? O_NONBLOCK : 0;
2478 ret = __sys_connect_file(req->file, &io->connect.address,
2479 req->connect.addr_len, file_flags);
2480 if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
2481 if (req->io)
2482 return -EAGAIN;
2483 if (io_alloc_async_ctx(req)) {
2484 ret = -ENOMEM;
2485 goto out;
2487 memcpy(&req->io->connect, &__io.connect, sizeof(__io.connect));
2488 return -EAGAIN;
2490 if (ret == -ERESTARTSYS)
2491 ret = -EINTR;
2492 out:
2493 if (ret < 0)
2494 req_set_fail_links(req);
2495 io_cqring_add_event(req, ret);
2496 io_put_req_find_next(req, nxt);
2497 return 0;
2498 #else
2499 return -EOPNOTSUPP;
2500 #endif
2503 static void io_poll_remove_one(struct io_kiocb *req)
2505 struct io_poll_iocb *poll = &req->poll;
2507 spin_lock(&poll->head->lock);
2508 WRITE_ONCE(poll->canceled, true);
2509 if (!list_empty(&poll->wait.entry)) {
2510 list_del_init(&poll->wait.entry);
2511 io_queue_async_work(req);
2513 spin_unlock(&poll->head->lock);
2514 hash_del(&req->hash_node);
2517 static void io_poll_remove_all(struct io_ring_ctx *ctx)
2519 struct hlist_node *tmp;
2520 struct io_kiocb *req;
2521 int i;
2523 spin_lock_irq(&ctx->completion_lock);
2524 for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
2525 struct hlist_head *list;
2527 list = &ctx->cancel_hash[i];
2528 hlist_for_each_entry_safe(req, tmp, list, hash_node)
2529 io_poll_remove_one(req);
2531 spin_unlock_irq(&ctx->completion_lock);
2534 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
2536 struct hlist_head *list;
2537 struct io_kiocb *req;
2539 list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
2540 hlist_for_each_entry(req, list, hash_node) {
2541 if (sqe_addr == req->user_data) {
2542 io_poll_remove_one(req);
2543 return 0;
2547 return -ENOENT;
2550 static int io_poll_remove_prep(struct io_kiocb *req,
2551 const struct io_uring_sqe *sqe)
2553 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2554 return -EINVAL;
2555 if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
2556 sqe->poll_events)
2557 return -EINVAL;
2559 req->poll.addr = READ_ONCE(sqe->addr);
2560 return 0;
2564 * Find a running poll command that matches one specified in sqe->addr,
2565 * and remove it if found.
2567 static int io_poll_remove(struct io_kiocb *req)
2569 struct io_ring_ctx *ctx = req->ctx;
2570 u64 addr;
2571 int ret;
2573 addr = req->poll.addr;
2574 spin_lock_irq(&ctx->completion_lock);
2575 ret = io_poll_cancel(ctx, addr);
2576 spin_unlock_irq(&ctx->completion_lock);
2578 io_cqring_add_event(req, ret);
2579 if (ret < 0)
2580 req_set_fail_links(req);
2581 io_put_req(req);
2582 return 0;
2585 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
2587 struct io_ring_ctx *ctx = req->ctx;
2589 req->poll.done = true;
2590 if (error)
2591 io_cqring_fill_event(req, error);
2592 else
2593 io_cqring_fill_event(req, mangle_poll(mask));
2594 io_commit_cqring(ctx);
2597 static void io_poll_complete_work(struct io_wq_work **workptr)
2599 struct io_wq_work *work = *workptr;
2600 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2601 struct io_poll_iocb *poll = &req->poll;
2602 struct poll_table_struct pt = { ._key = poll->events };
2603 struct io_ring_ctx *ctx = req->ctx;
2604 struct io_kiocb *nxt = NULL;
2605 __poll_t mask = 0;
2606 int ret = 0;
2608 if (work->flags & IO_WQ_WORK_CANCEL) {
2609 WRITE_ONCE(poll->canceled, true);
2610 ret = -ECANCELED;
2611 } else if (READ_ONCE(poll->canceled)) {
2612 ret = -ECANCELED;
2615 if (ret != -ECANCELED)
2616 mask = vfs_poll(poll->file, &pt) & poll->events;
2619 * Note that ->ki_cancel callers also delete iocb from active_reqs after
2620 * calling ->ki_cancel. We need the ctx_lock roundtrip here to
2621 * synchronize with them. In the cancellation case the list_del_init
2622 * itself is not actually needed, but harmless, so we keep it in to
2623 * avoid further branches in the fast path.
2625 spin_lock_irq(&ctx->completion_lock);
2626 if (!mask && ret != -ECANCELED) {
2627 add_wait_queue(poll->head, &poll->wait);
2628 spin_unlock_irq(&ctx->completion_lock);
2629 return;
2631 hash_del(&req->hash_node);
2632 io_poll_complete(req, mask, ret);
2633 spin_unlock_irq(&ctx->completion_lock);
2635 io_cqring_ev_posted(ctx);
2637 if (ret < 0)
2638 req_set_fail_links(req);
2639 io_put_req_find_next(req, &nxt);
2640 if (nxt)
2641 io_wq_assign_next(workptr, nxt);
2644 static void io_poll_trigger_evfd(struct io_wq_work **workptr)
2646 struct io_kiocb *req = container_of(*workptr, struct io_kiocb, work);
2648 eventfd_signal(req->ctx->cq_ev_fd, 1);
2649 io_put_req(req);
2652 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
2653 void *key)
2655 struct io_poll_iocb *poll = wait->private;
2656 struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
2657 struct io_ring_ctx *ctx = req->ctx;
2658 __poll_t mask = key_to_poll(key);
2659 unsigned long flags;
2661 /* for instances that support it check for an event match first: */
2662 if (mask && !(mask & poll->events))
2663 return 0;
2665 list_del_init(&poll->wait.entry);
2668 * Run completion inline if we can. We're using trylock here because
2669 * we are violating the completion_lock -> poll wq lock ordering.
2670 * If we have a link timeout we're going to need the completion_lock
2671 * for finalizing the request, mark us as having grabbed that already.
2673 if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
2674 bool trigger_ev;
2676 hash_del(&req->hash_node);
2677 io_poll_complete(req, mask, 0);
2678 trigger_ev = io_should_trigger_evfd(ctx);
2679 if (trigger_ev && eventfd_signal_count()) {
2680 trigger_ev = false;
2681 req->work.func = io_poll_trigger_evfd;
2682 } else {
2683 req->flags |= REQ_F_COMP_LOCKED;
2684 io_put_req(req);
2685 req = NULL;
2687 spin_unlock_irqrestore(&ctx->completion_lock, flags);
2688 __io_cqring_ev_posted(ctx, trigger_ev);
2689 } else {
2690 io_queue_async_work(req);
2693 return 1;
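/*
 * Illustrative sketch (not from this file): the wake callback above runs with
 * the poll waitqueue lock already held, which inverts the usual
 * completion_lock -> waitqueue lock order. Taking the completion lock with a
 * trylock keeps the fast path inline and falls back to async work when the
 * lock is contended, instead of risking an ABBA deadlock. A generic
 * user-space model of that choice (hypothetical names):
 */
#include <pthread.h>

static pthread_mutex_t completion_lock = PTHREAD_MUTEX_INITIALIZER;

static void complete_inline(void)   { /* post the completion directly */ }
static void queue_async_work(void)  { /* defer to a worker thread */ }

static void wake_callback(void)
{
	if (pthread_mutex_trylock(&completion_lock) == 0) {
		complete_inline();
		pthread_mutex_unlock(&completion_lock);
	} else {
		queue_async_work();	/* avoid the lock-order inversion */
	}
}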
2696 struct io_poll_table {
2697 struct poll_table_struct pt;
2698 struct io_kiocb *req;
2699 int error;
2702 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
2703 struct poll_table_struct *p)
2705 struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
2707 if (unlikely(pt->req->poll.head)) {
2708 pt->error = -EINVAL;
2709 return;
2712 pt->error = 0;
2713 pt->req->poll.head = head;
2714 add_wait_queue(head, &pt->req->poll.wait);
2717 static void io_poll_req_insert(struct io_kiocb *req)
2719 struct io_ring_ctx *ctx = req->ctx;
2720 struct hlist_head *list;
2722 list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
2723 hlist_add_head(&req->hash_node, list);
2726 static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2728 struct io_poll_iocb *poll = &req->poll;
2729 u16 events;
2731 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2732 return -EINVAL;
2733 if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
2734 return -EINVAL;
2735 if (!poll->file)
2736 return -EBADF;
2738 events = READ_ONCE(sqe->poll_events);
2739 poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
2740 return 0;
2743 static int io_poll_add(struct io_kiocb *req, struct io_kiocb **nxt)
2745 struct io_poll_iocb *poll = &req->poll;
2746 struct io_ring_ctx *ctx = req->ctx;
2747 struct io_poll_table ipt;
2748 bool cancel = false;
2749 __poll_t mask;
2751 INIT_IO_WORK(&req->work, io_poll_complete_work);
2752 INIT_HLIST_NODE(&req->hash_node);
2754 poll->head = NULL;
2755 poll->done = false;
2756 poll->canceled = false;
2758 ipt.pt._qproc = io_poll_queue_proc;
2759 ipt.pt._key = poll->events;
2760 ipt.req = req;
2761 ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
2763 /* initialize the list so that we can do list_empty checks */
2764 INIT_LIST_HEAD(&poll->wait.entry);
2765 init_waitqueue_func_entry(&poll->wait, io_poll_wake);
2766 poll->wait.private = poll;
2768 INIT_LIST_HEAD(&req->list);
2770 mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
2772 spin_lock_irq(&ctx->completion_lock);
2773 if (likely(poll->head)) {
2774 spin_lock(&poll->head->lock);
2775 if (unlikely(list_empty(&poll->wait.entry))) {
2776 if (ipt.error)
2777 cancel = true;
2778 ipt.error = 0;
2779 mask = 0;
2781 if (mask || ipt.error)
2782 list_del_init(&poll->wait.entry);
2783 else if (cancel)
2784 WRITE_ONCE(poll->canceled, true);
2785 else if (!poll->done) /* actually waiting for an event */
2786 io_poll_req_insert(req);
2787 spin_unlock(&poll->head->lock);
2789 if (mask) { /* no async, we'd stolen it */
2790 ipt.error = 0;
2791 io_poll_complete(req, mask, 0);
2793 spin_unlock_irq(&ctx->completion_lock);
2795 if (mask) {
2796 io_cqring_ev_posted(ctx);
2797 io_put_req_find_next(req, nxt);
2799 return ipt.error;
2802 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
2804 struct io_timeout_data *data = container_of(timer,
2805 struct io_timeout_data, timer);
2806 struct io_kiocb *req = data->req;
2807 struct io_ring_ctx *ctx = req->ctx;
2808 unsigned long flags;
2810 atomic_inc(&ctx->cq_timeouts);
2812 spin_lock_irqsave(&ctx->completion_lock, flags);
2814 * We could be racing with timeout deletion. If the list is empty,
2815 * then timeout lookup already found it and will be handling it.
2817 if (!list_empty(&req->list)) {
2818 struct io_kiocb *prev;
2821 * Adjust the sequence of the reqs before the current one, because
2822 * this req will consume a slot in the cq_ring and the cq_tail
2823 * pointer will be increased; otherwise other timeout reqs may
2824 * complete early without waiting for enough wait_nr.
2826 prev = req;
2827 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
2828 prev->sequence++;
2829 list_del_init(&req->list);
2832 io_cqring_fill_event(req, -ETIME);
2833 io_commit_cqring(ctx);
2834 spin_unlock_irqrestore(&ctx->completion_lock, flags);
2836 io_cqring_ev_posted(ctx);
2837 req_set_fail_links(req);
2838 io_put_req(req);
2839 return HRTIMER_NORESTART;
2842 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
2844 struct io_kiocb *req;
2845 int ret = -ENOENT;
2847 list_for_each_entry(req, &ctx->timeout_list, list) {
2848 if (user_data == req->user_data) {
2849 list_del_init(&req->list);
2850 ret = 0;
2851 break;
2855 if (ret == -ENOENT)
2856 return ret;
2858 ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
2859 if (ret == -1)
2860 return -EALREADY;
2862 req_set_fail_links(req);
2863 io_cqring_fill_event(req, -ECANCELED);
2864 io_put_req(req);
2865 return 0;
2868 static int io_timeout_remove_prep(struct io_kiocb *req,
2869 const struct io_uring_sqe *sqe)
2871 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2872 return -EINVAL;
2873 if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
2874 return -EINVAL;
2876 req->timeout.addr = READ_ONCE(sqe->addr);
2877 req->timeout.flags = READ_ONCE(sqe->timeout_flags);
2878 if (req->timeout.flags)
2879 return -EINVAL;
2881 return 0;
2885 * Remove or update an existing timeout command
2887 static int io_timeout_remove(struct io_kiocb *req)
2889 struct io_ring_ctx *ctx = req->ctx;
2890 int ret;
2892 spin_lock_irq(&ctx->completion_lock);
2893 ret = io_timeout_cancel(ctx, req->timeout.addr);
2895 io_cqring_fill_event(req, ret);
2896 io_commit_cqring(ctx);
2897 spin_unlock_irq(&ctx->completion_lock);
2898 io_cqring_ev_posted(ctx);
2899 if (ret < 0)
2900 req_set_fail_links(req);
2901 io_put_req(req);
2902 return 0;
2905 static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2906 bool is_timeout_link)
2908 struct io_timeout_data *data;
2909 unsigned flags;
2911 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2912 return -EINVAL;
2913 if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
2914 return -EINVAL;
2915 if (sqe->off && is_timeout_link)
2916 return -EINVAL;
2917 flags = READ_ONCE(sqe->timeout_flags);
2918 if (flags & ~IORING_TIMEOUT_ABS)
2919 return -EINVAL;
2921 req->timeout.count = READ_ONCE(sqe->off);
2923 if (!req->io && io_alloc_async_ctx(req))
2924 return -ENOMEM;
2926 data = &req->io->timeout;
2927 data->req = req;
2928 req->flags |= REQ_F_TIMEOUT;
2930 if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
2931 return -EFAULT;
2933 if (flags & IORING_TIMEOUT_ABS)
2934 data->mode = HRTIMER_MODE_ABS;
2935 else
2936 data->mode = HRTIMER_MODE_REL;
2938 hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
2939 return 0;
2942 static int io_timeout(struct io_kiocb *req)
2944 unsigned count;
2945 struct io_ring_ctx *ctx = req->ctx;
2946 struct io_timeout_data *data;
2947 struct list_head *entry;
2948 unsigned span = 0;
2950 data = &req->io->timeout;
2953 * sqe->off holds how many events need to occur for this
2954 * timeout event to be satisfied. If it isn't set, then this is
2955 * a pure timeout request and the sequence isn't used.
2957 count = req->timeout.count;
2958 if (!count) {
2959 req->flags |= REQ_F_TIMEOUT_NOSEQ;
2960 spin_lock_irq(&ctx->completion_lock);
2961 entry = ctx->timeout_list.prev;
2962 goto add;
2965 req->sequence = ctx->cached_sq_head + count - 1;
2966 data->seq_offset = count;
2969 * Insertion sort, ensuring the first entry in the list is always
2970 * the one we need first.
2972 spin_lock_irq(&ctx->completion_lock);
2973 list_for_each_prev(entry, &ctx->timeout_list) {
2974 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
2975 unsigned nxt_sq_head;
2976 long long tmp, tmp_nxt;
2977 u32 nxt_offset = nxt->io->timeout.seq_offset;
2979 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
2980 continue;
2983 * Since cached_sq_head + count - 1 can overflow, use type long
2984 * long to store it.
2986 tmp = (long long)ctx->cached_sq_head + count - 1;
2987 nxt_sq_head = nxt->sequence - nxt_offset + 1;
2988 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
2991 * cached_sq_head may overflow, and it will never overflow twice
2992 * as long as some timeout req is still valid.
2994 if (ctx->cached_sq_head < nxt_sq_head)
2995 tmp += UINT_MAX;
2997 if (tmp > tmp_nxt)
2998 break;
3001 * The sequences of the inserted req and the reqs after it should
3002 * be adjusted, because each timeout req consumes a slot.
3004 span++;
3005 nxt->sequence++;
3007 req->sequence -= span;
3008 add:
3009 list_add(&req->list, entry);
3010 data->timer.function = io_timeout_fn;
3011 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
3012 spin_unlock_irq(&ctx->completion_lock);
3013 return 0;
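/*
 * Illustrative sketch (not from this file): the insertion sort above compares
 * 32-bit sequence targets that may wrap, so it widens them to long long and
 * adds UINT_MAX to the side whose head has already wrapped. A tiny
 * stand-alone model of that wrap-aware comparison (simplified, hypothetical
 * names; it does not reproduce the exact kernel bookkeeping):
 */
#include <limits.h>
#include <stdio.h>

/* Nonzero if the new target (a_head + a_off - 1) orders after the existing
 * target (b_head + b_off - 1), allowing for a wrapped a_head. */
static int seq_after(unsigned int a_head, unsigned int a_off,
		     unsigned int b_head, unsigned int b_off)
{
	long long a = (long long)a_head + a_off - 1;
	long long b = (long long)b_head + b_off - 1;

	if (a_head < b_head)	/* the new head already wrapped around */
		a += UINT_MAX;
	return a > b;
}

int main(void)
{
	/* head wrapped from 0xfffffff0 to 0x10: 0x10 + 8 still sorts after */
	printf("%d\n", seq_after(0x10u, 8, 0xfffffff0u, 4));	/* prints 1 */
	return 0;
}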
3016 static bool io_cancel_cb(struct io_wq_work *work, void *data)
3018 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3020 return req->user_data == (unsigned long) data;
3023 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
3025 enum io_wq_cancel cancel_ret;
3026 int ret = 0;
3028 cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
3029 switch (cancel_ret) {
3030 case IO_WQ_CANCEL_OK:
3031 ret = 0;
3032 break;
3033 case IO_WQ_CANCEL_RUNNING:
3034 ret = -EALREADY;
3035 break;
3036 case IO_WQ_CANCEL_NOTFOUND:
3037 ret = -ENOENT;
3038 break;
3041 return ret;
3044 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
3045 struct io_kiocb *req, __u64 sqe_addr,
3046 struct io_kiocb **nxt, int success_ret)
3048 unsigned long flags;
3049 int ret;
3051 ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
3052 if (ret != -ENOENT) {
3053 spin_lock_irqsave(&ctx->completion_lock, flags);
3054 goto done;
3057 spin_lock_irqsave(&ctx->completion_lock, flags);
3058 ret = io_timeout_cancel(ctx, sqe_addr);
3059 if (ret != -ENOENT)
3060 goto done;
3061 ret = io_poll_cancel(ctx, sqe_addr);
3062 done:
3063 if (!ret)
3064 ret = success_ret;
3065 io_cqring_fill_event(req, ret);
3066 io_commit_cqring(ctx);
3067 spin_unlock_irqrestore(&ctx->completion_lock, flags);
3068 io_cqring_ev_posted(ctx);
3070 if (ret < 0)
3071 req_set_fail_links(req);
3072 io_put_req_find_next(req, nxt);
3075 static int io_async_cancel_prep(struct io_kiocb *req,
3076 const struct io_uring_sqe *sqe)
3078 if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
3079 return -EINVAL;
3080 if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
3081 sqe->cancel_flags)
3082 return -EINVAL;
3084 req->cancel.addr = READ_ONCE(sqe->addr);
3085 return 0;
3088 static int io_async_cancel(struct io_kiocb *req, struct io_kiocb **nxt)
3090 struct io_ring_ctx *ctx = req->ctx;
3092 io_async_find_and_cancel(ctx, req, req->cancel.addr, nxt, 0);
3093 return 0;
3096 static int io_req_defer_prep(struct io_kiocb *req,
3097 const struct io_uring_sqe *sqe)
3099 ssize_t ret = 0;
3101 if (!sqe)
3102 return 0;
3104 switch (req->opcode) {
3105 case IORING_OP_NOP:
3106 break;
3107 case IORING_OP_READV:
3108 case IORING_OP_READ_FIXED:
3109 ret = io_read_prep(req, sqe, true);
3110 break;
3111 case IORING_OP_WRITEV:
3112 case IORING_OP_WRITE_FIXED:
3113 ret = io_write_prep(req, sqe, true);
3114 break;
3115 case IORING_OP_POLL_ADD:
3116 ret = io_poll_add_prep(req, sqe);
3117 break;
3118 case IORING_OP_POLL_REMOVE:
3119 ret = io_poll_remove_prep(req, sqe);
3120 break;
3121 case IORING_OP_FSYNC:
3122 ret = io_prep_fsync(req, sqe);
3123 break;
3124 case IORING_OP_SYNC_FILE_RANGE:
3125 ret = io_prep_sfr(req, sqe);
3126 break;
3127 case IORING_OP_SENDMSG:
3128 ret = io_sendmsg_prep(req, sqe);
3129 break;
3130 case IORING_OP_RECVMSG:
3131 ret = io_recvmsg_prep(req, sqe);
3132 break;
3133 case IORING_OP_CONNECT:
3134 ret = io_connect_prep(req, sqe);
3135 break;
3136 case IORING_OP_TIMEOUT:
3137 ret = io_timeout_prep(req, sqe, false);
3138 break;
3139 case IORING_OP_TIMEOUT_REMOVE:
3140 ret = io_timeout_remove_prep(req, sqe);
3141 break;
3142 case IORING_OP_ASYNC_CANCEL:
3143 ret = io_async_cancel_prep(req, sqe);
3144 break;
3145 case IORING_OP_LINK_TIMEOUT:
3146 ret = io_timeout_prep(req, sqe, true);
3147 break;
3148 case IORING_OP_ACCEPT:
3149 ret = io_accept_prep(req, sqe);
3150 break;
3151 default:
3152 printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
3153 req->opcode);
3154 ret = -EINVAL;
3155 break;
3158 return ret;
3161 static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3163 struct io_ring_ctx *ctx = req->ctx;
3164 int ret;
3166 /* Still need to defer if there are pending reqs in the defer list. */
3167 if (!req_need_defer(req) && list_empty(&ctx->defer_list))
3168 return 0;
3170 if (!req->io && io_alloc_async_ctx(req))
3171 return -EAGAIN;
3173 ret = io_req_defer_prep(req, sqe);
3174 if (ret < 0)
3175 return ret;
3177 spin_lock_irq(&ctx->completion_lock);
3178 if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
3179 spin_unlock_irq(&ctx->completion_lock);
3180 return 0;
3183 trace_io_uring_defer(ctx, req, req->user_data);
3184 list_add_tail(&req->list, &ctx->defer_list);
3185 spin_unlock_irq(&ctx->completion_lock);
3186 return -EIOCBQUEUED;
3189 static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3190 struct io_kiocb **nxt, bool force_nonblock)
3192 struct io_ring_ctx *ctx = req->ctx;
3193 int ret;
3195 switch (req->opcode) {
3196 case IORING_OP_NOP:
3197 ret = io_nop(req);
3198 break;
3199 case IORING_OP_READV:
3200 case IORING_OP_READ_FIXED:
3201 if (sqe) {
3202 ret = io_read_prep(req, sqe, force_nonblock);
3203 if (ret < 0)
3204 break;
3206 ret = io_read(req, nxt, force_nonblock);
3207 break;
3208 case IORING_OP_WRITEV:
3209 case IORING_OP_WRITE_FIXED:
3210 if (sqe) {
3211 ret = io_write_prep(req, sqe, force_nonblock);
3212 if (ret < 0)
3213 break;
3215 ret = io_write(req, nxt, force_nonblock);
3216 break;
3217 case IORING_OP_FSYNC:
3218 if (sqe) {
3219 ret = io_prep_fsync(req, sqe);
3220 if (ret < 0)
3221 break;
3223 ret = io_fsync(req, nxt, force_nonblock);
3224 break;
3225 case IORING_OP_POLL_ADD:
3226 if (sqe) {
3227 ret = io_poll_add_prep(req, sqe);
3228 if (ret)
3229 break;
3231 ret = io_poll_add(req, nxt);
3232 break;
3233 case IORING_OP_POLL_REMOVE:
3234 if (sqe) {
3235 ret = io_poll_remove_prep(req, sqe);
3236 if (ret < 0)
3237 break;
3239 ret = io_poll_remove(req);
3240 break;
3241 case IORING_OP_SYNC_FILE_RANGE:
3242 if (sqe) {
3243 ret = io_prep_sfr(req, sqe);
3244 if (ret < 0)
3245 break;
3247 ret = io_sync_file_range(req, nxt, force_nonblock);
3248 break;
3249 case IORING_OP_SENDMSG:
3250 if (sqe) {
3251 ret = io_sendmsg_prep(req, sqe);
3252 if (ret < 0)
3253 break;
3255 ret = io_sendmsg(req, nxt, force_nonblock);
3256 break;
3257 case IORING_OP_RECVMSG:
3258 if (sqe) {
3259 ret = io_recvmsg_prep(req, sqe);
3260 if (ret)
3261 break;
3263 ret = io_recvmsg(req, nxt, force_nonblock);
3264 break;
3265 case IORING_OP_TIMEOUT:
3266 if (sqe) {
3267 ret = io_timeout_prep(req, sqe, false);
3268 if (ret)
3269 break;
3271 ret = io_timeout(req);
3272 break;
3273 case IORING_OP_TIMEOUT_REMOVE:
3274 if (sqe) {
3275 ret = io_timeout_remove_prep(req, sqe);
3276 if (ret)
3277 break;
3279 ret = io_timeout_remove(req);
3280 break;
3281 case IORING_OP_ACCEPT:
3282 if (sqe) {
3283 ret = io_accept_prep(req, sqe);
3284 if (ret)
3285 break;
3287 ret = io_accept(req, nxt, force_nonblock);
3288 break;
3289 case IORING_OP_CONNECT:
3290 if (sqe) {
3291 ret = io_connect_prep(req, sqe);
3292 if (ret)
3293 break;
3295 ret = io_connect(req, nxt, force_nonblock);
3296 break;
3297 case IORING_OP_ASYNC_CANCEL:
3298 if (sqe) {
3299 ret = io_async_cancel_prep(req, sqe);
3300 if (ret)
3301 break;
3303 ret = io_async_cancel(req, nxt);
3304 break;
3305 default:
3306 ret = -EINVAL;
3307 break;
3310 if (ret)
3311 return ret;
3313 if (ctx->flags & IORING_SETUP_IOPOLL) {
3314 const bool in_async = io_wq_current_is_worker();
3316 if (req->result == -EAGAIN)
3317 return -EAGAIN;
3319 /* workqueue context doesn't hold uring_lock, grab it now */
3320 if (in_async)
3321 mutex_lock(&ctx->uring_lock);
3323 io_iopoll_req_issued(req);
3325 if (in_async)
3326 mutex_unlock(&ctx->uring_lock);
3329 return 0;
3332 static void io_wq_submit_work(struct io_wq_work **workptr)
3334 struct io_wq_work *work = *workptr;
3335 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3336 struct io_kiocb *nxt = NULL;
3337 int ret = 0;
3339 if (work->flags & IO_WQ_WORK_CANCEL)
3340 ret = -ECANCELED;
3342 if (!ret) {
3343 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
3344 req->in_async = true;
3345 do {
3346 ret = io_issue_sqe(req, NULL, &nxt, false);
3348 * We can get EAGAIN for polled IO even though we're
3349 * forcing a sync submission from here, since we can't
3350 * wait for request slots on the block side.
3352 if (ret != -EAGAIN)
3353 break;
3354 cond_resched();
3355 } while (1);
3358 /* drop submission reference */
3359 io_put_req(req);
3361 if (ret) {
3362 req_set_fail_links(req);
3363 io_cqring_add_event(req, ret);
3364 io_put_req(req);
3367 /* if a dependent link is ready, pass it back */
3368 if (!ret && nxt)
3369 io_wq_assign_next(workptr, nxt);
3372 static bool io_req_op_valid(int op)
3374 return op >= IORING_OP_NOP && op < IORING_OP_LAST;
3377 static int io_req_needs_file(struct io_kiocb *req)
3379 switch (req->opcode) {
3380 case IORING_OP_NOP:
3381 case IORING_OP_POLL_REMOVE:
3382 case IORING_OP_TIMEOUT:
3383 case IORING_OP_TIMEOUT_REMOVE:
3384 case IORING_OP_ASYNC_CANCEL:
3385 case IORING_OP_LINK_TIMEOUT:
3386 return 0;
3387 default:
3388 if (io_req_op_valid(req->opcode))
3389 return 1;
3390 return -EINVAL;
3394 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
3395 int index)
3397 struct fixed_file_table *table;
3399 table = &ctx->file_table[index >> IORING_FILE_TABLE_SHIFT];
3400 return table->files[index & IORING_FILE_TABLE_MASK];
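/*
 * Illustrative sketch (not from this file): the fixed-file set is a two-level
 * array, indexed by splitting the file index into a table number (upper bits)
 * and a slot within that table (lower bits). A stand-alone model, assuming a
 * 512-slot table size for illustration (hypothetical names):
 */
#define TABLE_SHIFT	9			/* assumed: 512 slots per table */
#define TABLE_SIZE	(1U << TABLE_SHIFT)
#define TABLE_MASK	(TABLE_SIZE - 1)

struct slot_table { void *files[TABLE_SIZE]; };

static void *lookup_fixed_file(struct slot_table *tables, unsigned int index)
{
	struct slot_table *table = &tables[index >> TABLE_SHIFT];

	return table->files[index & TABLE_MASK];
}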
3403 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req,
3404 const struct io_uring_sqe *sqe)
3406 struct io_ring_ctx *ctx = req->ctx;
3407 unsigned flags;
3408 int fd, ret;
3410 flags = READ_ONCE(sqe->flags);
3411 fd = READ_ONCE(sqe->fd);
3413 if (flags & IOSQE_IO_DRAIN)
3414 req->flags |= REQ_F_IO_DRAIN;
3416 ret = io_req_needs_file(req);
3417 if (ret <= 0)
3418 return ret;
3420 if (flags & IOSQE_FIXED_FILE) {
3421 if (unlikely(!ctx->file_table ||
3422 (unsigned) fd >= ctx->nr_user_files))
3423 return -EBADF;
3424 fd = array_index_nospec(fd, ctx->nr_user_files);
3425 req->file = io_file_from_index(ctx, fd);
3426 if (!req->file)
3427 return -EBADF;
3428 req->flags |= REQ_F_FIXED_FILE;
3429 } else {
3430 if (req->needs_fixed_file)
3431 return -EBADF;
3432 trace_io_uring_file_get(ctx, fd);
3433 req->file = io_file_get(state, fd);
3434 if (unlikely(!req->file))
3435 return -EBADF;
3438 return 0;
3441 static int io_grab_files(struct io_kiocb *req)
3443 int ret = -EBADF;
3444 struct io_ring_ctx *ctx = req->ctx;
3446 rcu_read_lock();
3447 spin_lock_irq(&ctx->inflight_lock);
3449 * We use the f_ops->flush() handler to ensure that we can flush
3450 * out work accessing these files if the fd is closed. Check if
3451 * the fd has changed since we started down this path, and disallow
3452 * this operation if it has.
3454 if (fcheck(req->ring_fd) == req->ring_file) {
3455 list_add(&req->inflight_entry, &ctx->inflight_list);
3456 req->flags |= REQ_F_INFLIGHT;
3457 req->work.files = current->files;
3458 ret = 0;
3460 spin_unlock_irq(&ctx->inflight_lock);
3461 rcu_read_unlock();
3463 return ret;
3466 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
3468 struct io_timeout_data *data = container_of(timer,
3469 struct io_timeout_data, timer);
3470 struct io_kiocb *req = data->req;
3471 struct io_ring_ctx *ctx = req->ctx;
3472 struct io_kiocb *prev = NULL;
3473 unsigned long flags;
3475 spin_lock_irqsave(&ctx->completion_lock, flags);
3478 * We don't expect the list to be empty; that will only happen if we
3479 * race with the completion of the linked work.
3481 if (!list_empty(&req->link_list)) {
3482 prev = list_entry(req->link_list.prev, struct io_kiocb,
3483 link_list);
3484 if (refcount_inc_not_zero(&prev->refs)) {
3485 list_del_init(&req->link_list);
3486 prev->flags &= ~REQ_F_LINK_TIMEOUT;
3487 } else
3488 prev = NULL;
3491 spin_unlock_irqrestore(&ctx->completion_lock, flags);
3493 if (prev) {
3494 req_set_fail_links(prev);
3495 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
3496 -ETIME);
3497 io_put_req(prev);
3498 } else {
3499 io_cqring_add_event(req, -ETIME);
3500 io_put_req(req);
3502 return HRTIMER_NORESTART;
3505 static void io_queue_linked_timeout(struct io_kiocb *req)
3507 struct io_ring_ctx *ctx = req->ctx;
3510 * If the list is now empty, then our linked request finished before
3511 * we got a chance to set up the timer.
3513 spin_lock_irq(&ctx->completion_lock);
3514 if (!list_empty(&req->link_list)) {
3515 struct io_timeout_data *data = &req->io->timeout;
3517 data->timer.function = io_link_timeout_fn;
3518 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
3519 data->mode);
3521 spin_unlock_irq(&ctx->completion_lock);
3523 /* drop submission reference */
3524 io_put_req(req);
3527 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
3529 struct io_kiocb *nxt;
3531 if (!(req->flags & REQ_F_LINK))
3532 return NULL;
3534 nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
3535 link_list);
3536 if (!nxt || nxt->opcode != IORING_OP_LINK_TIMEOUT)
3537 return NULL;
3539 req->flags |= REQ_F_LINK_TIMEOUT;
3540 return nxt;
3543 static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3545 struct io_kiocb *linked_timeout;
3546 struct io_kiocb *nxt = NULL;
3547 int ret;
3549 again:
3550 linked_timeout = io_prep_linked_timeout(req);
3552 ret = io_issue_sqe(req, sqe, &nxt, true);
3555 * We async punt it if the file wasn't marked NOWAIT, or if the file
3556 * doesn't support non-blocking read/write attempts
3558 if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
3559 (req->flags & REQ_F_MUST_PUNT))) {
3560 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
3561 ret = io_grab_files(req);
3562 if (ret)
3563 goto err;
3567 * Queued up for async execution, worker will release
3568 * submit reference when the iocb is actually submitted.
3570 io_queue_async_work(req);
3571 goto done_req;
3574 err:
3575 /* drop submission reference */
3576 io_put_req_find_next(req, &nxt);
3578 if (linked_timeout) {
3579 if (!ret)
3580 io_queue_linked_timeout(linked_timeout);
3581 else
3582 io_put_req(linked_timeout);
3585 /* and drop final reference, if we failed */
3586 if (ret) {
3587 io_cqring_add_event(req, ret);
3588 req_set_fail_links(req);
3589 io_put_req(req);
3591 done_req:
3592 if (nxt) {
3593 req = nxt;
3594 nxt = NULL;
3595 goto again;
3599 static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
3601 int ret;
3603 if (unlikely(req->ctx->drain_next)) {
3604 req->flags |= REQ_F_IO_DRAIN;
3605 req->ctx->drain_next = false;
3607 req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);
3609 ret = io_req_defer(req, sqe);
3610 if (ret) {
3611 if (ret != -EIOCBQUEUED) {
3612 io_cqring_add_event(req, ret);
3613 req_set_fail_links(req);
3614 io_double_put_req(req);
3616 } else
3617 __io_queue_sqe(req, sqe);
3620 static inline void io_queue_link_head(struct io_kiocb *req)
3622 if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
3623 io_cqring_add_event(req, -ECANCELED);
3624 io_double_put_req(req);
3625 } else
3626 io_queue_sqe(req, NULL);
3629 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
3630 IOSQE_IO_HARDLINK)
3632 static bool io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
3633 struct io_submit_state *state, struct io_kiocb **link)
3635 struct io_ring_ctx *ctx = req->ctx;
3636 int ret;
3638 /* enforce forwards compatibility on users */
3639 if (unlikely(sqe->flags & ~SQE_VALID_FLAGS)) {
3640 ret = -EINVAL;
3641 goto err_req;
3644 ret = io_req_set_file(state, req, sqe);
3645 if (unlikely(ret)) {
3646 err_req:
3647 io_cqring_add_event(req, ret);
3648 io_double_put_req(req);
3649 return false;
3653 * If we already have a head request, queue this one for async
3654 * submittal once the head completes. If we don't have a head but
3655 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
3656 * submitted sync once the chain is complete. If none of those
3657 * conditions are true (normal request), then just queue it.
3659 if (*link) {
3660 struct io_kiocb *prev = *link;
3662 if (sqe->flags & IOSQE_IO_DRAIN)
3663 (*link)->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;
3665 if (sqe->flags & IOSQE_IO_HARDLINK)
3666 req->flags |= REQ_F_HARDLINK;
3668 if (io_alloc_async_ctx(req)) {
3669 ret = -EAGAIN;
3670 goto err_req;
3673 ret = io_req_defer_prep(req, sqe);
3674 if (ret) {
3675 /* fail even hard links since we don't submit */
3676 prev->flags |= REQ_F_FAIL_LINK;
3677 goto err_req;
3679 trace_io_uring_link(ctx, req, prev);
3680 list_add_tail(&req->link_list, &prev->link_list);
3681 } else if (sqe->flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
3682 req->flags |= REQ_F_LINK;
3683 if (sqe->flags & IOSQE_IO_HARDLINK)
3684 req->flags |= REQ_F_HARDLINK;
3686 INIT_LIST_HEAD(&req->link_list);
3688 if (io_alloc_async_ctx(req)) {
3689 ret = -EAGAIN;
3690 goto err_req;
3692 ret = io_req_defer_prep(req, sqe);
3693 if (ret)
3694 req->flags |= REQ_F_FAIL_LINK;
3695 *link = req;
3696 } else {
3697 io_queue_sqe(req, sqe);
3700 return true;
3704 * Batched submission is done; ensure local IO is flushed out.
3706 static void io_submit_state_end(struct io_submit_state *state)
3708 blk_finish_plug(&state->plug);
3709 io_file_put(state);
3710 if (state->free_reqs)
3711 kmem_cache_free_bulk(req_cachep, state->free_reqs,
3712 &state->reqs[state->cur_req]);
3716 * Start submission side cache.
3718 static void io_submit_state_start(struct io_submit_state *state,
3719 unsigned int max_ios)
3721 blk_start_plug(&state->plug);
3722 state->free_reqs = 0;
3723 state->file = NULL;
3724 state->ios_left = max_ios;
3727 static void io_commit_sqring(struct io_ring_ctx *ctx)
3729 struct io_rings *rings = ctx->rings;
3731 if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
3733 * Ensure any loads from the SQEs are done at this point,
3734 * since once we write the new head, the application could
3735 * write new data to them.
3737 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
3742 * Fetch an sqe, if one is available. Note that sqe_ptr will point to memory
3743 * that is mapped by userspace. This means that care needs to be taken to
3744 * ensure that reads are stable, as we cannot rely on userspace always
3745 * being a good citizen. If members of the sqe are validated and then later
3746 * used, it's important that those reads are done through READ_ONCE() to
3747 * prevent a re-load down the line.
3749 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req,
3750 const struct io_uring_sqe **sqe_ptr)
3752 struct io_rings *rings = ctx->rings;
3753 u32 *sq_array = ctx->sq_array;
3754 unsigned head;
3757 * The cached sq head (or cq tail) serves two purposes:
3759 * 1) allows us to batch the cost of updating the user visible
3760 * head.
3761 * 2) allows the kernel side to track the head on its own, even
3762 * though the application is the one updating it.
3764 head = ctx->cached_sq_head;
3765 /* make sure SQ entry isn't read before tail */
3766 if (unlikely(head == smp_load_acquire(&rings->sq.tail)))
3767 return false;
3769 head = READ_ONCE(sq_array[head & ctx->sq_mask]);
3770 if (likely(head < ctx->sq_entries)) {
3772 * All IO needs to record the previous position; for LINK vs DRAIN,
3773 * it can be used to mark the position of the first IO in the
3774 * link list.
3776 req->sequence = ctx->cached_sq_head;
3777 *sqe_ptr = &ctx->sq_sqes[head];
3778 req->opcode = READ_ONCE((*sqe_ptr)->opcode);
3779 req->user_data = READ_ONCE((*sqe_ptr)->user_data);
3780 ctx->cached_sq_head++;
3781 return true;
3784 /* drop invalid entries */
3785 ctx->cached_sq_head++;
3786 ctx->cached_sq_dropped++;
3787 WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
3788 return false;
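/*
 * Illustrative sketch (not from this file): the kernel side above reads the
 * shared tail with an acquire load and indexes sq_array with its cached head.
 * The matching application-side producer fills an SQE slot, publishes its
 * index in sq_array, and only then store-releases the new tail, so the kernel
 * never observes a tail that covers an unwritten entry. A user-space shaped
 * sketch (hypothetical ring layout, C11 atomics):
 */
#include <stdatomic.h>

struct sq_ring {
	_Atomic unsigned int *tail;	/* shared with the kernel */
	unsigned int *array;		/* sq_array: indices into sqes[] */
	unsigned int ring_mask;
};

static void submit_one(struct sq_ring *sq, unsigned int sqe_index)
{
	unsigned int tail = atomic_load_explicit(sq->tail, memory_order_relaxed);

	sq->array[tail & sq->ring_mask] = sqe_index;	/* publish the entry */
	atomic_store_explicit(sq->tail, tail + 1, memory_order_release);
}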
3791 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
3792 struct file *ring_file, int ring_fd,
3793 struct mm_struct **mm, bool async)
3795 struct io_submit_state state, *statep = NULL;
3796 struct io_kiocb *link = NULL;
3797 int i, submitted = 0;
3798 bool mm_fault = false;
3800 /* if we have a backlog and couldn't flush it all, return BUSY */
3801 if (!list_empty(&ctx->cq_overflow_list) &&
3802 !io_cqring_overflow_flush(ctx, false))
3803 return -EBUSY;
3805 if (nr > IO_PLUG_THRESHOLD) {
3806 io_submit_state_start(&state, nr);
3807 statep = &state;
3810 for (i = 0; i < nr; i++) {
3811 const struct io_uring_sqe *sqe;
3812 struct io_kiocb *req;
3813 unsigned int sqe_flags;
3815 req = io_get_req(ctx, statep);
3816 if (unlikely(!req)) {
3817 if (!submitted)
3818 submitted = -EAGAIN;
3819 break;
3821 if (!io_get_sqring(ctx, req, &sqe)) {
3822 __io_free_req(req);
3823 break;
3826 if (io_req_needs_user(req) && !*mm) {
3827 mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
3828 if (!mm_fault) {
3829 use_mm(ctx->sqo_mm);
3830 *mm = ctx->sqo_mm;
3834 submitted++;
3835 sqe_flags = sqe->flags;
3837 req->ring_file = ring_file;
3838 req->ring_fd = ring_fd;
3839 req->has_user = *mm != NULL;
3840 req->in_async = async;
3841 req->needs_fixed_file = async;
3842 trace_io_uring_submit_sqe(ctx, req->user_data, true, async);
3843 if (!io_submit_sqe(req, sqe, statep, &link))
3844 break;
3846 * If previous wasn't linked and we have a linked command,
3847 * that's the end of the chain. Submit the previous link.
3849 if (!(sqe_flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) && link) {
3850 io_queue_link_head(link);
3851 link = NULL;
3855 if (link)
3856 io_queue_link_head(link);
3857 if (statep)
3858 io_submit_state_end(&state);
3860 /* Commit SQ ring head once we've consumed and submitted all SQEs */
3861 io_commit_sqring(ctx);
3863 return submitted;
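/*
 * Illustrative sketch (not from this file): as the submission loop above
 * shows, a link chain stays open while SQEs carry IOSQE_IO_LINK (or
 * IOSQE_IO_HARDLINK) and is queued once an SQE without those flags closes it.
 * From user space, ordering two operations is then just a matter of flagging
 * the first one (get_sqe()/fill_sqe() are hypothetical helpers):
 */
#include <linux/io_uring.h>

extern struct io_uring_sqe *get_sqe(void);                       /* hypothetical */
extern void fill_sqe(struct io_uring_sqe *sqe, int op, int fd);  /* hypothetical */

static void queue_write_then_fsync(int fd)
{
	struct io_uring_sqe *sqe;

	sqe = get_sqe();
	fill_sqe(sqe, IORING_OP_WRITEV, fd);
	sqe->flags |= IOSQE_IO_LINK;		/* keep the chain open */

	sqe = get_sqe();
	fill_sqe(sqe, IORING_OP_FSYNC, fd);	/* no link flag: chain ends here */
}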
3866 static int io_sq_thread(void *data)
3868 struct io_ring_ctx *ctx = data;
3869 struct mm_struct *cur_mm = NULL;
3870 const struct cred *old_cred;
3871 mm_segment_t old_fs;
3872 DEFINE_WAIT(wait);
3873 unsigned long timeout;
3874 int ret = 0;
3876 complete(&ctx->completions[1]);
3878 old_fs = get_fs();
3879 set_fs(USER_DS);
3880 old_cred = override_creds(ctx->creds);
3882 timeout = jiffies + ctx->sq_thread_idle;
3883 while (!kthread_should_park()) {
3884 unsigned int to_submit;
3886 if (!list_empty(&ctx->poll_list)) {
3887 unsigned nr_events = 0;
3889 mutex_lock(&ctx->uring_lock);
3890 if (!list_empty(&ctx->poll_list))
3891 io_iopoll_getevents(ctx, &nr_events, 0);
3892 else
3893 timeout = jiffies + ctx->sq_thread_idle;
3894 mutex_unlock(&ctx->uring_lock);
3897 to_submit = io_sqring_entries(ctx);
3900 * If submit got -EBUSY, flag us as needing the application
3901 * to enter the kernel to reap and flush events.
3903 if (!to_submit || ret == -EBUSY) {
3905 * Drop cur_mm before scheduling, we can't hold it for
3906 * long periods (or over schedule()). Do this before
3907 * adding ourselves to the waitqueue, as the unuse/drop
3908 * may sleep.
3910 if (cur_mm) {
3911 unuse_mm(cur_mm);
3912 mmput(cur_mm);
3913 cur_mm = NULL;
3917 * We're polling. If we're within the defined idle
3918 * period, then let us spin without work before going
3919 * to sleep. The exception is if we got EBUSY doing
3920 * more IO; in that case we should wait for the
3921 * application to reap events and wake us up.
3923 if (!list_empty(&ctx->poll_list) ||
3924 (!time_after(jiffies, timeout) && ret != -EBUSY &&
3925 !percpu_ref_is_dying(&ctx->refs))) {
3926 cond_resched();
3927 continue;
3930 prepare_to_wait(&ctx->sqo_wait, &wait,
3931 TASK_INTERRUPTIBLE);
3934 * While doing polled IO, before going to sleep we need
3935 * to check whether new reqs have been added to poll_list;
3936 * reqs may have been punted to the io worker and will be
3937 * added to poll_list later, hence check the
3938 * poll_list again.
3940 if ((ctx->flags & IORING_SETUP_IOPOLL) &&
3941 !list_empty_careful(&ctx->poll_list)) {
3942 finish_wait(&ctx->sqo_wait, &wait);
3943 continue;
3946 /* Tell userspace we may need a wakeup call */
3947 ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
3948 /* make sure to read SQ tail after writing flags */
3949 smp_mb();
3951 to_submit = io_sqring_entries(ctx);
3952 if (!to_submit || ret == -EBUSY) {
3953 if (kthread_should_park()) {
3954 finish_wait(&ctx->sqo_wait, &wait);
3955 break;
3957 if (signal_pending(current))
3958 flush_signals(current);
3959 schedule();
3960 finish_wait(&ctx->sqo_wait, &wait);
3962 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3963 continue;
3965 finish_wait(&ctx->sqo_wait, &wait);
3967 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3970 to_submit = min(to_submit, ctx->sq_entries);
3971 mutex_lock(&ctx->uring_lock);
3972 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
3973 mutex_unlock(&ctx->uring_lock);
3974 timeout = jiffies + ctx->sq_thread_idle;
3977 set_fs(old_fs);
3978 if (cur_mm) {
3979 unuse_mm(cur_mm);
3980 mmput(cur_mm);
3982 revert_creds(old_cred);
3984 kthread_parkme();
3986 return 0;
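/*
 * Illustrative sketch (not from this file): before sleeping, the SQPOLL
 * thread above sets IORING_SQ_NEED_WAKEUP in the shared sq_flags and uses a
 * full barrier before re-reading the SQ tail. The application mirrors that
 * ordering: after bumping the tail it must re-check sq_flags and, if the flag
 * is set, enter the kernel with IORING_ENTER_SQ_WAKEUP. A user-space shaped
 * sketch (enter_ring() is a hypothetical io_uring_enter wrapper):
 */
#include <stdatomic.h>
#include <linux/io_uring.h>

extern long enter_ring(int ring_fd, unsigned int to_submit, unsigned int flags); /* hypothetical */

static void kick_sq_thread_if_needed(int ring_fd, _Atomic unsigned int *sq_flags)
{
	/* Full barrier: order our tail store against the flags load below. */
	atomic_thread_fence(memory_order_seq_cst);

	if (atomic_load_explicit(sq_flags, memory_order_relaxed) & IORING_SQ_NEED_WAKEUP)
		enter_ring(ring_fd, 0, IORING_ENTER_SQ_WAKEUP);
}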
3989 struct io_wait_queue {
3990 struct wait_queue_entry wq;
3991 struct io_ring_ctx *ctx;
3992 unsigned to_wait;
3993 unsigned nr_timeouts;
3996 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
3998 struct io_ring_ctx *ctx = iowq->ctx;
4001 * Wake up if we have enough events, or if a timeout occurred since we
4002 * started waiting. For timeouts, we always want to return to userspace,
4003 * regardless of event count.
4005 return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
4006 atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
4009 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
4010 int wake_flags, void *key)
4012 struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
4013 wq);
4015 /* use noflush == true, as we can't safely rely on locking context */
4016 if (!io_should_wake(iowq, true))
4017 return -1;
4019 return autoremove_wake_function(curr, mode, wake_flags, key);
4023 * Wait until events become available, if we don't already have some. The
4024 * application must reap them itself, as they reside on the shared cq ring.
4026 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
4027 const sigset_t __user *sig, size_t sigsz)
4029 struct io_wait_queue iowq = {
4030 .wq = {
4031 .private = current,
4032 .func = io_wake_function,
4033 .entry = LIST_HEAD_INIT(iowq.wq.entry),
4035 .ctx = ctx,
4036 .to_wait = min_events,
4038 struct io_rings *rings = ctx->rings;
4039 int ret = 0;
4041 if (io_cqring_events(ctx, false) >= min_events)
4042 return 0;
4044 if (sig) {
4045 #ifdef CONFIG_COMPAT
4046 if (in_compat_syscall())
4047 ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
4048 sigsz);
4049 else
4050 #endif
4051 ret = set_user_sigmask(sig, sigsz);
4053 if (ret)
4054 return ret;
4057 iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
4058 trace_io_uring_cqring_wait(ctx, min_events);
4059 do {
4060 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
4061 TASK_INTERRUPTIBLE);
4062 if (io_should_wake(&iowq, false))
4063 break;
4064 schedule();
4065 if (signal_pending(current)) {
4066 ret = -EINTR;
4067 break;
4069 } while (1);
4070 finish_wait(&ctx->wait, &iowq.wq);
4072 restore_saved_sigmask_unless(ret == -EINTR);
4074 return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
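/*
 * Illustrative sketch (not from this file): as the comment above notes,
 * completions live on the shared CQ ring and the application reaps them
 * itself. The consumer acquires the kernel-written tail, processes entries
 * between its head and that tail, then store-releases the new head so the
 * kernel can reuse those slots. A user-space shaped sketch (hypothetical ring
 * layout, C11 atomics):
 */
#include <stdatomic.h>
#include <linux/io_uring.h>

struct cq_ring {
	_Atomic unsigned int *head;	/* written by the application */
	_Atomic unsigned int *tail;	/* written by the kernel */
	unsigned int ring_mask;
	struct io_uring_cqe *cqes;
};

static unsigned int reap_cqes(struct cq_ring *cq,
			      void (*handle)(struct io_uring_cqe *))
{
	unsigned int head = atomic_load_explicit(cq->head, memory_order_relaxed);
	unsigned int tail = atomic_load_explicit(cq->tail, memory_order_acquire);
	unsigned int seen = 0;

	while (head != tail) {
		handle(&cq->cqes[head & cq->ring_mask]);
		head++;
		seen++;
	}
	atomic_store_explicit(cq->head, head, memory_order_release);
	return seen;
}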
4077 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
4079 #if defined(CONFIG_UNIX)
4080 if (ctx->ring_sock) {
4081 struct sock *sock = ctx->ring_sock->sk;
4082 struct sk_buff *skb;
4084 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
4085 kfree_skb(skb);
4087 #else
4088 int i;
4090 for (i = 0; i < ctx->nr_user_files; i++) {
4091 struct file *file;
4093 file = io_file_from_index(ctx, i);
4094 if (file)
4095 fput(file);
4097 #endif
4100 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
4102 unsigned nr_tables, i;
4104 if (!ctx->file_table)
4105 return -ENXIO;
4107 __io_sqe_files_unregister(ctx);
4108 nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
4109 for (i = 0; i < nr_tables; i++)
4110 kfree(ctx->file_table[i].files);
4111 kfree(ctx->file_table);
4112 ctx->file_table = NULL;
4113 ctx->nr_user_files = 0;
4114 return 0;
4117 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
4119 if (ctx->sqo_thread) {
4120 wait_for_completion(&ctx->completions[1]);
4122 * The park is a bit of a work-around; without it we get
4123 * warning spews on shutdown with SQPOLL set and affinity
4124 * set to a single CPU.
4126 kthread_park(ctx->sqo_thread);
4127 kthread_stop(ctx->sqo_thread);
4128 ctx->sqo_thread = NULL;
4132 static void io_finish_async(struct io_ring_ctx *ctx)
4134 io_sq_thread_stop(ctx);
4136 if (ctx->io_wq) {
4137 io_wq_destroy(ctx->io_wq);
4138 ctx->io_wq = NULL;
4142 #if defined(CONFIG_UNIX)
4143 static void io_destruct_skb(struct sk_buff *skb)
4145 struct io_ring_ctx *ctx = skb->sk->sk_user_data;
4147 if (ctx->io_wq)
4148 io_wq_flush(ctx->io_wq);
4150 unix_destruct_scm(skb);
4154 * Ensure the UNIX gc is aware of our file set, so we are certain that
4155 * the io_uring can be safely unregistered on process exit, even if we have
4156 * loops in the file referencing.
4158 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
4160 struct sock *sk = ctx->ring_sock->sk;
4161 struct scm_fp_list *fpl;
4162 struct sk_buff *skb;
4163 int i, nr_files;
4165 if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
4166 unsigned long inflight = ctx->user->unix_inflight + nr;
4168 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
4169 return -EMFILE;
4172 fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
4173 if (!fpl)
4174 return -ENOMEM;
4176 skb = alloc_skb(0, GFP_KERNEL);
4177 if (!skb) {
4178 kfree(fpl);
4179 return -ENOMEM;
4182 skb->sk = sk;
4184 nr_files = 0;
4185 fpl->user = get_uid(ctx->user);
4186 for (i = 0; i < nr; i++) {
4187 struct file *file = io_file_from_index(ctx, i + offset);
4189 if (!file)
4190 continue;
4191 fpl->fp[nr_files] = get_file(file);
4192 unix_inflight(fpl->user, fpl->fp[nr_files]);
4193 nr_files++;
4196 if (nr_files) {
4197 fpl->max = SCM_MAX_FD;
4198 fpl->count = nr_files;
4199 UNIXCB(skb).fp = fpl;
4200 skb->destructor = io_destruct_skb;
4201 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
4202 skb_queue_head(&sk->sk_receive_queue, skb);
4204 for (i = 0; i < nr_files; i++)
4205 fput(fpl->fp[i]);
4206 } else {
4207 kfree_skb(skb);
4208 kfree(fpl);
4211 return 0;
4215 * If UNIX sockets are enabled, fd passing can cause a reference cycle which
4216 * causes regular reference counting to break down. We rely on the UNIX
4217 * garbage collection to take care of this problem for us.
4219 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
4221 unsigned left, total;
4222 int ret = 0;
4224 total = 0;
4225 left = ctx->nr_user_files;
4226 while (left) {
4227 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
4229 ret = __io_sqe_files_scm(ctx, this_files, total);
4230 if (ret)
4231 break;
4232 left -= this_files;
4233 total += this_files;
4236 if (!ret)
4237 return 0;
4239 while (total < ctx->nr_user_files) {
4240 struct file *file = io_file_from_index(ctx, total);
4242 if (file)
4243 fput(file);
4244 total++;
4247 return ret;
4249 #else
4250 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
4252 return 0;
4254 #endif
4256 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
4257 unsigned nr_files)
4259 int i;
4261 for (i = 0; i < nr_tables; i++) {
4262 struct fixed_file_table *table = &ctx->file_table[i];
4263 unsigned this_files;
4265 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
4266 table->files = kcalloc(this_files, sizeof(struct file *),
4267 GFP_KERNEL);
4268 if (!table->files)
4269 break;
4270 nr_files -= this_files;
4273 if (i == nr_tables)
4274 return 0;
4276 for (i = 0; i < nr_tables; i++) {
4277 struct fixed_file_table *table = &ctx->file_table[i];
4278 kfree(table->files);
4280 return 1;
4283 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
4284 unsigned nr_args)
4286 __s32 __user *fds = (__s32 __user *) arg;
4287 unsigned nr_tables;
4288 int fd, ret = 0;
4289 unsigned i;
4291 if (ctx->file_table)
4292 return -EBUSY;
4293 if (!nr_args)
4294 return -EINVAL;
4295 if (nr_args > IORING_MAX_FIXED_FILES)
4296 return -EMFILE;
4298 nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
4299 ctx->file_table = kcalloc(nr_tables, sizeof(struct fixed_file_table),
4300 GFP_KERNEL);
4301 if (!ctx->file_table)
4302 return -ENOMEM;
4304 if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
4305 kfree(ctx->file_table);
4306 ctx->file_table = NULL;
4307 return -ENOMEM;
4310 for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
4311 struct fixed_file_table *table;
4312 unsigned index;
4314 ret = -EFAULT;
4315 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
4316 break;
4317 /* allow sparse sets */
4318 if (fd == -1) {
4319 ret = 0;
4320 continue;
4323 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
4324 index = i & IORING_FILE_TABLE_MASK;
4325 table->files[index] = fget(fd);
4327 ret = -EBADF;
4328 if (!table->files[index])
4329 break;
4331 * Don't allow io_uring instances to be registered. If UNIX
4332 * isn't enabled, then this causes a reference cycle and this
4333 * instance can never get freed. If UNIX is enabled we'll
4334 * handle it just fine, but there's still no point in allowing
4335 * a ring fd as it doesn't support regular read/write anyway.
4337 if (table->files[index]->f_op == &io_uring_fops) {
4338 fput(table->files[index]);
4339 break;
4341 ret = 0;
4344 if (ret) {
4345 for (i = 0; i < ctx->nr_user_files; i++) {
4346 struct file *file;
4348 file = io_file_from_index(ctx, i);
4349 if (file)
4350 fput(file);
4352 for (i = 0; i < nr_tables; i++)
4353 kfree(ctx->file_table[i].files);
4355 kfree(ctx->file_table);
4356 ctx->file_table = NULL;
4357 ctx->nr_user_files = 0;
4358 return ret;
4361 ret = io_sqe_files_scm(ctx);
4362 if (ret)
4363 io_sqe_files_unregister(ctx);
4365 return ret;
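/*
 * Illustrative sketch (not from this file): the registration loop above
 * accepts -1 entries as sparse slots that can be filled in later. From user
 * space, a fixed-file set with holes might be registered via the raw
 * io_uring_register syscall like this (error handling elided):
 */
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/io_uring.h>

static int register_files_sparse(int ring_fd, int real_fd)
{
	__s32 fds[4] = { real_fd, -1, -1, -1 };	/* slot 0 used, rest sparse */

	return syscall(__NR_io_uring_register, ring_fd,
		       IORING_REGISTER_FILES, fds, 4);
}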
4368 static void io_sqe_file_unregister(struct io_ring_ctx *ctx, int index)
4370 #if defined(CONFIG_UNIX)
4371 struct file *file = io_file_from_index(ctx, index);
4372 struct sock *sock = ctx->ring_sock->sk;
4373 struct sk_buff_head list, *head = &sock->sk_receive_queue;
4374 struct sk_buff *skb;
4375 int i;
4377 __skb_queue_head_init(&list);
4380 * Find the skb that holds this file in its SCM_RIGHTS. When found,
4381 * remove this entry and rearrange the file array.
4383 skb = skb_dequeue(head);
4384 while (skb) {
4385 struct scm_fp_list *fp;
4387 fp = UNIXCB(skb).fp;
4388 for (i = 0; i < fp->count; i++) {
4389 int left;
4391 if (fp->fp[i] != file)
4392 continue;
4394 unix_notinflight(fp->user, fp->fp[i]);
4395 left = fp->count - 1 - i;
4396 if (left) {
4397 memmove(&fp->fp[i], &fp->fp[i + 1],
4398 left * sizeof(struct file *));
4400 fp->count--;
4401 if (!fp->count) {
4402 kfree_skb(skb);
4403 skb = NULL;
4404 } else {
4405 __skb_queue_tail(&list, skb);
4407 fput(file);
4408 file = NULL;
4409 break;
4412 if (!file)
4413 break;
4415 __skb_queue_tail(&list, skb);
4417 skb = skb_dequeue(head);
4420 if (skb_peek(&list)) {
4421 spin_lock_irq(&head->lock);
4422 while ((skb = __skb_dequeue(&list)) != NULL)
4423 __skb_queue_tail(head, skb);
4424 spin_unlock_irq(&head->lock);
4426 #else
4427 fput(io_file_from_index(ctx, index));
4428 #endif
4431 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
4432 int index)
4434 #if defined(CONFIG_UNIX)
4435 struct sock *sock = ctx->ring_sock->sk;
4436 struct sk_buff_head *head = &sock->sk_receive_queue;
4437 struct sk_buff *skb;
4439 /*
4440 * See if we can merge this file into an existing skb SCM_RIGHTS
4441 * file set. If there's no room, fall back to allocating a new skb
4442 * and filling it in.
4443 */
4444 spin_lock_irq(&head->lock);
4445 skb = skb_peek(head);
4446 if (skb) {
4447 struct scm_fp_list *fpl = UNIXCB(skb).fp;
4449 if (fpl->count < SCM_MAX_FD) {
4450 __skb_unlink(skb, head);
4451 spin_unlock_irq(&head->lock);
4452 fpl->fp[fpl->count] = get_file(file);
4453 unix_inflight(fpl->user, fpl->fp[fpl->count]);
4454 fpl->count++;
4455 spin_lock_irq(&head->lock);
4456 __skb_queue_head(head, skb);
4457 } else {
4458 skb = NULL;
4461 spin_unlock_irq(&head->lock);
4463 if (skb) {
4464 fput(file);
4465 return 0;
4468 return __io_sqe_files_scm(ctx, 1, index);
4469 #else
4470 return 0;
4471 #endif
4474 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
4475 unsigned nr_args)
4477 struct io_uring_files_update up;
4478 __s32 __user *fds;
4479 int fd, i, err;
4480 __u32 done;
4482 if (!ctx->file_table)
4483 return -ENXIO;
4484 if (!nr_args)
4485 return -EINVAL;
4486 if (copy_from_user(&up, arg, sizeof(up)))
4487 return -EFAULT;
4488 if (up.resv)
4489 return -EINVAL;
4490 if (check_add_overflow(up.offset, nr_args, &done))
4491 return -EOVERFLOW;
4492 if (done > ctx->nr_user_files)
4493 return -EINVAL;
4495 done = 0;
4496 fds = u64_to_user_ptr(up.fds);
4497 while (nr_args) {
4498 struct fixed_file_table *table;
4499 unsigned index;
4501 err = 0;
4502 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
4503 err = -EFAULT;
4504 break;
4506 i = array_index_nospec(up.offset, ctx->nr_user_files);
4507 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
4508 index = i & IORING_FILE_TABLE_MASK;
4509 if (table->files[index]) {
4510 io_sqe_file_unregister(ctx, i);
4511 table->files[index] = NULL;
4513 if (fd != -1) {
4514 struct file *file;
4516 file = fget(fd);
4517 if (!file) {
4518 err = -EBADF;
4519 break;
4520 }
4521 /*
4522 * Don't allow io_uring instances to be registered. If
4523 * UNIX isn't enabled, then this causes a reference
4524 * cycle and this instance can never get freed. If UNIX
4525 * is enabled we'll handle it just fine, but there's
4526 * still no point in allowing a ring fd as it doesn't
4527 * support regular read/write anyway.
4528 */
4529 if (file->f_op == &io_uring_fops) {
4530 fput(file);
4531 err = -EBADF;
4532 break;
4534 table->files[index] = file;
4535 err = io_sqe_file_register(ctx, file, i);
4536 if (err)
4537 break;
4539 nr_args--;
4540 done++;
4541 up.offset++;
4544 return done ? done : err;
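/*
 * Sketch of the userspace side of this update path: replace slot 1 of a
 * previously registered table and clear slot 2 by passing -1; the return
 * value is the number of slots processed. Assumes the UAPI header and
 * __NR_io_uring_register; ring_fd and new_fd are placeholders.
 *
 *	struct io_uring_files_update up = { .offset = 1 };
 *	__s32 fds[2] = { new_fd, -1 };
 *
 *	up.fds = (unsigned long) fds;
 *	ret = syscall(__NR_io_uring_register, ring_fd,
 *		      IORING_REGISTER_FILES_UPDATE, &up, 2);
 */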
4547 static void io_put_work(struct io_wq_work *work)
4549 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4551 io_put_req(req);
4554 static void io_get_work(struct io_wq_work *work)
4556 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4558 refcount_inc(&req->refs);
4561 static int io_sq_offload_start(struct io_ring_ctx *ctx,
4562 struct io_uring_params *p)
4564 struct io_wq_data data;
4565 unsigned concurrency;
4566 int ret;
4568 init_waitqueue_head(&ctx->sqo_wait);
4569 mmgrab(current->mm);
4570 ctx->sqo_mm = current->mm;
4572 if (ctx->flags & IORING_SETUP_SQPOLL) {
4573 ret = -EPERM;
4574 if (!capable(CAP_SYS_ADMIN))
4575 goto err;
4577 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
4578 if (!ctx->sq_thread_idle)
4579 ctx->sq_thread_idle = HZ;
4581 if (p->flags & IORING_SETUP_SQ_AFF) {
4582 int cpu = p->sq_thread_cpu;
4584 ret = -EINVAL;
4585 if (cpu >= nr_cpu_ids)
4586 goto err;
4587 if (!cpu_online(cpu))
4588 goto err;
4590 ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
4591 ctx, cpu,
4592 "io_uring-sq");
4593 } else {
4594 ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
4595 "io_uring-sq");
4597 if (IS_ERR(ctx->sqo_thread)) {
4598 ret = PTR_ERR(ctx->sqo_thread);
4599 ctx->sqo_thread = NULL;
4600 goto err;
4602 wake_up_process(ctx->sqo_thread);
4603 } else if (p->flags & IORING_SETUP_SQ_AFF) {
4604 /* Can't have SQ_AFF without SQPOLL */
4605 ret = -EINVAL;
4606 goto err;
4609 data.mm = ctx->sqo_mm;
4610 data.user = ctx->user;
4611 data.creds = ctx->creds;
4612 data.get_work = io_get_work;
4613 data.put_work = io_put_work;
4615 /* Do QD, or 4 * CPUS, whatever is smallest */
4616 concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
4617 ctx->io_wq = io_wq_create(concurrency, &data);
4618 if (IS_ERR(ctx->io_wq)) {
4619 ret = PTR_ERR(ctx->io_wq);
4620 ctx->io_wq = NULL;
4621 goto err;
4624 return 0;
4625 err:
4626 io_finish_async(ctx);
4627 mmdrop(ctx->sqo_mm);
4628 ctx->sqo_mm = NULL;
4629 return ret;
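/*
 * The SQPOLL knobs consumed above are supplied at ring creation time. A
 * minimal sketch, assuming the caller has CAP_SYS_ADMIN (required for
 * IORING_SETUP_SQPOLL here) and that __NR_io_uring_setup is available;
 * QUEUE_DEPTH and ring_fd are placeholders.
 *
 *	struct io_uring_params p = { 0 };
 *
 *	p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
 *	p.sq_thread_cpu = 0;		// pin io_uring-sq to CPU 0
 *	p.sq_thread_idle = 2000;	// busy-poll ~2s of idle before sleeping
 *	ring_fd = syscall(__NR_io_uring_setup, QUEUE_DEPTH, &p);
 */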
4632 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
4634 atomic_long_sub(nr_pages, &user->locked_vm);
4637 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
4639 unsigned long page_limit, cur_pages, new_pages;
4641 /* Don't allow more pages than we can safely lock */
4642 page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
4644 do {
4645 cur_pages = atomic_long_read(&user->locked_vm);
4646 new_pages = cur_pages + nr_pages;
4647 if (new_pages > page_limit)
4648 return -ENOMEM;
4649 } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
4650 new_pages) != cur_pages);
4652 return 0;
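/*
 * This is the usual lockless "charge against a limit" pattern: read the
 * current count, refuse the request if it would exceed the cap, and retry
 * the cmpxchg if another task changed locked_vm in between. As a worked
 * example, an RLIMIT_MEMLOCK of 64 KiB gives a page_limit of 16 with 4 KiB
 * pages, so accounting a 17-page request fails with -ENOMEM.
 */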
4655 static void io_mem_free(void *ptr)
4657 struct page *page;
4659 if (!ptr)
4660 return;
4662 page = virt_to_head_page(ptr);
4663 if (put_page_testzero(page))
4664 free_compound_page(page);
4667 static void *io_mem_alloc(size_t size)
4669 gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
4670 __GFP_NORETRY;
4672 return (void *) __get_free_pages(gfp_flags, get_order(size));
4675 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
4676 size_t *sq_offset)
4678 struct io_rings *rings;
4679 size_t off, sq_array_size;
4681 off = struct_size(rings, cqes, cq_entries);
4682 if (off == SIZE_MAX)
4683 return SIZE_MAX;
4685 #ifdef CONFIG_SMP
4686 off = ALIGN(off, SMP_CACHE_BYTES);
4687 if (off == 0)
4688 return SIZE_MAX;
4689 #endif
4691 sq_array_size = array_size(sizeof(u32), sq_entries);
4692 if (sq_array_size == SIZE_MAX)
4693 return SIZE_MAX;
4695 if (sq_offset)
4696 *sq_offset = off;
4698 if (check_add_overflow(off, sq_array_size, &off))
4699 return SIZE_MAX;
4701 return off;
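/*
 * Layout recap: the CQE array sits at the tail of struct io_rings, and the
 * u32 SQ index array follows it (cache-line aligned under CONFIG_SMP), with
 * *sq_offset reporting where that index array starts inside the mapping.
 * Worked example, assuming sq_entries == 128 and cq_entries == 256:
 *
 *	off = sizeof(struct io_rings) + 256 * sizeof(struct io_uring_cqe)
 *	off = ALIGN(off, SMP_CACHE_BYTES)	// this is *sq_offset
 *	size = off + 128 * sizeof(u32)		// value returned to the caller
 */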
4704 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
4706 size_t pages;
4708 pages = (size_t)1 << get_order(
4709 rings_size(sq_entries, cq_entries, NULL));
4710 pages += (size_t)1 << get_order(
4711 array_size(sizeof(struct io_uring_sqe), sq_entries));
4713 return pages;
4716 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
4718 int i, j;
4720 if (!ctx->user_bufs)
4721 return -ENXIO;
4723 for (i = 0; i < ctx->nr_user_bufs; i++) {
4724 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4726 for (j = 0; j < imu->nr_bvecs; j++)
4727 put_user_page(imu->bvec[j].bv_page);
4729 if (ctx->account_mem)
4730 io_unaccount_mem(ctx->user, imu->nr_bvecs);
4731 kvfree(imu->bvec);
4732 imu->nr_bvecs = 0;
4735 kfree(ctx->user_bufs);
4736 ctx->user_bufs = NULL;
4737 ctx->nr_user_bufs = 0;
4738 return 0;
4741 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
4742 void __user *arg, unsigned index)
4744 struct iovec __user *src;
4746 #ifdef CONFIG_COMPAT
4747 if (ctx->compat) {
4748 struct compat_iovec __user *ciovs;
4749 struct compat_iovec ciov;
4751 ciovs = (struct compat_iovec __user *) arg;
4752 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
4753 return -EFAULT;
4755 dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
4756 dst->iov_len = ciov.iov_len;
4757 return 0;
4759 #endif
4760 src = (struct iovec __user *) arg;
4761 if (copy_from_user(dst, &src[index], sizeof(*dst)))
4762 return -EFAULT;
4763 return 0;
4766 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
4767 unsigned nr_args)
4769 struct vm_area_struct **vmas = NULL;
4770 struct page **pages = NULL;
4771 int i, j, got_pages = 0;
4772 int ret = -EINVAL;
4774 if (ctx->user_bufs)
4775 return -EBUSY;
4776 if (!nr_args || nr_args > UIO_MAXIOV)
4777 return -EINVAL;
4779 ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
4780 GFP_KERNEL);
4781 if (!ctx->user_bufs)
4782 return -ENOMEM;
4784 for (i = 0; i < nr_args; i++) {
4785 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4786 unsigned long off, start, end, ubuf;
4787 int pret, nr_pages;
4788 struct iovec iov;
4789 size_t size;
4791 ret = io_copy_iov(ctx, &iov, arg, i);
4792 if (ret)
4793 goto err;
4795 /*
4796 * Don't impose further limits on the size and buffer
4797 * constraints here, we'll -EINVAL later when IO is
4798 * submitted if they are wrong.
4799 */
4800 ret = -EFAULT;
4801 if (!iov.iov_base || !iov.iov_len)
4802 goto err;
4804 /* arbitrary limit, but we need something */
4805 if (iov.iov_len > SZ_1G)
4806 goto err;
4808 ubuf = (unsigned long) iov.iov_base;
4809 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
4810 start = ubuf >> PAGE_SHIFT;
4811 nr_pages = end - start;
4813 if (ctx->account_mem) {
4814 ret = io_account_mem(ctx->user, nr_pages);
4815 if (ret)
4816 goto err;
4819 ret = 0;
4820 if (!pages || nr_pages > got_pages) {
4821 kfree(vmas);
4822 kfree(pages);
4823 pages = kvmalloc_array(nr_pages, sizeof(struct page *),
4824 GFP_KERNEL);
4825 vmas = kvmalloc_array(nr_pages,
4826 sizeof(struct vm_area_struct *),
4827 GFP_KERNEL);
4828 if (!pages || !vmas) {
4829 ret = -ENOMEM;
4830 if (ctx->account_mem)
4831 io_unaccount_mem(ctx->user, nr_pages);
4832 goto err;
4834 got_pages = nr_pages;
4837 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
4838 GFP_KERNEL);
4839 ret = -ENOMEM;
4840 if (!imu->bvec) {
4841 if (ctx->account_mem)
4842 io_unaccount_mem(ctx->user, nr_pages);
4843 goto err;
4846 ret = 0;
4847 down_read(&current->mm->mmap_sem);
4848 pret = get_user_pages(ubuf, nr_pages,
4849 FOLL_WRITE | FOLL_LONGTERM,
4850 pages, vmas);
4851 if (pret == nr_pages) {
4852 /* don't support file backed memory */
4853 for (j = 0; j < nr_pages; j++) {
4854 struct vm_area_struct *vma = vmas[j];
4856 if (vma->vm_file &&
4857 !is_file_hugepages(vma->vm_file)) {
4858 ret = -EOPNOTSUPP;
4859 break;
4862 } else {
4863 ret = pret < 0 ? pret : -EFAULT;
4865 up_read(&current->mm->mmap_sem);
4866 if (ret) {
4867 /*
4868 * if we did partial map, or found file backed vmas,
4869 * release any pages we did get
4870 */
4871 if (pret > 0)
4872 put_user_pages(pages, pret);
4873 if (ctx->account_mem)
4874 io_unaccount_mem(ctx->user, nr_pages);
4875 kvfree(imu->bvec);
4876 goto err;
4879 off = ubuf & ~PAGE_MASK;
4880 size = iov.iov_len;
4881 for (j = 0; j < nr_pages; j++) {
4882 size_t vec_len;
4884 vec_len = min_t(size_t, size, PAGE_SIZE - off);
4885 imu->bvec[j].bv_page = pages[j];
4886 imu->bvec[j].bv_len = vec_len;
4887 imu->bvec[j].bv_offset = off;
4888 off = 0;
4889 size -= vec_len;
4891 /* store original address for later verification */
4892 imu->ubuf = ubuf;
4893 imu->len = iov.iov_len;
4894 imu->nr_bvecs = nr_pages;
4896 ctx->nr_user_bufs++;
4898 kvfree(pages);
4899 kvfree(vmas);
4900 return 0;
4901 err:
4902 kvfree(pages);
4903 kvfree(vmas);
4904 io_sqe_buffer_unregister(ctx);
4905 return ret;
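/*
 * Sketch of the userspace side of buffer registration: pass an array of
 * iovecs over anonymous (not file backed) memory, each non-empty and at
 * most 1 GiB. Assumes the UAPI header and __NR_io_uring_register; ring_fd
 * and BUF_SIZE are placeholders.
 *
 *	struct iovec iov;
 *
 *	iov.iov_base = malloc(BUF_SIZE);
 *	iov.iov_len = BUF_SIZE;
 *	ret = syscall(__NR_io_uring_register, ring_fd,
 *		      IORING_REGISTER_BUFFERS, &iov, 1);
 */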
4908 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
4910 __s32 __user *fds = arg;
4911 int fd;
4913 if (ctx->cq_ev_fd)
4914 return -EBUSY;
4916 if (copy_from_user(&fd, fds, sizeof(*fds)))
4917 return -EFAULT;
4919 ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
4920 if (IS_ERR(ctx->cq_ev_fd)) {
4921 int ret = PTR_ERR(ctx->cq_ev_fd);
4922 ctx->cq_ev_fd = NULL;
4923 return ret;
4926 return 0;
4929 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
4931 if (ctx->cq_ev_fd) {
4932 eventfd_ctx_put(ctx->cq_ev_fd);
4933 ctx->cq_ev_fd = NULL;
4934 return 0;
4937 return -ENXIO;
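/*
 * Sketch of how an application wires this up: create an eventfd, register
 * it so CQE postings signal it, then block on read(2) or add it to an
 * epoll set. Assumes <sys/eventfd.h> and __NR_io_uring_register; ring_fd
 * is a placeholder.
 *
 *	int efd = eventfd(0, EFD_CLOEXEC);
 *	uint64_t count;
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_EVENTFD, &efd, 1);
 *	read(efd, &count, sizeof(count));	// returns once CQEs are posted
 */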
4940 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
4942 io_finish_async(ctx);
4943 if (ctx->sqo_mm)
4944 mmdrop(ctx->sqo_mm);
4946 io_iopoll_reap_events(ctx);
4947 io_sqe_buffer_unregister(ctx);
4948 io_sqe_files_unregister(ctx);
4949 io_eventfd_unregister(ctx);
4951 #if defined(CONFIG_UNIX)
4952 if (ctx->ring_sock) {
4953 ctx->ring_sock->file = NULL; /* so that iput() is called */
4954 sock_release(ctx->ring_sock);
4956 #endif
4958 io_mem_free(ctx->rings);
4959 io_mem_free(ctx->sq_sqes);
4961 percpu_ref_exit(&ctx->refs);
4962 if (ctx->account_mem)
4963 io_unaccount_mem(ctx->user,
4964 ring_pages(ctx->sq_entries, ctx->cq_entries));
4965 free_uid(ctx->user);
4966 put_cred(ctx->creds);
4967 kfree(ctx->completions);
4968 kfree(ctx->cancel_hash);
4969 kmem_cache_free(req_cachep, ctx->fallback_req);
4970 kfree(ctx);
4973 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
4975 struct io_ring_ctx *ctx = file->private_data;
4976 __poll_t mask = 0;
4978 poll_wait(file, &ctx->cq_wait, wait);
4979 /*
4980 * synchronizes with barrier from wq_has_sleeper call in
4981 * io_commit_cqring
4982 */
4983 smp_rmb();
4984 if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
4985 ctx->rings->sq_ring_entries)
4986 mask |= EPOLLOUT | EPOLLWRNORM;
4987 if (io_cqring_events(ctx, false))
4988 mask |= EPOLLIN | EPOLLRDNORM;
4990 return mask;
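/*
 * This is what makes the ring fd poll(2)/epoll(7) friendly: EPOLLIN means
 * completions are waiting in the CQ ring, EPOLLOUT means there is room to
 * queue more SQEs. Minimal sketch; ring_fd and reap_completions() are
 * placeholders.
 *
 *	struct pollfd pfd = { .fd = ring_fd, .events = POLLIN };
 *
 *	if (poll(&pfd, 1, -1) > 0 && (pfd.revents & POLLIN))
 *		reap_completions();	// drain the CQ ring
 */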
4993 static int io_uring_fasync(int fd, struct file *file, int on)
4995 struct io_ring_ctx *ctx = file->private_data;
4997 return fasync_helper(fd, file, on, &ctx->cq_fasync);
5000 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
5002 mutex_lock(&ctx->uring_lock);
5003 percpu_ref_kill(&ctx->refs);
5004 mutex_unlock(&ctx->uring_lock);
5006 /*
5007 * Wait for sq thread to idle, if we have one. It won't spin on new
5008 * work after we've killed the ctx ref above. This is important to do
5009 * before we cancel existing commands, as the thread could otherwise
5010 * be queueing new work after that. If that's work we need to cancel,
5011 * it could cause shutdown to hang.
5012 */
5013 while (ctx->sqo_thread && !wq_has_sleeper(&ctx->sqo_wait))
5014 cpu_relax();
5016 io_kill_timeouts(ctx);
5017 io_poll_remove_all(ctx);
5019 if (ctx->io_wq)
5020 io_wq_cancel_all(ctx->io_wq);
5022 io_iopoll_reap_events(ctx);
5023 /* if we failed setting up the ctx, we might not have any rings */
5024 if (ctx->rings)
5025 io_cqring_overflow_flush(ctx, true);
5026 wait_for_completion(&ctx->completions[0]);
5027 io_ring_ctx_free(ctx);
5030 static int io_uring_release(struct inode *inode, struct file *file)
5032 struct io_ring_ctx *ctx = file->private_data;
5034 file->private_data = NULL;
5035 io_ring_ctx_wait_and_kill(ctx);
5036 return 0;
5039 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
5040 struct files_struct *files)
5042 struct io_kiocb *req;
5043 DEFINE_WAIT(wait);
5045 while (!list_empty_careful(&ctx->inflight_list)) {
5046 struct io_kiocb *cancel_req = NULL;
5048 spin_lock_irq(&ctx->inflight_lock);
5049 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
5050 if (req->work.files != files)
5051 continue;
5052 /* req is being completed, ignore */
5053 if (!refcount_inc_not_zero(&req->refs))
5054 continue;
5055 cancel_req = req;
5056 break;
5058 if (cancel_req)
5059 prepare_to_wait(&ctx->inflight_wait, &wait,
5060 TASK_UNINTERRUPTIBLE);
5061 spin_unlock_irq(&ctx->inflight_lock);
5063 /* We need to keep going until we don't find a matching req */
5064 if (!cancel_req)
5065 break;
5067 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
5068 io_put_req(cancel_req);
5069 schedule();
5071 finish_wait(&ctx->inflight_wait, &wait);
5074 static int io_uring_flush(struct file *file, void *data)
5076 struct io_ring_ctx *ctx = file->private_data;
5078 io_uring_cancel_files(ctx, data);
5079 return 0;
5082 static void *io_uring_validate_mmap_request(struct file *file,
5083 loff_t pgoff, size_t sz)
5085 struct io_ring_ctx *ctx = file->private_data;
5086 loff_t offset = pgoff << PAGE_SHIFT;
5087 struct page *page;
5088 void *ptr;
5090 switch (offset) {
5091 case IORING_OFF_SQ_RING:
5092 case IORING_OFF_CQ_RING:
5093 ptr = ctx->rings;
5094 break;
5095 case IORING_OFF_SQES:
5096 ptr = ctx->sq_sqes;
5097 break;
5098 default:
5099 return ERR_PTR(-EINVAL);
5102 page = virt_to_head_page(ptr);
5103 if (sz > page_size(page))
5104 return ERR_PTR(-EINVAL);
5106 return ptr;
5109 #ifdef CONFIG_MMU
5111 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
5113 size_t sz = vma->vm_end - vma->vm_start;
5114 unsigned long pfn;
5115 void *ptr;
5117 ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
5118 if (IS_ERR(ptr))
5119 return PTR_ERR(ptr);
5121 pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
5122 return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
5125 #else /* !CONFIG_MMU */
5127 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
5129 return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
5132 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
5134 return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
5137 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
5138 unsigned long addr, unsigned long len,
5139 unsigned long pgoff, unsigned long flags)
5141 void *ptr;
5143 ptr = io_uring_validate_mmap_request(file, pgoff, len);
5144 if (IS_ERR(ptr))
5145 return PTR_ERR(ptr);
5147 return (unsigned long) ptr;
5150 #endif /* !CONFIG_MMU */
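/*
 * Userspace reaches the regions validated above via mmap(2) on the ring fd
 * at fixed offsets, using the sq_off/cq_off values filled in by
 * io_uring_setup(2). Minimal sketch; p and ring_fd are placeholders.
 *
 *	sq = mmap(NULL, p.sq_off.array + p.sq_entries * sizeof(__u32),
 *		  PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *		  ring_fd, IORING_OFF_SQ_RING);
 *	sqes = mmap(NULL, p.sq_entries * sizeof(struct io_uring_sqe),
 *		    PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *		    ring_fd, IORING_OFF_SQES);
 *	// with IORING_FEAT_SINGLE_MMAP the CQ ring lives in the first mapping
 */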
5152 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
5153 u32, min_complete, u32, flags, const sigset_t __user *, sig,
5154 size_t, sigsz)
5156 struct io_ring_ctx *ctx;
5157 long ret = -EBADF;
5158 int submitted = 0;
5159 struct fd f;
5161 if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
5162 return -EINVAL;
5164 f = fdget(fd);
5165 if (!f.file)
5166 return -EBADF;
5168 ret = -EOPNOTSUPP;
5169 if (f.file->f_op != &io_uring_fops)
5170 goto out_fput;
5172 ret = -ENXIO;
5173 ctx = f.file->private_data;
5174 if (!percpu_ref_tryget(&ctx->refs))
5175 goto out_fput;
5177 /*
5178 * For SQ polling, the thread will do all submissions and completions.
5179 * Just return the requested submit count, and wake the thread if
5180 * we were asked to.
5181 */
5182 ret = 0;
5183 if (ctx->flags & IORING_SETUP_SQPOLL) {
5184 if (!list_empty_careful(&ctx->cq_overflow_list))
5185 io_cqring_overflow_flush(ctx, false);
5186 if (flags & IORING_ENTER_SQ_WAKEUP)
5187 wake_up(&ctx->sqo_wait);
5188 submitted = to_submit;
5189 } else if (to_submit) {
5190 struct mm_struct *cur_mm;
5192 to_submit = min(to_submit, ctx->sq_entries);
5193 mutex_lock(&ctx->uring_lock);
5194 /* already have mm, so io_submit_sqes() won't try to grab it */
5195 cur_mm = ctx->sqo_mm;
5196 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
5197 &cur_mm, false);
5198 mutex_unlock(&ctx->uring_lock);
5200 if (submitted != to_submit)
5201 goto out;
5203 if (flags & IORING_ENTER_GETEVENTS) {
5204 unsigned nr_events = 0;
5206 min_complete = min(min_complete, ctx->cq_entries);
5208 if (ctx->flags & IORING_SETUP_IOPOLL) {
5209 ret = io_iopoll_check(ctx, &nr_events, min_complete);
5210 } else {
5211 ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
5215 out:
5216 percpu_ref_put(&ctx->refs);
5217 out_fput:
5218 fdput(f);
5219 return submitted ? submitted : ret;
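/*
 * Typical call pattern against this entry point: submit whatever has been
 * queued in the SQ ring and wait for at least one completion in the same
 * syscall. Raw syscall(2) sketch, since libc typically has no wrapper;
 * ring_fd and to_submit are placeholders.
 *
 *	ret = syscall(__NR_io_uring_enter, ring_fd, to_submit,
 *		      1, IORING_ENTER_GETEVENTS, NULL, 0);
 *	// ret is the number of SQEs consumed, or -1 with errno set on failure
 */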
5222 static const struct file_operations io_uring_fops = {
5223 .release = io_uring_release,
5224 .flush = io_uring_flush,
5225 .mmap = io_uring_mmap,
5226 #ifndef CONFIG_MMU
5227 .get_unmapped_area = io_uring_nommu_get_unmapped_area,
5228 .mmap_capabilities = io_uring_nommu_mmap_capabilities,
5229 #endif
5230 .poll = io_uring_poll,
5231 .fasync = io_uring_fasync,
5234 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
5235 struct io_uring_params *p)
5237 struct io_rings *rings;
5238 size_t size, sq_array_offset;
5240 size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
5241 if (size == SIZE_MAX)
5242 return -EOVERFLOW;
5244 rings = io_mem_alloc(size);
5245 if (!rings)
5246 return -ENOMEM;
5248 ctx->rings = rings;
5249 ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
5250 rings->sq_ring_mask = p->sq_entries - 1;
5251 rings->cq_ring_mask = p->cq_entries - 1;
5252 rings->sq_ring_entries = p->sq_entries;
5253 rings->cq_ring_entries = p->cq_entries;
5254 ctx->sq_mask = rings->sq_ring_mask;
5255 ctx->cq_mask = rings->cq_ring_mask;
5256 ctx->sq_entries = rings->sq_ring_entries;
5257 ctx->cq_entries = rings->cq_ring_entries;
5259 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
5260 if (size == SIZE_MAX) {
5261 io_mem_free(ctx->rings);
5262 ctx->rings = NULL;
5263 return -EOVERFLOW;
5266 ctx->sq_sqes = io_mem_alloc(size);
5267 if (!ctx->sq_sqes) {
5268 io_mem_free(ctx->rings);
5269 ctx->rings = NULL;
5270 return -ENOMEM;
5273 return 0;
5274 }
5276 /*
5277 * Allocate an anonymous fd, this is what constitutes the application
5278 * visible backing of an io_uring instance. The application mmaps this
5279 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
5280 * we have to tie this fd to a socket for file garbage collection purposes.
5281 */
5282 static int io_uring_get_fd(struct io_ring_ctx *ctx)
5284 struct file *file;
5285 int ret;
5287 #if defined(CONFIG_UNIX)
5288 ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
5289 &ctx->ring_sock);
5290 if (ret)
5291 return ret;
5292 #endif
5294 ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
5295 if (ret < 0)
5296 goto err;
5298 file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
5299 O_RDWR | O_CLOEXEC);
5300 if (IS_ERR(file)) {
5301 put_unused_fd(ret);
5302 ret = PTR_ERR(file);
5303 goto err;
5306 #if defined(CONFIG_UNIX)
5307 ctx->ring_sock->file = file;
5308 ctx->ring_sock->sk->sk_user_data = ctx;
5309 #endif
5310 fd_install(ret, file);
5311 return ret;
5312 err:
5313 #if defined(CONFIG_UNIX)
5314 sock_release(ctx->ring_sock);
5315 ctx->ring_sock = NULL;
5316 #endif
5317 return ret;
5320 static int io_uring_create(unsigned entries, struct io_uring_params *p)
5322 struct user_struct *user = NULL;
5323 struct io_ring_ctx *ctx;
5324 bool account_mem;
5325 int ret;
5327 if (!entries || entries > IORING_MAX_ENTRIES)
5328 return -EINVAL;
5330 /*
5331 * Use twice as many entries for the CQ ring. It's possible for the
5332 * application to drive a higher depth than the size of the SQ ring,
5333 * since the sqes are only used at submission time. This allows for
5334 * some flexibility in overcommitting a bit. If the application has
5335 * set IORING_SETUP_CQSIZE, it will have passed in the desired number
5336 * of CQ ring entries manually.
5337 */
5338 p->sq_entries = roundup_pow_of_two(entries);
5339 if (p->flags & IORING_SETUP_CQSIZE) {
5340 /*
5341 * If IORING_SETUP_CQSIZE is set, we do the same roundup
5342 * to a power-of-two, if it isn't already. We do NOT impose
5343 * any cq vs sq ring sizing.
5344 */
5345 if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES)
5346 return -EINVAL;
5347 p->cq_entries = roundup_pow_of_two(p->cq_entries);
5348 } else {
5349 p->cq_entries = 2 * p->sq_entries;
5352 user = get_uid(current_user());
5353 account_mem = !capable(CAP_IPC_LOCK);
5355 if (account_mem) {
5356 ret = io_account_mem(user,
5357 ring_pages(p->sq_entries, p->cq_entries));
5358 if (ret) {
5359 free_uid(user);
5360 return ret;
5364 ctx = io_ring_ctx_alloc(p);
5365 if (!ctx) {
5366 if (account_mem)
5367 io_unaccount_mem(user, ring_pages(p->sq_entries,
5368 p->cq_entries));
5369 free_uid(user);
5370 return -ENOMEM;
5372 ctx->compat = in_compat_syscall();
5373 ctx->account_mem = account_mem;
5374 ctx->user = user;
5375 ctx->creds = get_current_cred();
5377 ret = io_allocate_scq_urings(ctx, p);
5378 if (ret)
5379 goto err;
5381 ret = io_sq_offload_start(ctx, p);
5382 if (ret)
5383 goto err;
5385 memset(&p->sq_off, 0, sizeof(p->sq_off));
5386 p->sq_off.head = offsetof(struct io_rings, sq.head);
5387 p->sq_off.tail = offsetof(struct io_rings, sq.tail);
5388 p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
5389 p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
5390 p->sq_off.flags = offsetof(struct io_rings, sq_flags);
5391 p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
5392 p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
5394 memset(&p->cq_off, 0, sizeof(p->cq_off));
5395 p->cq_off.head = offsetof(struct io_rings, cq.head);
5396 p->cq_off.tail = offsetof(struct io_rings, cq.tail);
5397 p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
5398 p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
5399 p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
5400 p->cq_off.cqes = offsetof(struct io_rings, cqes);
5402 /*
5403 * Install ring fd as the very last thing, so we don't risk someone
5404 * having closed it before we finish setup
5405 */
5406 ret = io_uring_get_fd(ctx);
5407 if (ret < 0)
5408 goto err;
5410 p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
5411 IORING_FEAT_SUBMIT_STABLE;
5412 trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
5413 return ret;
5414 err:
5415 io_ring_ctx_wait_and_kill(ctx);
5416 return ret;
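/*
 * Worked example of the sizing above: entries == 100 yields
 * sq_entries = roundup_pow_of_two(100) = 128 and, without
 * IORING_SETUP_CQSIZE, cq_entries = 2 * 128 = 256. With
 * IORING_SETUP_CQSIZE and a requested cq_entries of 200, the CQ ring is
 * rounded up to 256 instead, provided the request is not smaller than
 * sq_entries or larger than IORING_MAX_CQ_ENTRIES.
 */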
5419 /*
5420 * Sets up an io_uring context, and returns the fd. The application asks for
5421 * a ring size; we return the actual sq/cq ring sizes (among other things) in
5422 * the params structure passed in.
5423 */
5424 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
5426 struct io_uring_params p;
5427 long ret;
5428 int i;
5430 if (copy_from_user(&p, params, sizeof(p)))
5431 return -EFAULT;
5432 for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
5433 if (p.resv[i])
5434 return -EINVAL;
5437 if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
5438 IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
5439 return -EINVAL;
5441 ret = io_uring_create(entries, &p);
5442 if (ret < 0)
5443 return ret;
5445 if (copy_to_user(params, &p, sizeof(p)))
5446 return -EFAULT;
5448 return ret;
5451 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
5452 struct io_uring_params __user *, params)
5454 return io_uring_setup(entries, params);
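/*
 * Minimal userspace invocation of this syscall; glibc typically does not
 * wrap it, so raw syscall(2) is used. The depth of 8 is just an example,
 * and ring_fd is a placeholder.
 *
 *	struct io_uring_params p = { 0 };
 *
 *	ring_fd = syscall(__NR_io_uring_setup, 8, &p);
 *	// on return, p.sq_entries/p.cq_entries hold the rounded-up sizes,
 *	// p.features the IORING_FEAT_* bits, and p.sq_off/p.cq_off the
 *	// ring layout needed for the mmap(2) calls shown further up.
 */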
5457 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
5458 void __user *arg, unsigned nr_args)
5459 __releases(ctx->uring_lock)
5460 __acquires(ctx->uring_lock)
5462 int ret;
5464 /*
5465 * We're inside the ring mutex; if the ref is already dying, then
5466 * someone else killed the ctx or is already going through
5467 * io_uring_register().
5468 */
5469 if (percpu_ref_is_dying(&ctx->refs))
5470 return -ENXIO;
5472 percpu_ref_kill(&ctx->refs);
5474 /*
5475 * Drop uring mutex before waiting for references to exit. If another
5476 * thread is currently inside io_uring_enter() it might need to grab
5477 * the uring_lock to make progress. If we hold it here across the drain
5478 * wait, then we can deadlock. It's safe to drop the mutex here, since
5479 * no new references will come in after we've killed the percpu ref.
5480 */
5481 mutex_unlock(&ctx->uring_lock);
5482 wait_for_completion(&ctx->completions[0]);
5483 mutex_lock(&ctx->uring_lock);
5485 switch (opcode) {
5486 case IORING_REGISTER_BUFFERS:
5487 ret = io_sqe_buffer_register(ctx, arg, nr_args);
5488 break;
5489 case IORING_UNREGISTER_BUFFERS:
5490 ret = -EINVAL;
5491 if (arg || nr_args)
5492 break;
5493 ret = io_sqe_buffer_unregister(ctx);
5494 break;
5495 case IORING_REGISTER_FILES:
5496 ret = io_sqe_files_register(ctx, arg, nr_args);
5497 break;
5498 case IORING_UNREGISTER_FILES:
5499 ret = -EINVAL;
5500 if (arg || nr_args)
5501 break;
5502 ret = io_sqe_files_unregister(ctx);
5503 break;
5504 case IORING_REGISTER_FILES_UPDATE:
5505 ret = io_sqe_files_update(ctx, arg, nr_args);
5506 break;
5507 case IORING_REGISTER_EVENTFD:
5508 case IORING_REGISTER_EVENTFD_ASYNC:
5509 ret = -EINVAL;
5510 if (nr_args != 1)
5511 break;
5512 ret = io_eventfd_register(ctx, arg);
5513 if (ret)
5514 break;
5515 if (opcode == IORING_REGISTER_EVENTFD_ASYNC)
5516 ctx->eventfd_async = 1;
5517 else
5518 ctx->eventfd_async = 0;
5519 break;
5520 case IORING_UNREGISTER_EVENTFD:
5521 ret = -EINVAL;
5522 if (arg || nr_args)
5523 break;
5524 ret = io_eventfd_unregister(ctx);
5525 break;
5526 default:
5527 ret = -EINVAL;
5528 break;
5531 /* bring the ctx back to life */
5532 reinit_completion(&ctx->completions[0]);
5533 percpu_ref_reinit(&ctx->refs);
5534 return ret;
5537 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
5538 void __user *, arg, unsigned int, nr_args)
5540 struct io_ring_ctx *ctx;
5541 long ret = -EBADF;
5542 struct fd f;
5544 f = fdget(fd);
5545 if (!f.file)
5546 return -EBADF;
5548 ret = -EOPNOTSUPP;
5549 if (f.file->f_op != &io_uring_fops)
5550 goto out_fput;
5552 ctx = f.file->private_data;
5554 mutex_lock(&ctx->uring_lock);
5555 ret = __io_uring_register(ctx, opcode, arg, nr_args);
5556 mutex_unlock(&ctx->uring_lock);
5557 trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
5558 ctx->cq_ev_fd != NULL, ret);
5559 out_fput:
5560 fdput(f);
5561 return ret;
5564 static int __init io_uring_init(void)
5566 req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
5567 return 0;
5569 __initcall(io_uring_init);