// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */

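/* Typical lifecycle: an I/O path acquires a context with
 * svc_rdma_get_rw_ctxt(), maps it with svc_rdma_rw_ctx_init(), and
 * chains it onto a chunk context; svc_rdma_cc_release() later unmaps
 * it and returns it to the cache via svc_rdma_put_rw_ctxt().
 */
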
struct svc_rdma_rw_ctxt {
        struct list_head        rw_list;
        struct rdma_rw_ctx      rw_ctx;
        unsigned int            rw_nents;
        struct sg_table         rw_sg_table;
        struct scatterlist      rw_first_sgl[];
};

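/* Return the first R/W context on @list, or NULL if @list is empty.
 * Callers remove the context from the list themselves.
 */
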
static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
                                        rw_list);
}

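/* Acquire an R/W context: reuse one from the transport's free list if
 * available, otherwise allocate a new one. In either case, size the
 * chained scatterlist to hold @sges entries. Returns NULL on failure.
 */
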
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
        struct svc_rdma_rw_ctxt *ctxt;

        spin_lock(&rdma->sc_rw_ctxt_lock);

        ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
        if (ctxt) {
                list_del(&ctxt->rw_list);
                spin_unlock(&rdma->sc_rw_ctxt_lock);
        } else {
                spin_unlock(&rdma->sc_rw_ctxt_lock);
                ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
                               GFP_KERNEL);
                if (!ctxt)
                        goto out_noctx;
                INIT_LIST_HEAD(&ctxt->rw_list);
        }

        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
                                   ctxt->rw_sg_table.sgl,
                                   SG_CHUNK_SIZE))
                goto out_free;
        return ctxt;

out_free:
        kfree(ctxt);
out_noctx:
        trace_svcrdma_no_rwctx_err(rdma, sges);
        return NULL;
}

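/* Return an R/W context to the transport's free list after releasing
 * its chained scatterlist.
 */
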
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
                                 struct svc_rdma_rw_ctxt *ctxt)
{
        sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);

        spin_lock(&rdma->sc_rw_ctxt_lock);
        list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
        spin_unlock(&rdma->sc_rw_ctxt_lock);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_rw_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
                list_del(&ctxt->rw_list);
                kfree(ctxt);
        }
}

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * On success, returns the number of WQEs that will be needed
 * on the Send Queue; otherwise a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
                                struct svc_rdma_rw_ctxt *ctxt,
                                u64 offset, u32 handle,
                                enum dma_data_direction direction)
{
        int ret;

        ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
                               ctxt->rw_sg_table.sgl, ctxt->rw_nents,
                               0, offset, handle, direction);
        if (unlikely(ret < 0)) {
                svc_rdma_put_rw_ctxt(rdma, ctxt);
                trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
        }
        return ret;
}

/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
        struct rpc_rdma_cid     cc_cid;
        struct ib_cqe           cc_cqe;
        struct svcxprt_rdma     *cc_rdma;
        struct list_head        cc_rwctxts;
        unsigned int            cc_sqecount;
};

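/* Fill in a completion ID so that trace points can correlate Work
 * Completions on this chunk context with the transport's Send CQ.
 */
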
static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
                                 struct rpc_rdma_cid *cid)
{
        cid->ci_queue_id = rdma->sc_sq_cq->res.id;
        cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}

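/* Initialize a chunk context before use: assign its completion ID,
 * record the owning transport, and reset the WR chain and SQE
 * accounting.
 */
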
static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
                             struct svc_rdma_chunk_ctxt *cc)
{
        svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
        cc->cc_rdma = rdma;

        INIT_LIST_HEAD(&cc->cc_rwctxts);
        cc->cc_sqecount = 0;
}

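/* Release all R/W contexts chained to @cc: unmap each one with
 * rdma_rw_ctx_destroy() in the given DMA direction, then return it
 * to the transport's free list.
 */
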
static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
                                enum dma_data_direction dir)
{
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_rw_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
                list_del(&ctxt->rw_list);

                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
                                    ctxt->rw_nents, dir);
                svc_rdma_put_rw_ctxt(rdma, ctxt);
        }
}

/* State for sending a Write or Reply chunk.
 *  - Tracks progress of writing one chunk over all its segments
 *  - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
        const struct svc_rdma_chunk     *wi_chunk;

        /* write state of this chunk */
        unsigned int            wi_seg_off;
        unsigned int            wi_seg_no;

        /* SGL constructor arguments */
        const struct xdr_buf    *wi_xdr;
        unsigned char           *wi_base;
        unsigned int            wi_next_off;

        struct svc_rdma_chunk_ctxt      wi_cc;
};

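/* Allocate the state used to send one Write or Reply chunk, and
 * initialize its embedded chunk context for Write completions.
 */
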
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
                          const struct svc_rdma_chunk *chunk)
{
        struct svc_rdma_write_info *info;

        info = kmalloc(sizeof(*info), GFP_KERNEL);
        if (!info)
                return info;

        info->wi_chunk = chunk;
        info->wi_seg_off = 0;
        info->wi_seg_no = 0;
        svc_rdma_cc_init(rdma, &info->wi_cc);
        info->wi_cc.cc_cqe.done = svc_rdma_write_done;
        return info;
}

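/* Unmap and release all R/W contexts (DMA_TO_DEVICE), then free the
 * write info itself.
 */
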
static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
        svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
        kfree(info);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_chunk_ctxt *cc =
                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_write_info *info =
                        container_of(cc, struct svc_rdma_write_info, wi_cc);

        trace_svcrdma_wc_write(wc, &cc->cc_cid);

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);

        if (unlikely(wc->status != IB_WC_SUCCESS))
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);

        svc_rdma_write_info_free(info);
}

/* State for pulling a Read chunk.
 */
struct svc_rdma_read_info {
        struct svc_rqst                 *ri_rqst;
        struct svc_rdma_recv_ctxt       *ri_readctxt;
        unsigned int                    ri_pageno;
        unsigned int                    ri_pageoff;
        unsigned int                    ri_totalbytes;

        struct svc_rdma_chunk_ctxt      ri_cc;
};

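/* Allocate the state used to pull one Read list, and initialize its
 * embedded chunk context for Read completions.
 */
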
static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_read_info *info;

        info = kmalloc(sizeof(*info), GFP_KERNEL);
        if (!info)
                return info;

        svc_rdma_cc_init(rdma, &info->ri_cc);
        info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
        return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
        svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
        kfree(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_chunk_ctxt *cc =
                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_read_info *info =
                        container_of(cc, struct svc_rdma_read_info, ri_cc);

        trace_svcrdma_wc_read(wc, &cc->cc_cid);

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
        } else {
                spin_lock(&rdma->sc_rq_dto_lock);
                list_add_tail(&info->ri_readctxt->rc_list,
                              &rdma->sc_read_complete_q);
                /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
                set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
                spin_unlock(&rdma->sc_rq_dto_lock);

                svc_xprt_enqueue(&rdma->sc_xprt);
        }

        svc_rdma_read_info_free(info);
}

/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */

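/* cc_sqecount is the number of SQEs the WR chain consumes. It is
 * debited from sc_sq_avail before posting; completion handlers (or
 * the error path below) credit it back and wake up waiters.
 */
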
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_xprt *xprt = &rdma->sc_xprt;
        struct ib_send_wr *first_wr;
        const struct ib_send_wr *bad_wr;
        struct list_head *tmp;
        struct ib_cqe *cqe;
        int ret;

        if (cc->cc_sqecount > rdma->sc_sq_depth)
                return -EINVAL;

        first_wr = NULL;
        cqe = &cc->cc_cqe;
        list_for_each(tmp, &cc->cc_rwctxts) {
                struct svc_rdma_rw_ctxt *ctxt;

                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
                                           rdma->sc_port_num, cqe, first_wr);
                cqe = NULL;
        }

        do {
                if (atomic_sub_return(cc->cc_sqecount,
                                      &rdma->sc_sq_avail) > 0) {
                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
                        if (ret)
                                break;
                        return 0;
                }

                trace_svcrdma_sq_full(rdma);
                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
                wait_event(rdma->sc_send_wait,
                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
                trace_svcrdma_sq_retry(rdma);
        } while (1);

        trace_svcrdma_sq_post_err(rdma, ret);
        set_bit(XPT_CLOSE, &xprt->xpt_flags);

        /* If even one was posted, there will be a completion. */
        if (bad_wr != first_wr)
                return 0;

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);
        return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
                               unsigned int len,
                               struct svc_rdma_rw_ctxt *ctxt)
{
        struct scatterlist *sg = ctxt->rw_sg_table.sgl;

        sg_set_buf(&sg[0], info->wi_base, len);
        info->wi_base += len;

        ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
                                    unsigned int remaining,
                                    struct svc_rdma_rw_ctxt *ctxt)
{
        unsigned int sge_no, sge_bytes, page_off, page_no;
        const struct xdr_buf *xdr = info->wi_xdr;
        struct scatterlist *sg;
        struct page **page;

        page_off = info->wi_next_off + xdr->page_base;
        page_no = page_off >> PAGE_SHIFT;
        page_off = offset_in_page(page_off);
        page = xdr->pages + page_no;
        info->wi_next_off += remaining;
        sg = ctxt->rw_sg_table.sgl;
        sge_no = 0;
        do {
                sge_bytes = min_t(unsigned int, remaining,
                                  PAGE_SIZE - page_off);
                sg_set_page(sg, *page, sge_bytes, page_off);

                remaining -= sge_bytes;
                sg = sg_next(sg);
                page_off = 0;
                page++;
                sge_no++;
        } while (remaining);

        ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply chunk or a Write chunk's worth of data.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
                      void (*constructor)(struct svc_rdma_write_info *info,
                                          unsigned int len,
                                          struct svc_rdma_rw_ctxt *ctxt),
                      unsigned int remaining)
{
        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        const struct svc_rdma_segment *seg;
        struct svc_rdma_rw_ctxt *ctxt;
        int ret;

        do {
                unsigned int write_len;
                u64 offset;

                seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
                if (!seg)
                        goto out_overflow;

                write_len = min(remaining, seg->rs_length - info->wi_seg_off);
                if (!write_len)
                        goto out_overflow;
                ctxt = svc_rdma_get_rw_ctxt(rdma,
                                            (write_len >> PAGE_SHIFT) + 2);
                if (!ctxt)
                        return -ENOMEM;

                constructor(info, write_len, ctxt);
                offset = seg->rs_offset + info->wi_seg_off;
                ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
                                           DMA_TO_DEVICE);
                if (ret < 0)
                        return -EIO;

                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
                cc->cc_sqecount += ret;
                if (write_len == seg->rs_length - info->wi_seg_off) {
                        info->wi_seg_no++;
                        info->wi_seg_off = 0;
                } else {
                        info->wi_seg_off += write_len;
                }
                remaining -= write_len;
        } while (remaining);

        return 0;

out_overflow:
        trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
                                     info->wi_chunk->ch_segcount);
        return -E2BIG;
}

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
                              const struct kvec *iov)
{
        info->wi_base = iov->iov_base;
        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
                                     iov->iov_len);
}

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
                                const struct xdr_buf *xdr,
                                unsigned int offset,
                                unsigned long length)
{
        info->wi_xdr = xdr;
        info->wi_next_off = offset - xdr->head[0].iov_len;
        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
                                     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
        struct svc_rdma_write_info *info = data;
        int ret;

        if (xdr->head[0].iov_len) {
                ret = svc_rdma_iov_write(info, &xdr->head[0]);
                if (ret < 0)
                        return ret;
        }

        if (xdr->page_len) {
                ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
                                           xdr->page_len);
                if (ret < 0)
                        return ret;
        }

        if (xdr->tail[0].iov_len) {
                ret = svc_rdma_iov_write(info, &xdr->tail[0]);
                if (ret < 0)
                        return ret;
        }

        return xdr->len;
}

/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @chunk: Write chunk provided by the client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
                              const struct svc_rdma_chunk *chunk,
                              const struct xdr_buf *xdr)
{
        struct svc_rdma_write_info *info;
        struct svc_rdma_chunk_ctxt *cc;
        int ret;

        info = svc_rdma_write_info_alloc(rdma, chunk);
        if (!info)
                return -ENOMEM;
        cc = &info->wi_cc;

        ret = svc_rdma_xb_write(xdr, info);
        if (ret != xdr->len)
                goto out_err;

        trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
        ret = svc_rdma_post_chunk_ctxt(cc);
        if (ret < 0)
                goto out_err;
        return xdr->len;

out_err:
        svc_rdma_write_info_free(info);
        return ret;
}

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rctxt: Write and Reply chunks from client
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
                              const struct svc_rdma_recv_ctxt *rctxt,
                              const struct xdr_buf *xdr)
{
        struct svc_rdma_write_info *info;
        struct svc_rdma_chunk_ctxt *cc;
        struct svc_rdma_chunk *chunk;
        int ret;

        if (pcl_is_empty(&rctxt->rc_reply_pcl))
                return 0;

        chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
        info = svc_rdma_write_info_alloc(rdma, chunk);
        if (!info)
                return -ENOMEM;
        cc = &info->wi_cc;

        ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
                                      svc_rdma_xb_write, info);
        if (ret < 0)
                goto out_err;

        trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
        ret = svc_rdma_post_chunk_ctxt(cc);
        if (ret < 0)
                goto out_err;

        return xdr->len;

out_err:
        svc_rdma_write_info_free(info);
        return ret;
}

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @info: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
                                       const struct svc_rdma_segment *segment)
{
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
        struct svc_rqst *rqstp = info->ri_rqst;
        struct svc_rdma_rw_ctxt *ctxt;
        unsigned int sge_no, seg_len, len;
        struct scatterlist *sg;
        int ret;

        len = segment->rs_length;
        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
        if (!ctxt)
                return -ENOMEM;
        ctxt->rw_nents = sge_no;

        sg = ctxt->rw_sg_table.sgl;
        for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
                seg_len = min_t(unsigned int, len,
                                PAGE_SIZE - info->ri_pageoff);

                head->rc_arg.pages[info->ri_pageno] =
                        rqstp->rq_pages[info->ri_pageno];
                if (!info->ri_pageoff)
                        head->rc_page_count++;

                sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
                            seg_len, info->ri_pageoff);
                sg = sg_next(sg);

                info->ri_pageoff += seg_len;
                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
                len -= seg_len;

                /* Safety check */
                if (len &&
                    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
                        goto out_overrun;
        }

        ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
                                   segment->rs_handle, DMA_FROM_DEVICE);
        if (ret < 0)
                return -EIO;

        list_add(&ctxt->rw_list, &cc->cc_rwctxts);
        cc->cc_sqecount += ret;
        return 0;

out_overrun:
        trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
        return -EINVAL;
}

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @info: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
                                     const struct svc_rdma_chunk *chunk)
{
        const struct svc_rdma_segment *segment;
        int ret;

        ret = -EINVAL;
        pcl_for_each_segment(segment, chunk) {
                ret = svc_rdma_build_read_segment(info, segment);
                if (ret < 0)
                        break;
                info->ri_totalbytes += segment->rs_length;
        }
        return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @info: context for RDMA Reads
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
                                      unsigned int offset,
                                      unsigned int remaining)
{
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        unsigned char *dst, *src = head->rc_recv_buf;
        struct svc_rqst *rqstp = info->ri_rqst;
        unsigned int page_no, numpages;

        numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
        for (page_no = 0; page_no < numpages; page_no++) {
                unsigned int page_len;

                page_len = min_t(unsigned int, remaining,
                                 PAGE_SIZE - info->ri_pageoff);

                head->rc_arg.pages[info->ri_pageno] =
                        rqstp->rq_pages[info->ri_pageno];
                if (!info->ri_pageoff)
                        head->rc_page_count++;

                dst = page_address(head->rc_arg.pages[info->ri_pageno]);
                memcpy(dst + info->ri_pageoff, src + offset, page_len);

                info->ri_totalbytes += page_len;
                info->ri_pageoff += page_len;
                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
                remaining -= page_len;
                offset += page_len;
        }

        return 0;
}

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @info: context for RDMA Reads
 *
 * The chunk data lands in head->rc_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
{
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
        struct svc_rdma_chunk *chunk, *next;
        struct xdr_buf *buf = &head->rc_arg;
        unsigned int start, length;
        int ret;

        start = 0;
        chunk = pcl_first_chunk(pcl);
        length = chunk->ch_position;
        ret = svc_rdma_copy_inline_range(info, start, length);
        if (ret < 0)
                return ret;

        pcl_for_each_chunk(chunk, pcl) {
                ret = svc_rdma_build_read_chunk(info, chunk);
                if (ret < 0)
                        return ret;

                next = pcl_next_chunk(pcl, chunk);
                if (!next)
                        break;

                start += length;
                length = next->ch_position - info->ri_totalbytes;
                ret = svc_rdma_copy_inline_range(info, start, length);
                if (ret < 0)
                        return ret;
        }

        start += length;
        length = head->rc_byte_len - start;
        ret = svc_rdma_copy_inline_range(info, start, length);
        if (ret < 0)
                return ret;

        buf->len += info->ri_totalbytes;
        buf->buflen += info->ri_totalbytes;

        head->rc_hdr_count = 1;
        buf->head[0].iov_base = page_address(head->rc_pages[0]);
        buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
        buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
        return 0;
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @info: context for RDMA Reads
 *
 * The chunk data lands in the page list of head->rc_arg.pages.
 *
 * Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
{
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        struct xdr_buf *buf = &head->rc_arg;
        struct svc_rdma_chunk *chunk;
        unsigned int length;
        int ret;

        chunk = pcl_first_chunk(&head->rc_read_pcl);
        ret = svc_rdma_build_read_chunk(info, chunk);
        if (ret < 0)
                goto out;

        head->rc_hdr_count = 0;

        /* Split the Receive buffer between the head and tail
         * buffers at Read chunk's position. XDR roundup of the
         * chunk is not included in either the pagelist or in
         * the tail.
         */
        buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
        buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
        buf->head[0].iov_len = chunk->ch_position;

        /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
         *
         * If the client already rounded up the chunk length, the
         * length does not change. Otherwise, the length of the page
         * list is increased to include XDR round-up.
         *
         * Currently these chunks always start at page offset 0,
         * thus the rounded-up length never crosses a page boundary.
         */
        length = XDR_QUADLEN(info->ri_totalbytes) << 2;
        buf->page_len = length;
        buf->len += length;
        buf->buflen += length;

out:
        return ret;
}

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
 * @info: context for RDMA Reads
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
                                     const struct svc_rdma_chunk *chunk,
                                     unsigned int offset, unsigned int length)
{
        const struct svc_rdma_segment *segment;
        int ret;

        ret = -EINVAL;
        pcl_for_each_segment(segment, chunk) {
                struct svc_rdma_segment dummy;

                if (offset > segment->rs_length) {
                        offset -= segment->rs_length;
                        continue;
                }

                dummy.rs_handle = segment->rs_handle;
                dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
                dummy.rs_offset = segment->rs_offset + offset;

                ret = svc_rdma_build_read_segment(info, &dummy);
                if (ret < 0)
                        break;

                info->ri_totalbytes += dummy.rs_length;
                length -= dummy.rs_length;
                offset = 0;
        }
        return ret;
}

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @info: context for RDMA Reads
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
{
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        const struct svc_rdma_chunk *call_chunk =
                        pcl_first_chunk(&head->rc_call_pcl);
        const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
        struct svc_rdma_chunk *chunk, *next;
        unsigned int start, length;
        int ret;

        if (pcl_is_empty(pcl))
                return svc_rdma_build_read_chunk(info, call_chunk);

        start = 0;
        chunk = pcl_first_chunk(pcl);
        length = chunk->ch_position;
        ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
        if (ret < 0)
                return ret;

        pcl_for_each_chunk(chunk, pcl) {
                ret = svc_rdma_build_read_chunk(info, chunk);
                if (ret < 0)
                        return ret;

                next = pcl_next_chunk(pcl, chunk);
                if (!next)
                        break;

                start += length;
                length = next->ch_position - info->ri_totalbytes;
                ret = svc_rdma_read_chunk_range(info, call_chunk,
                                                start, length);
                if (ret < 0)
                        return ret;
        }

        start += length;
        length = call_chunk->ch_length - start;
        return svc_rdma_read_chunk_range(info, call_chunk, start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @info: context for RDMA Reads
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in the page list of
 * head->rc_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
{
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        struct xdr_buf *buf = &head->rc_arg;
        int ret;

        ret = svc_rdma_read_call_chunk(info);
        if (ret < 0)
                goto out;

        buf->len += info->ri_totalbytes;
        buf->buflen += info->ri_totalbytes;

        head->rc_hdr_count = 1;
        buf->head[0].iov_base = page_address(head->rc_pages[0]);
        buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
        buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;

out:
        return ret;
}

/* Pages under I/O have been copied to head->rc_pages. Ensure they
 * are not released by svc_xprt_release() until the I/O is complete.
 *
 * This has to be done after all Read WRs are constructed to properly
 * handle a page that is part of I/O on behalf of two different RDMA
 * segments.
 *
 * Do this only if I/O has been posted. Otherwise, we do indeed want
 * svc_xprt_release() to clean things up properly.
 */
static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
                                   const unsigned int start,
                                   const unsigned int num_pages)
{
        unsigned int i;

        for (i = start; i < num_pages + start; i++)
                rqstp->rq_pages[i] = NULL;
}

/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
                               struct svc_rqst *rqstp,
                               struct svc_rdma_recv_ctxt *head)
{
        struct svc_rdma_read_info *info;
        struct svc_rdma_chunk_ctxt *cc;
        int ret;

        /* The request (with page list) is constructed in
         * head->rc_arg. Pages involved with RDMA Read I/O are
         * transferred there.
         */
        head->rc_arg.head[0] = rqstp->rq_arg.head[0];
        head->rc_arg.tail[0] = rqstp->rq_arg.tail[0];
        head->rc_arg.pages = head->rc_pages;
        head->rc_arg.page_base = 0;
        head->rc_arg.page_len = 0;
        head->rc_arg.len = rqstp->rq_arg.len;
        head->rc_arg.buflen = rqstp->rq_arg.buflen;

        info = svc_rdma_read_info_alloc(rdma);
        if (!info)
                return -ENOMEM;
        cc = &info->ri_cc;
        info->ri_rqst = rqstp;
        info->ri_readctxt = head;
        info->ri_pageno = 0;
        info->ri_pageoff = 0;
        info->ri_totalbytes = 0;

        if (pcl_is_empty(&head->rc_call_pcl)) {
                if (head->rc_read_pcl.cl_count == 1)
                        ret = svc_rdma_read_data_item(info);
                else
                        ret = svc_rdma_read_multiple_chunks(info);
        } else
                ret = svc_rdma_read_special(info);
        if (ret < 0)
                goto out_err;

        trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
        ret = svc_rdma_post_chunk_ctxt(cc);
        if (ret < 0)
                goto out_err;
        svc_rdma_save_io_pages(rqstp, 0, head->rc_page_count);
        return 1;

out_err:
        svc_rdma_read_info_free(info);
        return ret;
}