/*
 * Copyright (c) 2003-2007 Network Appliance, Inc.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include "xprt_rdma.h"

#include <linux/highmem.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

static const char transfertypes[][12] = {
	"inline",	/* no chunks */
	"read list",	/* some argument via rdma read */
	"*read list",	/* entire request via rdma read */
	"write list",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += maxsegs * sizeof(struct rpcrdma_read_chunk);

	/* Minimal Reply chunk size */
	size += sizeof(__be32);	/* segment count */
	size += sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max call header size = %u\n",
		__func__, size);
	return size;
}

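/* Worked example (a sketch; the exact byte counts depend on the
 * protocol structs in xprt_rdma.h and are assumptions here):
 *
 * With RPCRDMA_HDRLEN_MIN = 28 bytes (xid, vers, credits, proc, and
 * three empty chunk-list discriminators), 8 data segments plus the
 * 2 extra segments for head and tail, a 24-byte Read chunk per
 * segment, and a minimal Reply chunk of 4 + 16 + 4 bytes:
 *
 *	28 + (8 + 2) * 24 + 24 = 292 bytes
 *
 * The transport reserves at least this much of the inline threshold
 * for the Call header before any RPC payload is considered.
 */
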
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message.  The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += sizeof(__be32);		/* segment count */
	size += maxsegs * sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max reply header size = %u\n",
		__func__, size);
	return size;
}

void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int maxsegs = ia->ri_max_segs;

	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				 rpcrdma_max_reply_header_size(maxsegs);
}

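/* For instance (illustrative numbers only, not taken from this file):
 * with a negotiated inline threshold of 4096 bytes in each direction
 * and the 292-byte worst-case Call header sketched above,
 * ri_max_inline_write would end up at 3804 bytes.  Any RPC whose
 * marshaled call exceeds that must move its payload into a Read chunk.
 */
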
/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	unsigned int count, remaining, offset;

	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
		return false;

	if (xdr->page_len) {
		remaining = xdr->page_len;
		offset = offset_in_page(xdr->page_base);
		count = 0;
		while (remaining) {
			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
			offset = 0;
			if (++count > r_xprt->rx_ia.ri_max_send_sges)
				return false;
		}
	}

	return true;
}

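/* Sketch of the SGE-count check (assumed page geometry): a 10000-byte
 * page list that starts 3000 bytes into its first page touches four
 * pages, contributing SGEs of 1096, 4096, 4096, and 712 bytes. If that
 * exceeds the transport's Send SGE budget (ri_max_send_sges), the
 * request is not sent inline even though it fits under the byte limit.
 */
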
/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}

/* Split @vec on page boundaries into SGEs. FMR registers pages, not
 * a byte range. Other modes coalesce these SGEs into a single MR
 * when they can.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     unsigned int *n)
{
	u32 remaining, page_offset;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining && *n < RPCRDMA_MAX_SEGS) {
		seg->mr_page = NULL;
		seg->mr_offset = base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg->mr_len;
		base += seg->mr_len;
		++seg;
		++(*n);
		page_offset = 0;
	}
	return seg;
}

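/* Example of the split (assumed addresses): a 6000-byte kvec whose
 * iov_base sits 1000 bytes into a 4096-byte page becomes two SGEs,
 * the first covering the 3096 bytes up to the end of that page and
 * the second covering the remaining 2904 bytes at offset 0 of the
 * next page. Registration modes other than FMR may later coalesce
 * these entries into a single MR.
 */
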
/* Convert @xdrbuf into SGEs no larger than a page each. As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
 */
static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
		     unsigned int pos, enum rpcrdma_chunktype type,
		     struct rpcrdma_mr_seg *seg)
{
	unsigned long page_base;
	unsigned int len, n;
	struct page **ppages;

	n = 0;
	if (pos == 0)
		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdrbuf->page_base);
	while (len) {
		if (unlikely(!*ppages)) {
			/* XXX: Certain upper layer operations do
			 * not provide receive buffer pages.
			 */
			*ppages = alloc_page(GFP_ATOMIC);
			if (!*ppages)
				return -EAGAIN;
		}
		seg->mr_page = *ppages;
		seg->mr_offset = (char *)page_base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		len -= seg->mr_len;
		++ppages;
		++seg;
		++n;
		page_base = 0;
	}

	/* When encoding a Read chunk, the tail iovec contains an
	 * XDR pad and may be omitted.
	 */
	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
		goto out;

	/* When encoding a Write chunk, some servers need to see an
	 * extra segment for non-XDR-aligned Write chunks. The upper
	 * layer provides space in the tail iovec that may be used
	 * for this purpose.
	 */
	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
		goto out;

	if (xdrbuf->tail[0].iov_len)
		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);

out:
	if (unlikely(n > RPCRDMA_MAX_SEGS))
		return -EIO;
	return n;
}

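/* The resulting segment array mirrors the xdr_buf layout (a sketch):
 *
 *	seg[0..i]	head kvec, split per page (pos == 0 only)
 *	seg[i..j]	one entry per page in the page list
 *	seg[j..k]	tail kvec, unless skipped by implicit roundup
 *
 * The caller hands this array to the registration ops, which fold
 * consecutive entries into as few MRs as the memreg mode allows.
 */
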
static int
encode_item_present(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p = xdr_one;
	return 0;
}

static int
encode_item_not_present(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p = xdr_zero;
	return 0;
}

static void
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
{
	*iptr++ = cpu_to_be32(mw->mw_handle);
	*iptr++ = cpu_to_be32(mw->mw_length);
	xdr_encode_hyper(iptr, mw->mw_offset);
}

static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	xdr_encode_rdma_segment(p, mw);
	return 0;
}

static int
encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw,
		    u32 position)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p++ = xdr_one;			/* Item present */
	*p++ = cpu_to_be32(position);
	xdr_encode_rdma_segment(p, mw);
	return 0;
}

/* Register and XDR encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static noinline int
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	unsigned int pos;
	int nsegs;

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return nsegs;

	do {
		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						   false, &mw);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_push_mw(mw, &req->rl_registered);

		if (encode_read_segment(xdr, mw, pos) < 0)
			return -EMSGSIZE;

		dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__, pos,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");

		r_xprt->rx_stats.read_chunk_count++;
		nsegs -= mw->mw_nents;
	} while (nsegs);

	return 0;
}

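/* On the wire, a Read chunk with two segments at position P looks
 * like this (one row per list item, values illustrative):
 *
 *	1 | P | handle0 | length0 | offset0
 *	1 | P | handle1 | length1 | offset1
 *	0
 *
 * The final 0 that terminates the Read list is emitted by the caller,
 * rpcrdma_marshal_req(), via encode_item_not_present().
 */
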
/* Register and XDR encode the Write list. Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static noinline int
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int nsegs, nchunks;
	__be32 *segcount;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						   true, &mw);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_push_mw(mw, &req->rl_registered);

		if (encode_rdma_segment(xdr, mw) < 0)
			return -EMSGSIZE;

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");

		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		nsegs -= mw->mw_nents;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}

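/* A one-chunk Write list with three segments is laid out as
 * (illustrative):
 *
 *	1 | 3 | H0 L0 OO0 | H1 L1 OO1 | H2 L2 OO2 | 0
 *
 * where the final 0 terminating the list is appended by the caller.
 * The segment count slot (the 3 above) is reserved before the
 * registration loop runs and back-filled from nchunks afterwards,
 * because the number of MRs produced by ro_map() is not known up
 * front.
 */
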
/* Register and XDR encode the Reply chunk. Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 */
static noinline int
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int nsegs, nchunks;
	__be32 *segcount;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						   true, &mw);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_push_mw(mw, &req->rl_registered);

		if (encode_rdma_segment(xdr, mw) < 0)
			return -EMSGSIZE;

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");

		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		nsegs -= mw->mw_nents;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}

/* Prepare the RPC-over-RDMA header SGE.
 */
static bool
rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			u32 len)
{
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = &req->rl_send_sge[0];

	if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
		if (!__rpcrdma_dma_map_regbuf(ia, rb))
			return false;
		sge->addr = rdmab_addr(rb);
		sge->lkey = rdmab_lkey(rb);
	}
	sge->length = len;

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
				      sge->length, DMA_TO_DEVICE);
	req->rl_send_wr.num_sge++;
	return true;
}

/* Prepare the Send SGEs. The head and tail iovec, and each entry
 * in the page list, gets its own SGE.
 */
static bool
rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
	unsigned int sge_no, page_base, len, remaining;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct ib_device *device = ia->ri_device;
	struct ib_sge *sge = req->rl_send_sge;
	u32 lkey = ia->ri_pd->local_dma_lkey;
	struct page *page, **ppages;

	/* The head iovec is straightforward, as it is already
	 * DMA-mapped. Sync the content that has changed.
	 */
	if (!rpcrdma_dma_map_regbuf(ia, rb))
		return false;
	sge_no = 1;
	sge[sge_no].addr = rdmab_addr(rb);
	sge[sge_no].length = xdr->head[0].iov_len;
	sge[sge_no].lkey = rdmab_lkey(rb);
	ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
				      sge[sge_no].length, DMA_TO_DEVICE);

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here. However, the
	 * tail iovec may include an XDR pad for the page list, as
	 * well as additional content, and may not reside in the
	 * same page as the head iovec.
	 */
	if (rtype == rpcrdma_readch) {
		len = xdr->tail[0].iov_len;

		/* Do not include the tail if it is only an XDR pad */
		if (len < 4)
			goto out;

		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = offset_in_page(xdr->tail[0].iov_base);

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() has added a pad at the beginning
		 * of the tail iovec. Force the tail's non-pad content
		 * to land at the next XDR position in the Send message.
		 */
		page_base += len & 3;
		len -= len & 3;
		goto map_tail;
	}

	/* If there is a page list present, temporarily DMA map
	 * and prepare an SGE for each page to be sent.
	 */
	if (xdr->page_len) {
		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
		page_base = offset_in_page(xdr->page_base);
		remaining = xdr->page_len;
		while (remaining) {
			sge_no++;
			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
				goto out_mapping_overflow;

			len = min_t(u32, PAGE_SIZE - page_base, remaining);
			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
							   page_base, len,
							   DMA_TO_DEVICE);
			if (ib_dma_mapping_error(device, sge[sge_no].addr))
				goto out_mapping_err;
			sge[sge_no].length = len;
			sge[sge_no].lkey = lkey;

			req->rl_mapped_sges++;
			ppages++;
			remaining -= len;
			page_base = 0;
		}
	}

	/* The tail iovec is not always constructed in the same
	 * page where the head iovec resides (see, for example,
	 * gss_wrap_req_priv). To neatly accommodate that case,
	 * DMA map it separately.
	 */
	if (xdr->tail[0].iov_len) {
		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = offset_in_page(xdr->tail[0].iov_base);
		len = xdr->tail[0].iov_len;

map_tail:
		sge_no++;
		sge[sge_no].addr = ib_dma_map_page(device, page,
						   page_base, len,
						   DMA_TO_DEVICE);
		if (ib_dma_mapping_error(device, sge[sge_no].addr))
			goto out_mapping_err;
		sge[sge_no].length = len;
		sge[sge_no].lkey = lkey;
		req->rl_mapped_sges++;
	}

out:
	req->rl_send_wr.num_sge = sge_no + 1;
	return true;

out_mapping_overflow:
	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
	return false;

out_mapping_err:
	pr_err("rpcrdma: Send mapping error\n");
	return false;
}

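/* Resulting Send SGE layout for an RDMA_MSG call (a sketch):
 *
 *	sge[0]		transport header (rl_rdmabuf, persistently mapped)
 *	sge[1]		head iovec of the RPC call (rl_sendbuf)
 *	sge[2..n-1]	page list and/or tail iovec, DMA-mapped here and
 *			unmapped again by rpcrdma_unmap_sges()
 *
 * Only the entries from sge[2] onward are counted in rl_mapped_sges,
 * since the first two are backed by long-lived regbufs.
 */
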
static bool
rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			  u32 hdrlen, struct xdr_buf *xdr,
			  enum rpcrdma_chunktype rtype)
{
	req->rl_send_wr.num_sge = 0;
	req->rl_mapped_sges = 0;

	if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
		goto out_map;

	if (rtype != rpcrdma_areadch)
		if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
			goto out_map;

	return true;

out_map:
	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
	return false;
}

void
rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_sge *sge;
	int count;

	sge = &req->rl_send_sge[2];
	for (count = req->rl_mapped_sges; count--; sge++)
		ib_dma_unmap_page(device, sge->addr, sge->length,
				  DMA_TO_DEVICE);
	req->rl_mapped_sges = 0;
}

/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *	%0 if the RPC was sent successfully,
 *	%-ENOTCONN if the connection was lost,
 *	%-EAGAIN if not enough pages are available for on-demand reply buffer,
 *	%-ENOBUFS if no MRs are available to register chunks,
 *	%-EMSGSIZE if the transport header is too small,
 *	%-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct xdr_stream *xdr = &req->rl_stream;
	enum rpcrdma_chunktype rtype, wtype;
	bool ddp_allowed;
	__be32 *p;
	int ret;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
		return rpcrdma_bc_marshal_reply(rqst);
#endif

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(xdr, &req->rl_hdrbuf,
			req->rl_rdmabuf->rg_base);

	/* Fixed header fields */
	ret = -EMSGSIZE;
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		goto out_err;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
						RPCAUTH_AUTH_DATATOUCH);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		*p++ = rdma_msg;
		rtype = rpcrdma_noch;
	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		*p++ = rdma_msg;
		rtype = rpcrdma_readch;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		*p++ = rdma_nomsg;
		rtype = rpcrdma_areadch;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	if (rtype != rpcrdma_noch) {
		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
		if (ret)
			goto out_err;
	}
	ret = encode_item_not_present(xdr);
	if (ret)
		goto out_err;

	if (wtype == rpcrdma_writech) {
		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
		if (ret)
			goto out_err;
	}
	ret = encode_item_not_present(xdr);
	if (ret)
		goto out_err;

	if (wtype != rpcrdma_replych)
		ret = encode_item_not_present(xdr);
	else
		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;

	dprintk("RPC: %5u %s: %s/%s: hdrlen %u rpclen\n",
		rqst->rq_task->tk_pid, __func__,
		transfertypes[rtype], transfertypes[wtype],
		xdr_stream_pos(xdr));

	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req,
				       xdr_stream_pos(xdr),
				       &rqst->rq_snd_buf, rtype)) {
		ret = -EIO;
		goto out_err;
	}
	return 0;

out_err:
	if (ret != -ENOBUFS) {
		pr_err("rpcrdma: header marshaling failed (%d)\n", ret);
		r_xprt->rx_stats.failed_marshal_count++;
	}
	return ret;
}

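/* For a small, purely inline call the marshaled transport header is
 * just seven XDR words (values illustrative):
 *
 *	xid | 1 (vers) | credits | RDMA_MSG | 0 | 0 | 0
 *
 * i.e. the fixed fields followed by empty Read list, Write list, and
 * Reply chunk discriminators, with the RPC call message carried in
 * the Send buffer immediately after it.
 */
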
/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
		__func__, srcp, copy_len, curlen);
	srcp += curlen;
	copy_len -= curlen;

	ppages = rqst->rq_rcv_buf.pages +
		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			dprintk("RPC: %s: page %d"
				" srcp 0x%p len %d curlen %d\n",
				__func__, i, srcp, copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	return fixup_copy_count;
}

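/* Example of the fixup (assumed sizes): a 600-byte inline reply with a
 * 200-byte head allowance leaves 400 bytes for the page list. The head
 * iovec is simply pointed at the start of the receive buffer (no copy),
 * the 400 page-list bytes are memcopied into the upper layer's pages,
 * and the function returns 400 as fixup_copy_count.
 */
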
/* Caller must guarantee @rep remains stable during this call.
 */
static void
rpcrdma_mark_remote_invalidation(struct list_head *mws,
				 struct rpcrdma_rep *rep)
{
	struct rpcrdma_mw *mw;

	if (!(rep->rr_wc_flags & IB_WC_WITH_INVALIDATE))
		return;

	list_for_each_entry(mw, mws, mw_list)
		if (mw->mw_handle == rep->rr_inv_rkey) {
			mw->mw_flags = RPCRDMA_MW_F_RI;
			break; /* only one invalidated MR per RPC */
		}
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		 __be32 xid, __be32 proc)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	if (proc != rdma_msg)
		return false;

	/* Peek at stream contents without advancing. */
	p = xdr_inline_decode(xdr, 0);

	/* Chunk lists */
	if (*p++ != xdr_zero)
		return false;
	if (*p++ != xdr_zero)
		return false;
	if (*p++ != xdr_zero)
		return false;

	/* RPC header */
	if (*p++ != xid)
		return false;
	if (*p != cpu_to_be32(RPC_CALL))
		return false;

	/* Now that we are sure this is a backchannel call,
	 * advance to the RPC header.
	 */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (unlikely(!p))
		goto out_short;

	rpcrdma_bc_receive_call(r_xprt, rep);
	return true;

out_short:
	pr_warn("RPC/RDMA short backward direction call\n");
	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
		xprt_disconnect_done(&r_xprt->rx_xprt);
	return true;
}
#else	/* CONFIG_SUNRPC_BACKCHANNEL */
{
	return false;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	ifdebug(FACILITY) {
		u64 offset;
		u32 handle;

		handle = be32_to_cpup(p++);
		*length = be32_to_cpup(p++);
		xdr_decode_hyper(p, &offset);
		dprintk("RPC: %s: segment %u@0x%016llx:0x%08x\n",
			__func__, *length, (unsigned long long)offset,
			handle);
	} else {
		*length = be32_to_cpup(p + 1);
	}

	return 0;
}

static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
{
	u32 segcount, seglength;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	segcount = be32_to_cpup(p);
	while (segcount--) {
		if (decode_rdma_segment(xdr, &seglength))
			return -EIO;
		*length += seglength;
	}

	dprintk("RPC: %s: segcount=%u, %u bytes\n",
		__func__, be32_to_cpup(p), *length);
	return 0;
}

/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected. This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	if (unlikely(*p != xdr_zero))
		return -EIO;
	return 0;
}

/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
	u32 chunklen;
	bool first;
	__be32 *p;

	*length = 0;
	first = true;
	do {
		p = xdr_inline_decode(xdr, sizeof(*p));
		if (unlikely(!p))
			return -EIO;
		if (*p == xdr_zero)
			break;
		if (!first)
			return -EIO;

		if (decode_write_chunk(xdr, &chunklen))
			return -EIO;
		*length += chunklen;
		first = false;
	} while (true);
	return 0;
}

static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	if (*p != xdr_zero)
		if (decode_write_chunk(xdr, length))
			return -EIO;
	return 0;
}

static int
rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		   struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk, rpclen;
	char *base;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_MSG sanity checks */
	if (unlikely(replychunk))
		return -EIO;

	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
	base = (char *)xdr_inline_decode(xdr, 0);
	rpclen = xdr_stream_remaining(xdr);
	r_xprt->rx_stats.fixup_copy_count +=
		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);

	r_xprt->rx_stats.total_rdma_reply += writelist;
	return rpclen + xdr_align_size(writelist);
}

static noinline int
rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_NOMSG sanity checks */
	if (unlikely(writelist))
		return -EIO;
	if (unlikely(!replychunk))
		return -EIO;

	/* Reply chunk buffer already is the reply vector */
	r_xprt->rx_stats.total_rdma_reply += replychunk;
	return replychunk;
}

static noinline int
rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		     struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	switch (*p) {
	case err_vers:
		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
		if (!p)
			break;
		dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
			rqst->rq_task->tk_pid, __func__,
			be32_to_cpup(p), be32_to_cpu(*(p + 1)));
		break;
	case err_chunk:
		dprintk("RPC: %5u: %s: server reports header decoding error\n",
			rqst->rq_task->tk_pid, __func__);
		break;
	default:
		dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
			rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
	}

	r_xprt->rx_stats.bad_reply_count++;
	return -EREMOTEIO;
}

/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct xdr_stream *xdr = &rep->rr_stream;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	__be32 *p, xid, vers, proc;
	unsigned long cwnd;
	int status;

	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);

	if (rep->rr_hdrbuf.head[0].iov_len == 0)
		goto out_badstatus;

	xdr_init_decode(xdr, &rep->rr_hdrbuf,
			rep->rr_hdrbuf.head[0].iov_base);

	/* Fixed transport header fields */
	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		goto out_shortreply;
	xid = *p++;
	vers = *p++;
	p++;	/* credits */
	proc = *p++;

	if (rpcrdma_is_bcall(r_xprt, rep, xid, proc))
		return;

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock(&xprt->recv_lock);
	rqst = xprt_lookup_rqst(xprt, xid);
	if (!rqst)
		goto out_norqst;
	xprt_pin_rqst(rqst);
	spin_unlock(&xprt->recv_lock);
	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;

	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
		__func__, rep, req, be32_to_cpu(xid));

	/* Invalidate and unmap the data payloads before waking the
	 * waiting application. This guarantees the memory regions
	 * are properly fenced from the server before the application
	 * accesses the data. It also ensures proper send flow control:
	 * waking the next RPC waits until this RPC has relinquished
	 * all its Send Queue entries.
	 */
	if (!list_empty(&req->rl_registered)) {
		rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
						    &req->rl_registered);
	}

	xprt->reestablish_timeout = 0;
	if (vers != rpcrdma_version)
		goto out_badversion;

	switch (proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
		status = -EIO;
	}
	if (status < 0)
		goto out_badheader;

out:
	spin_lock(&xprt->recv_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->recv_lock);
	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
		__func__, xprt, rqst, status);
	return;

out_badstatus:
	rpcrdma_recv_buffer_put(rep);
	if (r_xprt->rx_ep.rep_connected == 1) {
		r_xprt->rx_ep.rep_connected = -EIO;
		rpcrdma_conn_func(&r_xprt->rx_ep);
	}
	return;

/* If the incoming reply terminated a pending RPC, the next
 * RPC call will post a replacement receive buffer as it is
 * being marshaled.
 */
out_badversion:
	dprintk("RPC: %s: invalid version %d\n",
		__func__, be32_to_cpu(vers));
	status = -EIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

out_badheader:
	dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
		rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc));
	r_xprt->rx_stats.bad_reply_count++;
	status = -EIO;
	goto out;

/* The req was still available, but by the time the recv_lock
 * was acquired, the rqst and task had been released. Thus the RPC
 * has already been terminated.
 */
out_norqst:
	spin_unlock(&xprt->recv_lock);
	dprintk("RPC: %s: no match for incoming xid 0x%08x\n",
		__func__, be32_to_cpu(xid));
	goto repost;

out_shortreply:
	dprintk("RPC: %s: short/invalid reply\n", __func__);
	goto repost;

/* If no pending RPC transaction was matched, post a replacement
 * receive buffer before returning.
 */
repost:
	r_xprt->rx_stats.bad_reply_count++;
	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
		rpcrdma_recv_buffer_put(rep);
}