// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016 Oracle.  All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/debug.h>

#include <rdma/rw.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
        struct list_head        rw_list;
        struct rdma_rw_ctx      rw_ctx;
        int                     rw_nents;
        struct sg_table         rw_sg_table;
        struct scatterlist      rw_first_sgl[0];
};
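/* rw_first_sgl provides the first SG_CHUNK_SIZE scatterlist entries
 * inline with the context, so sg_alloc_table_chained() below does not
 * need a separate allocation for small I/O.
 */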
static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
                                        rw_list);
}
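/* Take a cached R/W context from the transport's free list if one is
 * available; otherwise allocate a fresh one with room for an inline
 * scatterlist of SG_CHUNK_SIZE entries. The chained sg_table is sized
 * for @sges entries before the context is returned to the caller.
 */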
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
        struct svc_rdma_rw_ctxt *ctxt;

        spin_lock(&rdma->sc_rw_ctxt_lock);

        ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
        if (ctxt) {
                list_del(&ctxt->rw_list);
                spin_unlock(&rdma->sc_rw_ctxt_lock);
        } else {
                spin_unlock(&rdma->sc_rw_ctxt_lock);
                ctxt = kmalloc(sizeof(*ctxt) +
                               SG_CHUNK_SIZE * sizeof(struct scatterlist),
                               GFP_KERNEL);
                if (!ctxt)
                        goto out;
                INIT_LIST_HEAD(&ctxt->rw_list);
        }

        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
                                   ctxt->rw_sg_table.sgl)) {
                kfree(ctxt);
                ctxt = NULL;
        }
out:
        return ctxt;
}
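/* Release the chained scatterlist and return the R/W context to the
 * transport's free list so it can be reused by a later chunk.
 */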
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
                                 struct svc_rdma_rw_ctxt *ctxt)
{
        sg_free_table_chained(&ctxt->rw_sg_table, true);

        spin_lock(&rdma->sc_rw_ctxt_lock);
        list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
        spin_unlock(&rdma->sc_rw_ctxt_lock);
}
/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_rw_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
                list_del(&ctxt->rw_list);
                kfree(ctxt);
        }
}
/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
        struct ib_cqe           cc_cqe;
        struct svcxprt_rdma     *cc_rdma;
        struct list_head        cc_rwctxts;
        int                     cc_sqecount;
};
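/* Initializing a chunk context takes a reference on the transport so
 * that the svcxprt_rdma cannot be destroyed while chunk I/O is in
 * flight; svc_rdma_cc_release() drops that reference.
 */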
static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
                             struct svc_rdma_chunk_ctxt *cc)
{
        cc->cc_rdma = rdma;
        svc_xprt_get(&rdma->sc_xprt);

        INIT_LIST_HEAD(&cc->cc_rwctxts);
        cc->cc_sqecount = 0;
}
static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
                                enum dma_data_direction dir)
{
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_rw_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
                list_del(&ctxt->rw_list);

                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
                                    ctxt->rw_nents, dir);
                svc_rdma_put_rw_ctxt(rdma, ctxt);
        }
        svc_xprt_put(&rdma->sc_xprt);
}
/* State for sending a Write or Reply chunk.
 *  - Tracks progress of writing one chunk over all its segments
 *  - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
        /* write state of this chunk */
        unsigned int            wi_seg_off;
        unsigned int            wi_seg_no;
        unsigned int            wi_nsegs;
        __be32                  *wi_segs;

        /* SGL constructor arguments */
        struct xdr_buf          *wi_xdr;
        unsigned char           *wi_base;
        unsigned int            wi_next_off;

        struct svc_rdma_chunk_ctxt      wi_cc;
};
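/* On the wire, each chunk segment is four XDR words: an R_key
 * (handle), a length, and a 64-bit offset, which is why
 * svc_rdma_build_writes() walks wi_segs in strides of
 * rpcrdma_segment_maxsz. svc_rdma_write_info_alloc() below reads the
 * chunk's segment count and records where the segments themselves
 * begin.
 */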
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
{
        struct svc_rdma_write_info *info;

        info = kmalloc(sizeof(*info), GFP_KERNEL);
        if (!info)
                return info;

        info->wi_seg_off = 0;
        info->wi_seg_no = 0;
        info->wi_nsegs = be32_to_cpup(++chunk);
        info->wi_segs = ++chunk;
        svc_rdma_cc_init(rdma, &info->wi_cc);
        info->wi_cc.cc_cqe.done = svc_rdma_write_done;
        return info;
}
static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
        svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
        kfree(info);
}
/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_chunk_ctxt *cc =
                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_write_info *info =
                        container_of(cc, struct svc_rdma_write_info, wi_cc);

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                if (wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
                               ib_wc_status_msg(wc->status),
                               wc->status, wc->vendor_err);
        }

        svc_rdma_write_info_free(info);
}
/* State for pulling a Read chunk.
 */
struct svc_rdma_read_info {
        struct svc_rdma_op_ctxt         *ri_readctxt;
        unsigned int                    ri_position;
        unsigned int                    ri_pageno;
        unsigned int                    ri_pageoff;
        unsigned int                    ri_chunklen;

        struct svc_rdma_chunk_ctxt      ri_cc;
};
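/* One read info is allocated per Read chunk being pulled. It is freed
 * by the Read completion handler, or immediately if the chunk's Reads
 * cannot be built or posted.
 */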
static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_read_info *info;

        info = kmalloc(sizeof(*info), GFP_KERNEL);
        if (!info)
                return info;

        svc_rdma_cc_init(rdma, &info->ri_cc);
        info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
        return info;
}
static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
        svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
        kfree(info);
}
/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_chunk_ctxt *cc =
                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_read_info *info =
                        container_of(cc, struct svc_rdma_read_info, ri_cc);

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                if (wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
                               ib_wc_status_msg(wc->status),
                               wc->status, wc->vendor_err);
                svc_rdma_put_context(info->ri_readctxt, 1);
        } else {
                spin_lock(&rdma->sc_rq_dto_lock);
                list_add_tail(&info->ri_readctxt->list,
                              &rdma->sc_read_complete_q);
                spin_unlock(&rdma->sc_rq_dto_lock);

                set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
                svc_xprt_enqueue(&rdma->sc_xprt);
        }

        svc_rdma_read_info_free(info);
}
/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
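/* Send Queue accounting: cc_sqecount SQEs are reserved by subtracting
 * from sc_sq_avail before posting. If the subtraction would leave the
 * Send Queue over-committed, the reservation is backed out and the
 * caller sleeps on sc_send_wait until completions return enough
 * credits.
 */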
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_xprt *xprt = &rdma->sc_xprt;
        struct ib_send_wr *first_wr, *bad_wr;
        struct list_head *tmp;
        struct ib_cqe *cqe;
        int ret;

        if (cc->cc_sqecount > rdma->sc_sq_depth)
                return -EINVAL;

        first_wr = NULL;
        cqe = &cc->cc_cqe;
        list_for_each(tmp, &cc->cc_rwctxts) {
                struct svc_rdma_rw_ctxt *ctxt;

                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
                                           rdma->sc_port_num, cqe, first_wr);
                cqe = NULL;
        }

        do {
                if (atomic_sub_return(cc->cc_sqecount,
                                      &rdma->sc_sq_avail) > 0) {
                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
                        if (ret)
                                break;
                        return 0;
                }

                atomic_inc(&rdma_stat_sq_starve);
                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
                wait_event(rdma->sc_send_wait,
                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
        } while (1);

        pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
        set_bit(XPT_CLOSE, &xprt->xpt_flags);

        /* If even one was posted, there will be a completion. */
        if (bad_wr != first_wr)
                return 0;

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);
        return -ENOTCONN;
}
/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
                               unsigned int len,
                               struct svc_rdma_rw_ctxt *ctxt)
{
        struct scatterlist *sg = ctxt->rw_sg_table.sgl;

        sg_set_buf(&sg[0], info->wi_base, len);
        info->wi_base += len;

        ctxt->rw_nents = 1;
}
/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
                                    unsigned int remaining,
                                    struct svc_rdma_rw_ctxt *ctxt)
{
        unsigned int sge_no, sge_bytes, page_off, page_no;
        struct xdr_buf *xdr = info->wi_xdr;
        struct scatterlist *sg;
        struct page **page;

        page_off = info->wi_next_off + xdr->page_base;
        page_no = page_off >> PAGE_SHIFT;
        page_off = offset_in_page(page_off);
        page = xdr->pages + page_no;
        info->wi_next_off += remaining;
        sg = ctxt->rw_sg_table.sgl;
        sge_no = 0;
        do {
                sge_bytes = min_t(unsigned int, remaining,
                                  PAGE_SIZE - page_off);
                sg_set_page(sg, *page, sge_bytes, page_off);

                remaining -= sge_bytes;
                sg = sg_next(sg);
                page_off = 0;
                page++;
                sge_no++;
        } while (remaining);

        ctxt->rw_nents = sge_no;
}
/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
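/* The @constructor callback fills the R/W context's scatterlist from
 * either a kvec (svc_rdma_vec_to_sg) or the xdr_buf's page list
 * (svc_rdma_pagelist_to_sg), so the segment-walking logic here is
 * shared by Write chunk and Reply chunk processing.
 */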
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
                      void (*constructor)(struct svc_rdma_write_info *info,
                                          unsigned int len,
                                          struct svc_rdma_rw_ctxt *ctxt),
                      unsigned int remaining)
{
        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_rw_ctxt *ctxt;
        __be32 *seg;
        int ret;

        seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
        do {
                unsigned int write_len;
                u32 seg_length, seg_handle;
                u64 seg_offset;

                if (info->wi_seg_no >= info->wi_nsegs)
                        goto out_overflow;

                seg_handle = be32_to_cpup(seg);
                seg_length = be32_to_cpup(seg + 1);
                xdr_decode_hyper(seg + 2, &seg_offset);
                seg_offset += info->wi_seg_off;

                write_len = min(remaining, seg_length - info->wi_seg_off);
                ctxt = svc_rdma_get_rw_ctxt(rdma,
                                            (write_len >> PAGE_SHIFT) + 2);
                if (!ctxt)
                        goto out_noctx;

                constructor(info, write_len, ctxt);
                ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
                                       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
                                       ctxt->rw_nents, 0, seg_offset,
                                       seg_handle, DMA_TO_DEVICE);
                if (ret < 0)
                        goto out_initerr;

                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
                cc->cc_sqecount += ret;
                if (write_len == seg_length - info->wi_seg_off) {
                        seg += 4;
                        info->wi_seg_no++;
                        info->wi_seg_off = 0;
                } else {
                        info->wi_seg_off += write_len;
                }
                remaining -= write_len;
        } while (remaining);

        return 0;

out_overflow:
        dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
                info->wi_nsegs);
        return -E2BIG;

out_noctx:
        dprintk("svcrdma: no R/W ctxs available\n");
        return -ENOMEM;

out_initerr:
        svc_rdma_put_rw_ctxt(rdma, ctxt);
        pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
        return -EIO;
}
/* Send one of an xdr_buf's kvecs by itself. To send a Reply
 * chunk, the whole RPC Reply is written back to the client.
 * This function writes either the head or tail of the xdr_buf
 * containing the Reply.
 */
static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
                                  struct kvec *vec)
{
        info->wi_base = vec->iov_base;
        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
                                     vec->iov_len);
}
/* Send an xdr_buf's page list by itself. A Write chunk is
 * just the page list. A Reply chunk is the head, page list,
 * and tail. This function is shared between the two types
 * of chunk.
 */
static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
                                      struct xdr_buf *xdr)
{
        info->wi_xdr = xdr;
        info->wi_next_off = 0;
        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
                                     xdr->page_len);
}
/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @wr_ch: Write chunk provided by client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
                              struct xdr_buf *xdr)
{
        struct svc_rdma_write_info *info;
        int ret;

        if (!xdr->page_len)
                return 0;

        info = svc_rdma_write_info_alloc(rdma, wr_ch);
        if (!info)
                return -ENOMEM;

        ret = svc_rdma_send_xdr_pagelist(info, xdr);
        if (ret < 0)
                goto out_err;

        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
        if (ret < 0)
                goto out_err;
        return xdr->page_len;

out_err:
        svc_rdma_write_info_free(info);
        return ret;
}
/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rp_ch: Reply chunk provided by client
 * @writelist: true if client provided a Write list
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
                              bool writelist, struct xdr_buf *xdr)
{
        struct svc_rdma_write_info *info;
        int consumed, ret;

        info = svc_rdma_write_info_alloc(rdma, rp_ch);
        if (!info)
                return -ENOMEM;

        ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
        if (ret < 0)
                goto out_err;
        consumed = xdr->head[0].iov_len;

        /* Send the page list in the Reply chunk only if the
         * client did not provide Write chunks.
         */
        if (!writelist && xdr->page_len) {
                ret = svc_rdma_send_xdr_pagelist(info, xdr);
                if (ret < 0)
                        goto out_err;
                consumed += xdr->page_len;
        }

        if (xdr->tail[0].iov_len) {
                ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
                if (ret < 0)
                        goto out_err;
                consumed += xdr->tail[0].iov_len;
        }

        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
        if (ret < 0)
                goto out_err;
        return consumed;

out_err:
        svc_rdma_write_info_free(info);
        return ret;
}
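/* Build an rdma_rw context for one Read segment. The segment's sink
 * buffers are pages borrowed from rqstp->rq_pages; they are also
 * recorded in head->arg.pages so the pulled data appears in the RPC
 * Call's page list.
 */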
static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
                                       struct svc_rqst *rqstp,
                                       u32 rkey, u32 len, u64 offset)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
        struct svc_rdma_rw_ctxt *ctxt;
        unsigned int sge_no, seg_len;
        struct scatterlist *sg;
        int ret;

        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
        if (!ctxt)
                goto out_noctx;
        ctxt->rw_nents = sge_no;

        dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n",
                len, offset, rkey, sge_no);

        sg = ctxt->rw_sg_table.sgl;
        for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
                seg_len = min_t(unsigned int, len,
                                PAGE_SIZE - info->ri_pageoff);

                head->arg.pages[info->ri_pageno] =
                        rqstp->rq_pages[info->ri_pageno];
                if (!info->ri_pageoff)
                        head->count++;

                sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
                            seg_len, info->ri_pageoff);
                sg = sg_next(sg);

                info->ri_pageoff += seg_len;
                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
                len -= seg_len;

                /* Safety check */
                if (len &&
                    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
                        goto out_overrun;
        }

        ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
                               cc->cc_rdma->sc_port_num,
                               ctxt->rw_sg_table.sgl, ctxt->rw_nents,
                               0, offset, rkey, DMA_FROM_DEVICE);
        if (ret < 0)
                goto out_initerr;

        list_add(&ctxt->rw_list, &cc->cc_rwctxts);
        cc->cc_sqecount += ret;
        return 0;

out_noctx:
        dprintk("svcrdma: no R/W ctxs available\n");
        return -ENOMEM;

out_overrun:
        dprintk("svcrdma: request overruns rq_pages\n");
        return -EINVAL;

out_initerr:
        svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
        pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
        return -EIO;
}
/* Walk the segments in the Read chunk starting at @p and construct
 * RDMA Read operations to pull the chunk to the server.
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
                                     struct svc_rdma_read_info *info,
                                     __be32 *p)
{
        int ret;

        ret = -EINVAL;
        info->ri_chunklen = 0;
        while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
                u32 rs_handle, rs_length;
                u64 rs_offset;

                rs_handle = be32_to_cpup(p++);
                rs_length = be32_to_cpup(p++);
                p = xdr_decode_hyper(p, &rs_offset);

                ret = svc_rdma_build_read_segment(info, rqstp,
                                                  rs_handle, rs_length,
                                                  rs_offset);
                if (ret < 0)
                        break;

                info->ri_chunklen += rs_length;
        }

        return ret;
}
/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
 * data lands in the page list of head->arg.pages.
 *
 * Currently NFSD does not look at the head->arg.tail[0] iovec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 */
static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
                                            struct svc_rdma_read_info *info,
                                            __be32 *p)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        int ret;

        dprintk("svcrdma: Reading Read chunk at position %u\n",
                info->ri_position);

        info->ri_pageno = head->hdr_count;
        info->ri_pageoff = 0;

        ret = svc_rdma_build_read_chunk(rqstp, info, p);
        if (ret < 0)
                goto out;

        /* Split the Receive buffer between the head and tail
         * buffers at Read chunk's position. XDR roundup of the
         * chunk is not included in either the pagelist or in
         * the tail.
         */
        head->arg.tail[0].iov_base =
                head->arg.head[0].iov_base + info->ri_position;
        head->arg.tail[0].iov_len =
                head->arg.head[0].iov_len - info->ri_position;
        head->arg.head[0].iov_len = info->ri_position;

        /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
         *
         * If the client already rounded up the chunk length, the
         * length does not change. Otherwise, the length of the page
         * list is increased to include XDR round-up.
         *
         * Currently these chunks always start at page offset 0,
         * thus the rounded-up length never crosses a page boundary.
         */
        info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2;

        head->arg.page_len = info->ri_chunklen;
        head->arg.len += info->ri_chunklen;
        head->arg.buflen += info->ri_chunklen;

out:
        return ret;
}
/* Construct RDMA Reads to pull over a Position Zero Read chunk.
 * The start of the data lands in the first page just after
 * the Transport header, and the rest lands in the page list of
 * head->arg.pages.
 *
 * Assumptions:
 *	- A PZRC has an XDR-aligned length (no implicit round-up).
 *	- There can be no trailing inline content (IOW, we assume
 *	  a PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec).
 */
static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
                                        struct svc_rdma_read_info *info,
                                        __be32 *p)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        int ret;

        dprintk("svcrdma: Reading Position Zero Read chunk\n");

        info->ri_pageno = head->hdr_count - 1;
        info->ri_pageoff = offset_in_page(head->byte_len);

        ret = svc_rdma_build_read_chunk(rqstp, info, p);
        if (ret < 0)
                goto out;

        head->arg.len += info->ri_chunklen;
        head->arg.buflen += info->ri_chunklen;

        if (head->arg.buflen <= head->sge[0].length) {
                /* Transport header and RPC message fit entirely
                 * in page where head iovec resides.
                 */
                head->arg.head[0].iov_len = info->ri_chunklen;
        } else {
                /* Transport header and part of RPC message reside
                 * in the head iovec's page.
                 */
                head->arg.head[0].iov_len =
                        head->sge[0].length - head->byte_len;
                head->arg.page_len =
                        info->ri_chunklen - head->arg.head[0].iov_len;
        }

out:
        return ret;
}
/**
 * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 * @p: pointer to start of Read chunk
 *
 * Returns:
 *	%0 if all needed RDMA Reads were posted successfully,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 *
 * Assumptions:
 * - All Read segments in @p have the same Position value.
 */
int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
                             struct svc_rdma_op_ctxt *head, __be32 *p)
{
        struct svc_rdma_read_info *info;
        struct page **page;
        int ret;

        /* The request (with page list) is constructed in
         * head->arg. Pages involved with RDMA Read I/O are
         * transferred there.
         */
        head->hdr_count = head->count;
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = head->pages;
        head->arg.page_base = 0;
        head->arg.page_len = 0;
        head->arg.len = rqstp->rq_arg.len;
        head->arg.buflen = rqstp->rq_arg.buflen;

        info = svc_rdma_read_info_alloc(rdma);
        if (!info)
                return -ENOMEM;
        info->ri_readctxt = head;

        info->ri_position = be32_to_cpup(p + 1);
        if (info->ri_position)
                ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
        else
                ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);

        /* Mark the start of the pages that can be used for the reply */
        if (info->ri_pageoff > 0)
                info->ri_pageno++;
        rqstp->rq_respages = &rqstp->rq_pages[info->ri_pageno];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        if (ret < 0)
                goto out;

        ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);

out:
        /* Read sink pages have been moved from rqstp->rq_pages to
         * head->arg.pages. Force svc_recv to refill those slots
         * in rq_pages.
         */
        for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
                *page = NULL;

        if (ret < 0)
                svc_rdma_read_info_free(info);
        return ret;
}