// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/debug.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#define RPCDBG_FACILITY		RPCDBG_SVCXPRT

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	int			rw_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[0];
};
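
/* Return the first R/W context on @list, or NULL if @list is empty.
 * The context is not removed from the list.
 */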
static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}
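
/* Grab a cached R/W context off the transport's free list, or
 * allocate a fresh one if the list is empty. In either case, a
 * chained scatterlist large enough for @sges entries is set up.
 * Returns NULL if allocation fails.
 */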
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct svc_rdma_rw_ctxt *ctxt;

	spin_lock(&rdma->sc_rw_ctxt_lock);

	ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
	if (ctxt) {
		list_del(&ctxt->rw_list);
		spin_unlock(&rdma->sc_rw_ctxt_lock);
	} else {
		spin_unlock(&rdma->sc_rw_ctxt_lock);
		ctxt = kmalloc(sizeof(*ctxt) +
			       SG_CHUNK_SIZE * sizeof(struct scatterlist),
			       GFP_KERNEL);
		if (!ctxt)
			goto out;
		INIT_LIST_HEAD(&ctxt->rw_list);
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl)) {
		kfree(ctxt);
		ctxt = NULL;
	}
out:
	return ctxt;
}
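
/* Release the chained scatterlist and return @ctxt to the
 * transport's free list for reuse.
 */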
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	sg_free_table_chained(&ctxt->rw_sg_table, true);

	spin_lock(&rdma->sc_rw_ctxt_lock);
	list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
}
/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;

	while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
		list_del(&ctxt->rw_list);
		kfree(ctxt);
	}
}
/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
	struct ib_cqe		cc_cqe;
	struct svcxprt_rdma	*cc_rdma;
	struct list_head	cc_rwctxts;
	int			cc_sqecount;
};
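
/* Prepare @cc for a new chunk: record the owning transport, take a
 * reference on it, and start with an empty rdma_rw context list.
 */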
static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
			     struct svc_rdma_chunk_ctxt *cc)
{
	cc->cc_rdma = rdma;
	svc_xprt_get(&rdma->sc_xprt);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}
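
/* Tear down all rdma_rw contexts attached to @cc and drop the
 * transport reference taken by svc_rdma_cc_init().
 */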
static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
				enum dma_data_direction dir)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_rw_ctxt *ctxt;

	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	svc_xprt_put(&rdma->sc_xprt);
}
/* State for sending a Write or Reply chunk.
 * - Tracks progress of writing one chunk over all its segments
 * - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
	/* write state of this chunk */
	unsigned int	wi_seg_off;
	unsigned int	wi_seg_no;
	unsigned int	wi_nsegs;
	__be32		*wi_segs;

	/* SGL constructor arguments */
	struct xdr_buf	*wi_xdr;
	unsigned char	*wi_base;
	unsigned int	wi_next_off;

	struct svc_rdma_chunk_ctxt	wi_cc;
};
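
/* Allocate state for writing one Write or Reply chunk. The segment
 * count is read from the XDR word that follows @chunk, and wi_segs
 * is left pointing at the chunk's first segment.
 */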
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
{
	struct svc_rdma_write_info *info;

	info = kmalloc(sizeof(*info), GFP_KERNEL);
	if (!info)
		return info;

	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_nsegs = be32_to_cpup(++chunk);
	info->wi_segs = ++chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}
/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	trace_svcrdma_wc_write(wc);

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
			       ib_wc_status_msg(wc->status),
			       wc->status, wc->vendor_err);
	}

	svc_rdma_write_info_free(info);
}
/* State for pulling a Read chunk.
 */
struct svc_rdma_read_info {
	struct svc_rdma_recv_ctxt	*ri_readctxt;
	unsigned int			ri_position;
	unsigned int			ri_pageno;
	unsigned int			ri_pageoff;
	unsigned int			ri_chunklen;

	struct svc_rdma_chunk_ctxt	ri_cc;
};
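
/* Allocate state for pulling one Read chunk from the client. The
 * chunk context's completion handler is set to
 * svc_rdma_wc_read_done().
 */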
static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_read_info *info;

	info = kmalloc(sizeof(*info), GFP_KERNEL);
	if (!info)
		return info;

	svc_rdma_cc_init(rdma, &info->ri_cc);
	info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
	return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
	svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
	kfree(info);
}
/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_read_info *info =
			container_of(cc, struct svc_rdma_read_info, ri_cc);

	trace_svcrdma_wc_read(wc);

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
			       ib_wc_status_msg(wc->status),
			       wc->status, wc->vendor_err);
		svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt);
	} else {
		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&info->ri_readctxt->rc_list,
			      &rdma->sc_read_complete_q);
		spin_unlock(&rdma->sc_rq_dto_lock);

		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		svc_xprt_enqueue(&rdma->sc_xprt);
	}

	svc_rdma_read_info_free(info);
}
/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_xprt *xprt = &rdma->sc_xprt;
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			trace_svcrdma_post_rw(&cc->cc_cqe,
					      cc->cc_sqecount, ret);
			if (ret)
				break;
			return 0;
		}

		trace_svcrdma_sq_full(rdma);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma);
	} while (1);

	set_bit(XPT_CLOSE, &xprt->xpt_flags);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}
/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}
/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		page++;
		sge_no++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}
/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_rw_ctxt *ctxt;
	__be32 *seg;
	int ret;

	seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
	do {
		unsigned int write_len;
		u32 seg_length, seg_handle;
		u64 seg_offset;

		if (info->wi_seg_no >= info->wi_nsegs)
			goto out_overflow;

		seg_handle = be32_to_cpup(seg);
		seg_length = be32_to_cpup(seg + 1);
		xdr_decode_hyper(seg + 2, &seg_offset);
		seg_offset += info->wi_seg_off;

		write_len = min(remaining, seg_length - info->wi_seg_off);
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			goto out_noctx;

		constructor(info, write_len, ctxt);
		ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
				       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				       ctxt->rw_nents, 0, seg_offset,
				       seg_handle, DMA_TO_DEVICE);
		if (ret < 0)
			goto out_initerr;

		trace_svcrdma_encode_wseg(seg_handle, write_len, seg_offset);
		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg_length - info->wi_seg_off) {
			seg += 4;
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}

		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
		info->wi_nsegs);
	return -E2BIG;

out_noctx:
	dprintk("svcrdma: no R/W ctxs available\n");
	return -ENOMEM;

out_initerr:
	svc_rdma_put_rw_ctxt(rdma, ctxt);
	trace_svcrdma_dma_map_rwctx(rdma, ret);
	return -EIO;
}
/* Send one of an xdr_buf's kvecs by itself. To send a Reply
 * chunk, the whole RPC Reply is written back to the client.
 * This function writes either the head or tail of the xdr_buf
 * containing the Reply.
 */
static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
				  struct kvec *vec)
{
	info->wi_base = vec->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     vec->iov_len);
}

/* Send an xdr_buf's page list by itself. A Write chunk is
 * just the page list. A Reply chunk is the head, page list,
 * and tail. This function is shared between the two types
 * of chunk.
 */
static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
				      struct xdr_buf *xdr)
{
	info->wi_xdr = xdr;
	info->wi_next_off = 0;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     xdr->page_len);
}
/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @wr_ch: Write chunk provided by client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
			      struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	int ret;

	if (!xdr->page_len)
		return 0;

	info = svc_rdma_write_info_alloc(rdma, wr_ch);
	if (!info)
		return -ENOMEM;

	ret = svc_rdma_send_xdr_pagelist(info, xdr);
	if (ret < 0)
		goto out_err;

	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_encode_write(xdr->page_len);
	return xdr->page_len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}
/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rp_ch: Reply chunk provided by client
 * @writelist: true if client provided a Write list
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
			      bool writelist, struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	int consumed, ret;

	info = svc_rdma_write_info_alloc(rdma, rp_ch);
	if (!info)
		return -ENOMEM;

	ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
	if (ret < 0)
		goto out_err;
	consumed = xdr->head[0].iov_len;

	/* Send the page list in the Reply chunk only if the
	 * client did not provide Write chunks.
	 */
	if (!writelist && xdr->page_len) {
		ret = svc_rdma_send_xdr_pagelist(info, xdr);
		if (ret < 0)
			goto out_err;
		consumed += xdr->page_len;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
		if (ret < 0)
			goto out_err;
		consumed += xdr->tail[0].iov_len;
	}

	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_encode_reply(consumed);
	return consumed;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}
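
/* Build and DMA-map a scatterlist for one Read segment: @len bytes
 * in the client memory region identified by @rkey, starting at
 * @offset. The data lands in rqstp's page array, and the pages
 * under I/O are also recorded in head->rc_arg.pages.
 */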
static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
				       struct svc_rqst *rqstp,
				       u32 rkey, u32 len, u64 offset)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
	struct svc_rdma_rw_ctxt *ctxt;
	unsigned int sge_no, seg_len;
	struct scatterlist *sg;
	int ret;

	sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
	if (!ctxt)
		goto out_noctx;
	ctxt->rw_nents = sge_no;

	sg = ctxt->rw_sg_table.sgl;
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - info->ri_pageoff);

		head->rc_arg.pages[info->ri_pageno] =
			rqstp->rq_pages[info->ri_pageno];
		if (!info->ri_pageoff)
			head->rc_page_count++;

		sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
			    seg_len, info->ri_pageoff);
		sg = sg_next(sg);

		info->ri_pageoff += seg_len;
		if (info->ri_pageoff == PAGE_SIZE) {
			info->ri_pageno++;
			info->ri_pageoff = 0;
		}
		len -= seg_len;

		/* Safety check */
		if (len &&
		    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
			goto out_overrun;
	}

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
			       cc->cc_rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, rkey, DMA_FROM_DEVICE);
	if (ret < 0)
		goto out_initerr;

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_noctx:
	dprintk("svcrdma: no R/W ctxs available\n");
	return -ENOMEM;

out_overrun:
	dprintk("svcrdma: request overruns rq_pages\n");
	return -EINVAL;

out_initerr:
	trace_svcrdma_dma_map_rwctx(cc->cc_rdma, ret);
	svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
	return -EIO;
}
/* Walk the segments in the Read chunk starting at @p and construct
 * RDMA Read operations to pull the chunk to the server.
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_read_info *info,
				     __be32 *p)
{
	unsigned int i;
	int ret;

	ret = -EINVAL;
	info->ri_chunklen = 0;
	while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
		u32 rs_handle, rs_length;
		u64 rs_offset;

		rs_handle = be32_to_cpup(p++);
		rs_length = be32_to_cpup(p++);
		p = xdr_decode_hyper(p, &rs_offset);

		ret = svc_rdma_build_read_segment(info, rqstp,
						  rs_handle, rs_length,
						  rs_offset);
		if (ret < 0)
			break;

		trace_svcrdma_encode_rseg(rs_handle, rs_length, rs_offset);
		info->ri_chunklen += rs_length;
	}

	/* Pages under I/O have been copied to head->rc_pages.
	 * Prevent their premature release by svc_xprt_release().
	 */
	for (i = 0; i < info->ri_readctxt->rc_page_count; i++)
		rqstp->rq_pages[i] = NULL;

	return ret;
}
/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
 * data lands in the page list of head->rc_arg.pages.
 *
 * Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 */
static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
					    struct svc_rdma_read_info *info,
					    __be32 *p)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	int ret;

	ret = svc_rdma_build_read_chunk(rqstp, info, p);
	if (ret < 0)
		goto out;

	trace_svcrdma_encode_read(info->ri_chunklen, info->ri_position);

	head->rc_hdr_count = 0;

	/* Split the Receive buffer between the head and tail
	 * buffers at Read chunk's position. XDR roundup of the
	 * chunk is not included in either the pagelist or in
	 * the tail.
	 */
	head->rc_arg.tail[0].iov_base =
		head->rc_arg.head[0].iov_base + info->ri_position;
	head->rc_arg.tail[0].iov_len =
		head->rc_arg.head[0].iov_len - info->ri_position;
	head->rc_arg.head[0].iov_len = info->ri_position;

	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
	 *
	 * If the client already rounded up the chunk length, the
	 * length does not change. Otherwise, the length of the page
	 * list is increased to include XDR round-up.
	 *
	 * Currently these chunks always start at page offset 0,
	 * thus the rounded-up length never crosses a page boundary.
	 */
	info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2;

	head->rc_arg.page_len = info->ri_chunklen;
	head->rc_arg.len += info->ri_chunklen;
	head->rc_arg.buflen += info->ri_chunklen;

out:
	return ret;
}
/* Construct RDMA Reads to pull over a Position Zero Read chunk.
 * The start of the data lands in the first page just after
 * the Transport header, and the rest lands in the page list of
 * head->rc_arg.pages.
 *
 * Assumptions:
 *	- A PZRC has an XDR-aligned length (no implicit round-up).
 *	- There can be no trailing inline content (IOW, we assume
 *	  a PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec).
 */
static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
					struct svc_rdma_read_info *info,
					__be32 *p)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	int ret;

	ret = svc_rdma_build_read_chunk(rqstp, info, p);
	if (ret < 0)
		goto out;

	trace_svcrdma_encode_pzr(info->ri_chunklen);

	head->rc_arg.len += info->ri_chunklen;
	head->rc_arg.buflen += info->ri_chunklen;

	head->rc_hdr_count = 1;
	head->rc_arg.head[0].iov_base = page_address(head->rc_pages[0]);
	head->rc_arg.head[0].iov_len = min_t(size_t, PAGE_SIZE,
					     info->ri_chunklen);

	head->rc_arg.page_len = info->ri_chunklen -
			head->rc_arg.head[0].iov_len;

out:
	return ret;
}
/**
 * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 * @p: pointer to start of Read chunk
 *
 * Returns:
 *	%0 if all needed RDMA Reads were posted successfully,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 *
 * Assumptions:
 * - All Read segments in @p have the same Position value.
 */
int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
			     struct svc_rdma_recv_ctxt *head, __be32 *p)
{
	struct svc_rdma_read_info *info;
	int ret;

	/* The request (with page list) is constructed in
	 * head->rc_arg. Pages involved with RDMA Read I/O are
	 * transferred there.
	 */
	head->rc_arg.head[0] = rqstp->rq_arg.head[0];
	head->rc_arg.tail[0] = rqstp->rq_arg.tail[0];
	head->rc_arg.pages = head->rc_pages;
	head->rc_arg.page_base = 0;
	head->rc_arg.page_len = 0;
	head->rc_arg.len = rqstp->rq_arg.len;
	head->rc_arg.buflen = rqstp->rq_arg.buflen;

	info = svc_rdma_read_info_alloc(rdma);
	if (!info)
		return -ENOMEM;
	info->ri_readctxt = head;
	info->ri_pageno = 0;
	info->ri_pageoff = 0;

	info->ri_position = be32_to_cpup(p + 1);
	if (info->ri_position)
		ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
	else
		ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
	if (ret < 0)
		goto out_err;

	ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
	if (ret < 0)
		goto out_err;
	return 0;

out_err:
	svc_rdma_read_info_free(info);
	return ret;
}