// SPDX-License-Identifier: GPL-2.0
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/slab.h>
#include <linux/nospec.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "rsrc.h"
#include "filetable.h"
#include "alloc_cache.h"
#include "msg_ring.h"

/* All valid masks for MSG_RING */
#define IORING_MSG_RING_MASK		(IORING_MSG_RING_CQE_SKIP | \
					IORING_MSG_RING_FLAGS_PASS)
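
/*
 * Per-request state for IORING_OP_MSG_RING. dst_fd and cqe_flags overlay
 * each other: the field is the destination fixed file slot for
 * IORING_MSG_SEND_FD, and the CQE flags to pass through for
 * IORING_MSG_RING_FLAGS_PASS data messages.
 */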
struct io_msg {
	struct file			*file;
	struct file			*src_file;
	struct callback_head		tw;
	u64 user_data;
	u32 len;
	u32 cmd;
	u32 src_fd;
	union {
		u32 dst_fd;
		u32 cqe_flags;
	};
	u32 flags;
};
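
/*
 * Lock helpers for touching the target ring. While the source ring's
 * uring_lock is held, only a trylock on the target is attempted so the
 * two contexts cannot deadlock against each other.
 */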
static void io_double_unlock_ctx(struct io_ring_ctx *octx)
{
	mutex_unlock(&octx->uring_lock);
}

static int io_double_lock_ctx(struct io_ring_ctx *octx,
			      unsigned int issue_flags)
{
	/*
	 * To ensure proper ordering between the two ctxs, we can only
	 * attempt a trylock on the target. If that fails and we already have
	 * the source ctx lock, punt to io-wq.
	 */
	if (!(issue_flags & IO_URING_F_UNLOCKED)) {
		if (!mutex_trylock(&octx->uring_lock))
			return -EAGAIN;
		return 0;
	}
	mutex_lock(&octx->uring_lock);
	return 0;
}
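
/* Drop the file reference taken by io_msg_grab_file() if it was never consumed. */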
void io_msg_ring_cleanup(struct io_kiocb *req)
{
	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);

	if (WARN_ON_ONCE(!msg->src_file))
		return;

	fput(msg->src_file);
	msg->src_file = NULL;
}
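
/*
 * Rings that complete CQEs only from their submitter task can't have a
 * completion posted directly from here; the message has to be bounced
 * over via task_work instead.
 */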
static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
{
	return target_ctx->task_complete;
}
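
/*
 * Runs as task_work on the target ring: post the carried CQE, then try
 * to park the carrier request in the target's msg_cache for reuse.
 */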
static void io_msg_tw_complete(struct io_kiocb *req, struct io_tw_state *ts)
{
	struct io_ring_ctx *ctx = req->ctx;

	io_add_aux_cqe(ctx, req->cqe.user_data, req->cqe.res, req->cqe.flags);
	if (spin_trylock(&ctx->msg_lock)) {
		if (io_alloc_cache_put(&ctx->msg_cache, req))
			req = NULL;
		spin_unlock(&ctx->msg_lock);
	}
	if (req)
		kmem_cache_free(req_cachep, req);
	percpu_ref_put(&ctx->refs);
}
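
/*
 * Stash the CQE payload in a carrier request and queue it as task_work
 * on the target ring; io_msg_tw_complete() does the actual posting.
 */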
static int io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req,
			      int res, u32 cflags, u64 user_data)
{
	req->tctx = READ_ONCE(ctx->submitter_task->io_uring);
	if (!req->tctx) {
		kmem_cache_free(req_cachep, req);
		return -EOWNERDEAD;
	}
	req->cqe.user_data = user_data;
	io_req_set_res(req, res, cflags);
	percpu_ref_get(&ctx->refs);
	req->ctx = ctx;
	req->io_task_work.func = io_msg_tw_complete;
	io_req_task_work_add_remote(req, ctx, IOU_F_TWQ_LAZY_WAKE);
	return 0;
}
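
/* Get a carrier request from the target's msg_cache, or allocate a fresh one. */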
static struct io_kiocb *io_msg_get_kiocb(struct io_ring_ctx *ctx)
{
	struct io_kiocb *req = NULL;

	if (spin_trylock(&ctx->msg_lock)) {
		req = io_alloc_cache_get(&ctx->msg_cache);
		spin_unlock(&ctx->msg_lock);
		if (req)
			return req;
	}
	return kmem_cache_alloc(req_cachep, GFP_KERNEL | __GFP_NOWARN | __GFP_ZERO);
}
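
/* Remote (task_work) delivery path for IORING_MSG_DATA. */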
static int io_msg_data_remote(struct io_ring_ctx *target_ctx,
			      struct io_msg *msg)
{
	struct io_kiocb *target;
	u32 flags = 0;

	target = io_msg_get_kiocb(target_ctx);
	if (unlikely(!target))
		return -ENOMEM;

	if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
		flags = msg->cqe_flags;

	return io_msg_remote_post(target_ctx, target, msg->len, flags,
				  msg->user_data);
}
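
/*
 * Deliver an IORING_MSG_DATA message: post a CQE with the given user_data,
 * len and (optionally passed-through) flags to the target ring, either
 * directly or via the remote path above.
 */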
static int __io_msg_ring_data(struct io_ring_ctx *target_ctx,
			      struct io_msg *msg, unsigned int issue_flags)
{
	u32 flags = 0;
	int ret;

	if (msg->src_fd || msg->flags & ~IORING_MSG_RING_FLAGS_PASS)
		return -EINVAL;
	if (!(msg->flags & IORING_MSG_RING_FLAGS_PASS) && msg->dst_fd)
		return -EINVAL;
	if (target_ctx->flags & IORING_SETUP_R_DISABLED)
		return -EBADFD;

	if (io_msg_need_remote(target_ctx))
		return io_msg_data_remote(target_ctx, msg);

	if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
		flags = msg->cqe_flags;

	ret = -EOVERFLOW;
	if (target_ctx->flags & IORING_SETUP_IOPOLL) {
		if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
			return -EAGAIN;
	}
	if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags))
		ret = 0;
	if (target_ctx->flags & IORING_SETUP_IOPOLL)
		io_double_unlock_ctx(target_ctx);
	return ret;
}

static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
{
	struct io_ring_ctx *target_ctx = req->file->private_data;
	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);

	return __io_msg_ring_data(target_ctx, msg, issue_flags);
}
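
/*
 * Look up src_fd in the source ring's fixed file table and grab a
 * reference to the file; REQ_F_NEED_CLEANUP ensures the reference is
 * dropped if the install never happens.
 */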
static int io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags)
{
	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
	struct io_ring_ctx *ctx = req->ctx;
	struct io_rsrc_node *node;
	int ret = -EBADF;

	io_ring_submit_lock(ctx, issue_flags);
	node = io_rsrc_node_lookup(&ctx->file_table.data, msg->src_fd);
	if (node) {
		msg->src_file = io_slot_file(node);
		if (msg->src_file)
			get_file(msg->src_file);
		req->flags |= REQ_F_NEED_CLEANUP;
		ret = 0;
	}
	io_ring_submit_unlock(ctx, issue_flags);
	return ret;
}
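
/* Install the grabbed file into the target ring's fixed table and notify it. */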
static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags)
{
	struct io_ring_ctx *target_ctx = req->file->private_data;
	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
	struct file *src_file = msg->src_file;
	int ret;

	if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
		return -EAGAIN;

	ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd);
	if (ret < 0)
		goto out_unlock;

	msg->src_file = NULL;
	req->flags &= ~REQ_F_NEED_CLEANUP;

	if (msg->flags & IORING_MSG_RING_CQE_SKIP)
		goto out_unlock;
	/*
	 * If this fails, the target still received the file descriptor but
	 * wasn't notified of the fact. This means that if this request
	 * completes with -EOVERFLOW, then the sender must ensure that a
	 * later IORING_OP_MSG_RING delivers the message.
	 */
	if (!io_post_aux_cqe(target_ctx, msg->user_data, ret, 0))
		ret = -EOVERFLOW;
out_unlock:
	io_double_unlock_ctx(target_ctx);
	return ret;
}
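
/* task_work callback that performs the fd install from the target's submitter task. */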
static void io_msg_tw_fd_complete(struct callback_head *head)
{
	struct io_msg *msg = container_of(head, struct io_msg, tw);
	struct io_kiocb *req = cmd_to_io_kiocb(msg);
	int ret = -EOWNERDEAD;

	if (!(current->flags & PF_EXITING))
		ret = io_msg_install_complete(req, IO_URING_F_UNLOCKED);
	if (ret < 0)
		req_set_fail(req);
	io_req_queue_tw_complete(req, ret);
}
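
/* Punt the fd install to the target ring's submitter task via task_work. */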
static int io_msg_fd_remote(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->file->private_data;
	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
	struct task_struct *task = READ_ONCE(ctx->submitter_task);

	if (unlikely(!task))
		return -EOWNERDEAD;

	init_task_work(&msg->tw, io_msg_tw_fd_complete);
	if (task_work_add(task, &msg->tw, TWA_SIGNAL))
		return -EOWNERDEAD;

	return IOU_ISSUE_SKIP_COMPLETE;
}
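
/* IORING_MSG_SEND_FD: pass a fixed file from the source ring to the target ring. */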
static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
{
	struct io_ring_ctx *target_ctx = req->file->private_data;
	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
	struct io_ring_ctx *ctx = req->ctx;

	if (msg->len)
		return -EINVAL;
	if (target_ctx == ctx)
		return -EINVAL;
	if (target_ctx->flags & IORING_SETUP_R_DISABLED)
		return -EBADFD;
	if (!msg->src_file) {
		int ret = io_msg_grab_file(req, issue_flags);

		if (unlikely(ret))
			return ret;
	}

	if (io_msg_need_remote(target_ctx))
		return io_msg_fd_remote(req);
	return io_msg_install_complete(req, issue_flags);
}
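
/* Read and validate the MSG_RING specific SQE fields. */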
static int __io_msg_ring_prep(struct io_msg *msg, const struct io_uring_sqe *sqe)
{
	if (unlikely(sqe->buf_index || sqe->personality))
		return -EINVAL;

	msg->src_file = NULL;
	msg->user_data = READ_ONCE(sqe->off);
	msg->len = READ_ONCE(sqe->len);
	msg->cmd = READ_ONCE(sqe->addr);
	msg->src_fd = READ_ONCE(sqe->addr3);
	msg->dst_fd = READ_ONCE(sqe->file_index);
	msg->flags = READ_ONCE(sqe->msg_ring_flags);
	if (msg->flags & ~IORING_MSG_RING_MASK)
		return -EINVAL;

	return 0;
}

int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
	return __io_msg_ring_prep(io_kiocb_to_cmd(req, struct io_msg), sqe);
}
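
/* Issue handler for IORING_OP_MSG_RING. */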
int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
{
	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
	int ret;

	ret = -EBADFD;
	if (!io_is_uring_fops(req->file))
		goto done;

	switch (msg->cmd) {
	case IORING_MSG_DATA:
		ret = io_msg_ring_data(req, issue_flags);
		break;
	case IORING_MSG_SEND_FD:
		ret = io_msg_send_fd(req, issue_flags);
		break;
	default:
		ret = -EINVAL;
		break;
	}

done:
	if (ret < 0) {
		if (ret == -EAGAIN || ret == IOU_ISSUE_SKIP_COMPLETE)
			return ret;
		req_set_fail(req);
	}
	io_req_set_res(req, ret, 0);
	return IOU_OK;
}
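
/*
 * Synchronous variant, usable without a source ring to submit from; only
 * IORING_MSG_DATA is supported here.
 */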
int io_uring_sync_msg_ring(struct io_uring_sqe *sqe)
{
	struct io_msg io_msg = { };
	int ret;

	ret = __io_msg_ring_prep(&io_msg, sqe);
	if (unlikely(ret))
		return ret;

	/*
	 * Only data sending supported, not IORING_MSG_SEND_FD as that one
	 * doesn't make sense without a source ring to send files from.
	 */
	if (io_msg.cmd != IORING_MSG_DATA)
		return -EINVAL;

	CLASS(fd, f)(sqe->fd);
	if (fd_empty(f))
		return -EBADF;
	if (!io_is_uring_fops(fd_file(f)))
		return -EBADFD;
	return __io_msg_ring_data(fd_file(f)->private_data,
				  &io_msg, IO_URING_F_UNLOCKED);
}
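
/* Free callback for carrier requests left in ->msg_cache at ring teardown. */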
void io_msg_cache_free(const void *entry)
{
	struct io_kiocb *req = (struct io_kiocb *) entry;

	kmem_cache_free(req_cachep, req);
}