/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */
#include "xprt_rdma.h"

#include <linux/highmem.h>
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
static const char transfertypes[][12] = {
	"inline",	/* no chunks */
	"read list",	/* some argument via rdma read */
	"*read list",	/* entire request via rdma read */
	"write list",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};
/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += maxsegs * sizeof(struct rpcrdma_read_chunk);

	/* Minimal Reply chunk size */
	size += sizeof(__be32);	/* segment count */
	size += sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max call header size = %u\n",
		__func__, size);
	return size;
}
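/* Worked example, not part of the original code (a sketch assuming the
 * usual wire sizes: RPCRDMA_HDRLEN_MIN is 28 bytes, struct
 * rpcrdma_segment is 16 bytes, and struct rpcrdma_read_chunk is 24
 * bytes): with maxsegs = 8, the estimate above is
 *
 *	28 + (8 + 2) * 24 + (4 + 16 + 4) = 292 bytes
 *
 * that is, the fixed header, a worst-case Read list, and a minimal
 * Reply chunk.
 */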
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += sizeof(__be32);	/* segment count */
	size += maxsegs * sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max reply header size = %u\n",
		__func__, size);
	return size;
}
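/* Worked example for the Reply direction (same sketch assumptions as
 * above): with maxsegs = 8, the largest Write list costs
 *
 *	28 + 4 + (8 + 2) * 16 + 4 = 196 bytes
 *
 * that is, the fixed header, the segment count, ten segments, and the
 * trailing list discriminator.
 */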
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int maxsegs = ia->ri_max_segs;

	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				  rpcrdma_max_reply_header_size(maxsegs);
}
/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * the read chunk list for this operation.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
}
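/* Illustration only, with hypothetical numbers: if the negotiated
 * inline write threshold is 4096 bytes and rpcrdma_max_call_header_size()
 * reserves 292 of them (the maxsegs = 8 sketch above), then
 * ri_max_inline_write is 3804. A Call whose rq_snd_buf.len is 3000 is
 * sent inline; a 16KB NFS WRITE payload is not, and is moved into a
 * Read chunk by the marshaling code below.
 */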
/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}
/* Split "vec" on page boundaries into segments. FMR registers pages,
 * not a byte range. Other modes coalesce these segments into a single
 * MR when they can.
 */
static int
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
{
	size_t page_offset;
	u32 remaining;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining && n < RPCRDMA_MAX_SEGS) {
		seg[n].mr_page = NULL;
		seg[n].mr_offset = base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg[n].mr_len;
		base += seg[n].mr_len;
		++n;
		page_offset = 0;
	}
	return n;
}
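/* Example of the split performed above (a sketch assuming 4KB pages):
 * a kvec whose iov_base sits at page offset 3072 with iov_len 6000 is
 * converted into three segments of 1024, 4096, and 880 bytes; only the
 * first segment starts at a non-zero offset into its page.
 */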
/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Returns positive number of segments converted, or a negative errno.
 */
static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg,
	bool reminv_expected)
{
	int len, n, p, page_base;
	struct page **ppages;

	n = 0;
	if (pos == 0) {
		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
		if (n == RPCRDMA_MAX_SEGS)
			goto out_overflow;
	}

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = xdrbuf->page_base & ~PAGE_MASK;
	p = 0;
	while (len && n < RPCRDMA_MAX_SEGS) {
		if (!ppages[p]) {
			/* alloc the pagelist for receiving buffer */
			ppages[p] = alloc_page(GFP_ATOMIC);
			if (!ppages[p])
				return -EAGAIN;
		}
		seg[n].mr_page = ppages[p];
		seg[n].mr_offset = (void *)(unsigned long) page_base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		if (seg[n].mr_len > PAGE_SIZE)
			goto out_overflow;
		len -= seg[n].mr_len;
		++n;
		++p;
		page_base = 0;	/* page offset only applies to first page */
	}

	/* Message overflows the seg array */
	if (len && n == RPCRDMA_MAX_SEGS)
		goto out_overflow;

	/* When encoding the read list, the tail is always sent inline */
	if (type == rpcrdma_readch)
		return n;

	/* When encoding the Write list, some servers need to see an extra
	 * segment for odd-length Write chunks. The upper layer provides
	 * space in the tail iovec for this purpose.
	 */
	if (type == rpcrdma_writech && reminv_expected)
		return n;

	if (xdrbuf->tail[0].iov_len) {
		/* the rpcrdma protocol allows us to omit any trailing
		 * xdr pad bytes, saving the server an RDMA operation. */
		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
			return n;
		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
		if (n == RPCRDMA_MAX_SEGS)
			goto out_overflow;
	}

	return n;

out_overflow:
	pr_err("rpcrdma: segment array overflow\n");
	return -EIO;
}
static inline __be32 *
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
{
	*iptr++ = cpu_to_be32(mw->mw_handle);
	*iptr++ = cpu_to_be32(mw->mw_length);
	return xdr_encode_hyper(iptr, mw->mw_offset);
}
/* XDR-encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Read list, or an error pointer.
 */
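/* For example, a Read chunk registered as two segments at XDR position
 * P would appear on the wire as
 *
 *    1, P, H1, L1, O1, 1, P, H2, L2, O2, 0
 *
 * where each leading 1 is a list discriminator and the final 0
 * terminates the Read list.
 */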
static __be32 *
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
			 __be32 *iptr, enum rpcrdma_chunktype rtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	unsigned int pos;
	int n, nsegs;

	if (rtype == rpcrdma_noch) {
		*iptr++ = xdr_zero;	/* item not present */
		return iptr;
	}

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 false, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		*iptr++ = xdr_one;	/* item present */

		/* All read segments in this chunk
		 * have the same "position".
		 */
		*iptr++ = cpu_to_be32(pos);
		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__, pos,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.read_chunk_count++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Finish Read list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}
/* XDR-encode the Write list. Supports encoding a list containing
 * one array of plain segments that belong to a single write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Write list, or an error pointer.
 */
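/* For example, a single Write chunk registered as two segments would
 * appear on the wire as
 *
 *    1, 2, H1, L1, O1, H2, L2, O2, 0
 *
 * where 1 announces the Write list, 2 is the segment count, and the
 * final 0 terminates the list.
 */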
static __be32 *
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, __be32 *iptr,
			  enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech) {
		*iptr++ = xdr_zero;	/* no Write list present */
		return iptr;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg,
				     r_xprt->rx_ia.ri_reminv_expected);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Write list present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 true, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	/* Finish Write list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}
/* XDR-encode the Reply chunk. Supports encoding an array of plain
 * segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Reply chunk, or an error pointer.
 */
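/* For example, a Reply chunk registered as two segments would appear
 * on the wire as
 *
 *    1, 2, H1, L1, O1, H2, L2, O2
 *
 * with no terminating word, since the Reply chunk is a single counted
 * array rather than a list.
 */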
static __be32 *
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
			   __be32 *iptr, enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		*iptr++ = xdr_zero;	/* no Reply chunk present */
		return iptr;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
				     r_xprt->rx_ia.ri_reminv_expected);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Reply chunk present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 true, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return iptr;
}
/* Prepare the RPC-over-RDMA header SGE.
 */
static bool
rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			u32 len)
{
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = &req->rl_send_sge[0];

	if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
		if (!__rpcrdma_dma_map_regbuf(ia, rb))
			return false;
		sge->addr = rdmab_addr(rb);
		sge->lkey = rdmab_lkey(rb);
	}
	sge->length = len;

	ib_dma_sync_single_for_device(ia->ri_device, sge->addr,
				      sge->length, DMA_TO_DEVICE);
	req->rl_send_wr.num_sge++;
	return true;
}
/* Prepare the Send SGEs. The head and tail iovec, and each entry
 * in the page list, gets its own SGE.
 */
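/* The resulting Send SGE layout looks like this (a sketch; sge[0] is
 * filled in by rpcrdma_prepare_hdr_sge() above):
 *
 *    sge[0]       - RPC-over-RDMA header (rl_rdmabuf)
 *    sge[1]       - xdr_buf head iovec (rl_sendbuf)
 *    sge[2..N-1]  - one SGE per page-list page, DMA-mapped here
 *    sge[N]       - xdr_buf tail iovec, if any
 *
 * Only sge[2] and beyond are unmapped later by rpcrdma_unmap_sges().
 */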
static bool
rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
	unsigned int sge_no, page_base, len, remaining;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct ib_device *device = ia->ri_device;
	struct ib_sge *sge = req->rl_send_sge;
	u32 lkey = ia->ri_pd->local_dma_lkey;
	struct page *page, **ppages;

	/* The head iovec is straightforward, as it is already
	 * DMA-mapped. Sync the content that has changed.
	 */
	if (!rpcrdma_dma_map_regbuf(ia, rb))
		return false;
	sge_no = 1;
	sge[sge_no].addr = rdmab_addr(rb);
	sge[sge_no].length = xdr->head[0].iov_len;
	sge[sge_no].lkey = rdmab_lkey(rb);
	ib_dma_sync_single_for_device(device, sge[sge_no].addr,
				      sge[sge_no].length, DMA_TO_DEVICE);

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here. However, the
	 * tail iovec may include an XDR pad for the page list, as
	 * well as additional content, and may not reside in the
	 * same page as the head iovec.
	 */
	if (rtype == rpcrdma_readch) {
		len = xdr->tail[0].iov_len;

		/* Do not include the tail if it is only an XDR pad */
		if (len < 4)
			goto out;

		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() has added a pad at the beginning
		 * of the tail iovec. Force the tail's non-pad content
		 * to land at the next XDR position in the Send message.
		 */
		page_base += len & 3;
		len -= len & 3;
		goto map_tail;
	}

	/* If there is a page list present, temporarily DMA map
	 * and prepare an SGE for each page to be sent.
	 */
	if (xdr->page_len) {
		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
		page_base = xdr->page_base & ~PAGE_MASK;
		remaining = xdr->page_len;
		while (remaining) {
			sge_no++;
			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
				goto out_mapping_overflow;

			len = min_t(u32, PAGE_SIZE - page_base, remaining);
			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
							   page_base, len,
							   DMA_TO_DEVICE);
			if (ib_dma_mapping_error(device, sge[sge_no].addr))
				goto out_mapping_err;
			sge[sge_no].length = len;
			sge[sge_no].lkey = lkey;

			req->rl_mapped_sges++;
			ppages++;
			remaining -= len;
			page_base = 0;
		}
	}

	/* The tail iovec is not always constructed in the same
	 * page where the head iovec resides (see, for example,
	 * gss_wrap_req_priv). To neatly accommodate that case,
	 * DMA map it separately.
	 */
	if (xdr->tail[0].iov_len) {
		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
		len = xdr->tail[0].iov_len;

map_tail:
		sge_no++;
		sge[sge_no].addr = ib_dma_map_page(device, page,
						   page_base, len,
						   DMA_TO_DEVICE);
		if (ib_dma_mapping_error(device, sge[sge_no].addr))
			goto out_mapping_err;
		sge[sge_no].length = len;
		sge[sge_no].lkey = lkey;
		req->rl_mapped_sges++;
	}

out:
	req->rl_send_wr.num_sge = sge_no + 1;
	return true;

out_mapping_overflow:
	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
	return false;

out_mapping_err:
	pr_err("rpcrdma: Send mapping error\n");
	return false;
}
bool
rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			  u32 hdrlen, struct xdr_buf *xdr,
			  enum rpcrdma_chunktype rtype)
{
	req->rl_send_wr.num_sge = 0;
	req->rl_mapped_sges = 0;

	if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
		goto out_map;

	if (rtype != rpcrdma_areadch)
		if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
			goto out_map;

	return true;

out_map:
	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
	return false;
}
void
rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_sge *sge;
	int count;

	sge = &req->rl_send_sge[2];
	for (count = req->rl_mapped_sges; count--; sge++)
		ib_dma_unmap_page(device, sge->addr, sge->length,
				  DMA_TO_DEVICE);
	req->rl_mapped_sges = 0;
}
/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Returns zero on success, otherwise a negative errno.
 */
int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	enum rpcrdma_chunktype rtype, wtype;
	struct rpcrdma_msg *headerp;
	bool ddp_allowed;
	ssize_t hdrlen;
	size_t rpclen;
	__be32 *iptr;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
		return rpcrdma_bc_marshal_reply(rqst);
#endif

	headerp = rdmab_to_msg(req->rl_rdmabuf);
	/* don't byte-swap XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = rdma_msg;

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
			RPCAUTH_AUTH_DATATOUCH);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		rtype = rpcrdma_noch;
		rpclen = rqst->rq_snd_buf.len;
	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		rtype = rpcrdma_readch;
		rpclen = rqst->rq_snd_buf.head[0].iov_len +
			 rqst->rq_snd_buf.tail[0].iov_len;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		headerp->rm_type = htonl(RDMA_NOMSG);
		rtype = rpcrdma_areadch;
		rpclen = 0;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
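	/* Concrete illustration, with hypothetical sizes: a 64KB NFS READ
	 * expects a reply larger than the inline threshold, so its
	 * rq_rcv_buf is flagged XDRBUF_READ and wtype is rpcrdma_writech,
	 * while the arguments still fit inline (rtype stays rpcrdma_noch).
	 * A 64KB NFS WRITE is the mirror image: rtype becomes
	 * rpcrdma_readch and wtype stays rpcrdma_noch.
	 */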
	iptr = headerp->rm_body.rm_chunks;
	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
	if (IS_ERR(iptr))
		goto out_err;
	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_err;
	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_err;
	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
		rqst->rq_task->tk_pid, __func__,
		transfertypes[rtype], transfertypes[wtype],
		hdrlen, rpclen);

	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
				       &rqst->rq_snd_buf, rtype)) {
		iptr = ERR_PTR(-EIO);
		goto out_err;
	}
	return 0;

out_err:
	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
	return PTR_ERR(iptr);
}
/*
 * Chase down a received write or reply chunklist to get length
 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
 */
static int
rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
{
	unsigned int i, total_len;
	struct rpcrdma_write_chunk *cur_wchunk;
	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

	i = be32_to_cpu(**iptrp);
	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
	total_len = 0;
	while (i--) {
		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
		u64 off;

		xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
		dprintk("RPC: %s: chunk %d@0x%016llx:0x%08x\n",
			__func__,
			be32_to_cpu(seg->rs_length),
			(unsigned long long)off,
			be32_to_cpu(seg->rs_handle));
		total_len += be32_to_cpu(seg->rs_length);
		++cur_wchunk;
	}
	/* check and adjust for properly terminated write chunk */
	if (wrchunk) {
		__be32 *w = (__be32 *) cur_wchunk;

		if (*w++ != xdr_zero)
			return -1;
		cur_wchunk = (struct rpcrdma_write_chunk *) w;
	}
	if ((char *)cur_wchunk > base + rep->rr_len)
		return -1;

	*iptrp = (__be32 *) cur_wchunk;
	return total_len;
}
/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
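/* Worked example with hypothetical sizes: for a 300-byte inline reply
 * where rq_rcv_buf has a 100-byte head and a page list, the head iovec
 * is simply pointed at the first 100 bytes of the receive buffer, the
 * remaining 200 bytes are memcopied into the first page, and the
 * function returns 200: only copied bytes count toward
 * fixup_copy_count.
 */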
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
		__func__, srcp, copy_len, curlen);
	srcp += curlen;
	copy_len -= curlen;

	page_base = rqst->rq_rcv_buf.page_base;
	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			dprintk("RPC: %s: page %d"
				" srcp 0x%p len %d curlen %d\n",
				__func__, i, srcp, copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	return fixup_copy_count;
}
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
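/* In other words, a backchannel Call is recognized from the fixed-size
 * header alone (a sketch of the XDR words, in order):
 *
 *    [0] XID  [1] version  [2] credits  [3] rdma_msg
 *    [4] 0 (no Read list)  [5] 0 (no Write list)  [6] 0 (no Reply chunk)
 *    [7] RPC XID (must match word 0)  [8] RPC_CALL direction
 */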
static bool
rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
{
	__be32 *p = (__be32 *)headerp;

	if (headerp->rm_type != rdma_msg)
		return false;
	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
		return false;

	if (p[7] != headerp->rm_xid)
		return false;
	if (p[8] != cpu_to_be32(RPC_CALL))
		return false;

	return true;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);
	struct rpcrdma_msg *headerp;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	__be32 *iptr;
	int rdmalen, status, rmerr;
	unsigned long cwnd;

	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);

	if (rep->rr_len == RPCRDMA_BAD_LEN)
		goto out_badstatus;
	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		goto out_shortreply;

	headerp = rdmab_to_msg(rep->rr_rdmabuf);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (rpcrdma_is_bcall(headerp))
		goto out_bcall;
#endif

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock_bh(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	if (!rqst)
		goto out_nomatch;

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply)
		goto out_duplicate;

	/* Sanity checking has passed. We are now committed
	 * to complete this transaction.
	 */
	list_del_init(&rqst->rq_list);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

	/* from here on, the reply is no longer an orphan */
	req->rl_reply = rep;
	xprt->reestablish_timeout = 0;
	if (headerp->rm_vers != rpcrdma_version)
		goto out_badversion;

	/* check for expected message types */
	/* The order of some of these tests is important. */
	switch (headerp->rm_type) {
	case rdma_msg:
		/* never expect read chunks */
		/* never expect reply chunks (two ways to check) */
		/* never expect write chunks without having offered RDMA */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
		     list_empty(&req->rl_registered)))
			goto badheader;
		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
			/* count any expected write chunks in read reply */
			/* start at write chunk array count */
			iptr = &headerp->rm_body.rm_chunks[2];
			rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
			/* check for validity, and no reply chunk after */
			if (rdmalen < 0 || *iptr++ != xdr_zero)
				goto badheader;
			rep->rr_len -=
			    ((unsigned char *)iptr - (unsigned char *)headerp);
			status = rep->rr_len + rdmalen;
			r_xprt->rx_stats.total_rdma_reply += rdmalen;
			/* special case - last chunk may omit padding */
			if (rdmalen &= 3) {
				rdmalen = 4 - rdmalen;
				status += rdmalen;
			}
		} else {
			/* else ordinary inline */
			rdmalen = 0;
			iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
			status = rep->rr_len;
		}

		r_xprt->rx_stats.fixup_copy_count +=
			rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
					     rdmalen);
		break;

	case rdma_nomsg:
		/* never expect read or write chunks, always reply chunks */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
		    headerp->rm_body.rm_chunks[2] != xdr_one ||
		    list_empty(&req->rl_registered))
			goto badheader;
		iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
		rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
		if (rdmalen < 0)
			goto badheader;
		r_xprt->rx_stats.total_rdma_reply += rdmalen;
		/* Reply chunk buffer already is the reply vector - no fixup. */
		status = rdmalen;
		break;

	case rdma_error:
		goto out_rdmaerr;

badheader:
	default:
		dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
			rqst->rq_task->tk_pid, __func__,
			be32_to_cpu(headerp->rm_type));
		status = -EIO;
		r_xprt->rx_stats.bad_reply_count++;
		break;
	}
out:
	/* Invalidate and flush the data payloads before waking the
	 * waiting application. This guarantees the memory region is
	 * properly fenced from the server before the application
	 * accesses the data. It also ensures proper send flow
	 * control: waking the next RPC waits until this RPC has
	 * relinquished all its Send Queue entries.
	 */
	if (!list_empty(&req->rl_registered))
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);

	spin_lock_bh(&xprt->transport_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
		__func__, xprt, rqst, status);
	return;
out_badstatus:
	rpcrdma_recv_buffer_put(rep);
	if (r_xprt->rx_ep.rep_connected == 1) {
		r_xprt->rx_ep.rep_connected = -EIO;
		rpcrdma_conn_func(&r_xprt->rx_ep);
	}
	return;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
out_bcall:
	rpcrdma_bc_receive_call(r_xprt, rep);
	return;
#endif

/* If the incoming reply terminated a pending RPC, the next
 * RPC call will post a replacement receive buffer as it is
 * being marshaled.
 */
out_badversion:
	dprintk("RPC: %s: invalid version %d\n",
		__func__, be32_to_cpu(headerp->rm_vers));
	status = -EIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

out_rdmaerr:
	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
	switch (rmerr) {
	case ERR_VERS:
		pr_err("%s: server reports header version error (%u-%u)\n",
		       __func__,
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
		break;
	case ERR_CHUNK:
		pr_err("%s: server reports header decoding error\n",
		       __func__);
		break;
	default:
		pr_err("%s: server reports unknown error %d\n",
		       __func__, rmerr);
	}
	status = -EREMOTEIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

/* If no pending RPC transaction was matched, post a replacement
 * receive buffer before returning.
 */
out_shortreply:
	dprintk("RPC: %s: short/invalid reply\n", __func__);
	goto repost;

out_nomatch:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
		__func__, be32_to_cpu(headerp->rm_xid),
		rep->rr_len);
	goto repost;

out_duplicate:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: "
		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

repost:
	r_xprt->rx_stats.bad_reply_count++;
	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
		rpcrdma_recv_buffer_put(rep);
}