/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */
#include "xprt_rdma.h"

#include <linux/highmem.h>
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
static const char transfertypes[][12] = {
	"inline",	/* no chunks */
	"read list",	/* some argument via rdma read */
	"*read list",	/* entire request via rdma read */
	"write list",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};
/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += maxsegs * sizeof(struct rpcrdma_read_chunk);

	/* Minimal Read chunk size */
	size += sizeof(__be32);	/* segment count */
	size += sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max call header size = %u\n",
		__func__, size);
	return size;
}
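/* Back-of-the-envelope illustration only (it assumes a 28-byte fixed
 * header, a 24-byte struct rpcrdma_read_chunk, and a 16-byte struct
 * rpcrdma_segment; see xprt_rdma.h for the authoritative layouts):
 * with maxsegs = 8, the worst-case Read list covers 10 segments, so
 * the largest Call header is roughly 28 + 10 * 24 + 4 + 16 + 4 = 292
 * bytes, which is what gets subtracted from the inline send limit.
 */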
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += sizeof(__be32);	/* segment count */
	size += maxsegs * sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max reply header size = %u\n",
		__func__, size);
	return size;
}
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int maxsegs = ia->ri_max_segs;

	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				  rpcrdma_max_reply_header_size(maxsegs);
}
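/* Illustration only, with made-up values: if inline_wsize is 4096 and
 * the worst-case Call header works out to a few hundred bytes, then
 * ri_max_inline_write lands somewhere in the high 3000s; any RPC whose
 * marshaled send buffer exceeds that value is forced into a Read chunk
 * by rpcrdma_args_inline() below.
 */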
/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	unsigned int count, remaining, offset;

	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
		return false;

	if (xdr->page_len) {
		remaining = xdr->page_len;
		offset = xdr->page_base & ~PAGE_MASK;
		count = 0;
		while (remaining) {
			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
			offset = 0;
			if (++count > r_xprt->rx_ia.ri_max_send_sges)
				return false;
		}
	}

	return true;
}
/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}
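/* For example (sizes are illustrative, not measured): a small GETATTR
 * reply fits comfortably under ri_max_inline_read and needs no chunks,
 * while a 64KB READ sets up rq_rcv_buf.buflen well above the inline
 * limit, so the caller falls back to a Write list or Reply chunk.
 */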
/* Split "vec" on page boundaries into segments. FMR registers pages,
 * not a byte range. Other modes coalesce these segments into a single
 * MR when they can.
 */
static int
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
{
	size_t page_offset;
	u32 remaining;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining && n < RPCRDMA_MAX_SEGS) {
		seg[n].mr_page = NULL;
		seg[n].mr_offset = base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg[n].mr_len;
		base += seg[n].mr_len;
		++n;
		page_offset = 0;
	}
	return n;
}
/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Returns positive number of segments converted, or a negative errno.
 */
static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
		     unsigned int pos, enum rpcrdma_chunktype type,
		     struct rpcrdma_mr_seg *seg)
{
	int len, n, p, page_base;
	struct page **ppages;

	n = 0;
	if (pos == 0) {
		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
		if (n == RPCRDMA_MAX_SEGS)
			goto out_overflow;
	}

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = xdrbuf->page_base & ~PAGE_MASK;
	p = 0;
	while (len && n < RPCRDMA_MAX_SEGS) {
		if (!ppages[p]) {
			/* alloc the pagelist for receiving buffer */
			ppages[p] = alloc_page(GFP_ATOMIC);
			if (!ppages[p])
				return -EAGAIN;
		}
		seg[n].mr_page = ppages[p];
		seg[n].mr_offset = (void *)(unsigned long) page_base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		if (seg[n].mr_len > PAGE_SIZE)
			goto out_overflow;
		len -= seg[n].mr_len;
		++n;
		++p;
		page_base = 0;	/* page offset only applies to first page */
	}

	/* Message overflows the seg array */
	if (len && n == RPCRDMA_MAX_SEGS)
		goto out_overflow;

	/* When encoding a Read chunk, the tail iovec contains an
	 * XDR pad and may be omitted.
	 */
	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
		return n;

	/* When encoding a Write chunk, some servers need to see an
	 * extra segment for non-XDR-aligned Write chunks. The upper
	 * layer provides space in the tail iovec that may be used
	 * for this purpose.
	 */
	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
		return n;

	if (xdrbuf->tail[0].iov_len) {
		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
		if (n == RPCRDMA_MAX_SEGS)
			goto out_overflow;
	}

	return n;

out_overflow:
	pr_err("rpcrdma: segment array overflow\n");
	return -EIO;
}
static inline __be32 *
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
{
	*iptr++ = cpu_to_be32(mw->mw_handle);
	*iptr++ = cpu_to_be32(mw->mw_length);
	return xdr_encode_hyper(iptr, mw->mw_offset);
}
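/* Orientation note: each segment goes on the wire as HLOO - a 32-bit
 * handle (the rkey), a 32-bit length, and a 64-bit offset - so one
 * plain segment occupies four XDR words in the header.
 */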
/* XDR-encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Read list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
			 __be32 *iptr, enum rpcrdma_chunktype rtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	unsigned int pos;
	int n, nsegs;

	if (rtype == rpcrdma_noch) {
		*iptr++ = xdr_zero;	/* item not present */
		return iptr;
	}

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 false, &mw);
		if (n < 0)
			return ERR_PTR(n);
		rpcrdma_push_mw(mw, &req->rl_registered);

		*iptr++ = xdr_one;	/* item present */

		/* All read segments in this chunk
		 * have the same "position".
		 */
		*iptr++ = cpu_to_be32(pos);
		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__, pos,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.read_chunk_count++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Finish Read list */
	*iptr++ = xdr_zero;	/* Next item not present */

	return iptr;
}
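/* Illustrative wire image (values invented): a single Read chunk at
 * XDR position 36 covering one registered segment is emitted as
 *
 *	1, 36, <handle>, <length>, <offset-hi>, <offset-lo>, 0
 *
 * that is, an "item present" discriminator, the position, one HLOO,
 * and the terminating "not present" word written by "Finish Read list".
 */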
/* XDR-encode the Write list. Supports encoding a list containing
 * one array of plain segments that belong to a single write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Write list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, __be32 *iptr,
			  enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech) {
		*iptr++ = xdr_zero;	/* no Write list present */
		return iptr;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Write list present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 true, &mw);
		if (n < 0)
			return ERR_PTR(n);
		rpcrdma_push_mw(mw, &req->rl_registered);

		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	/* Finish Write list */
	*iptr++ = xdr_zero;	/* Next item not present */

	return iptr;
}
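/* Illustrative wire image (values invented): a Write list carrying one
 * two-segment chunk is emitted as
 *
 *	1, 2, <HLOO #1>, <HLOO #2>, 0
 *
 * the list discriminator, the segment count patched in through
 * "segcount", the segments themselves, and the list terminator.
 */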
/* XDR-encode the Reply chunk. Supports encoding an array of plain
 * segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Reply chunk, or an error pointer.
 */
static __be32 *
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
			   __be32 *iptr, enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		*iptr++ = xdr_zero;	/* no Reply chunk present */
		return iptr;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Reply chunk present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 true, &mw);
		if (n < 0)
			return ERR_PTR(n);
		rpcrdma_push_mw(mw, &req->rl_registered);

		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return iptr;
}
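/* Note the contrast with the Write list above: the Reply chunk is a
 * single optional counted array rather than a list, so no trailing
 * "not present" word follows its segments.
 */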
/* Prepare the RPC-over-RDMA header SGE.
 */
static bool
rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			u32 len)
{
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = &req->rl_send_sge[0];

	if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
		if (!__rpcrdma_dma_map_regbuf(ia, rb))
			return false;
		sge->addr = rdmab_addr(rb);
		sge->lkey = rdmab_lkey(rb);
	}
	sge->length = len;

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
				      sge->length, DMA_TO_DEVICE);
	req->rl_send_wr.num_sge++;
	return true;
}
/* Prepare the Send SGEs. The head and tail iovec, and each entry
 * in the page list, gets its own SGE.
 */
static bool
rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
	unsigned int sge_no, page_base, len, remaining;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct ib_device *device = ia->ri_device;
	struct ib_sge *sge = req->rl_send_sge;
	u32 lkey = ia->ri_pd->local_dma_lkey;
	struct page *page, **ppages;

	/* The head iovec is straightforward, as it is already
	 * DMA-mapped. Sync the content that has changed.
	 */
	if (!rpcrdma_dma_map_regbuf(ia, rb))
		return false;
	sge_no = 1;
	sge[sge_no].addr = rdmab_addr(rb);
	sge[sge_no].length = xdr->head[0].iov_len;
	sge[sge_no].lkey = rdmab_lkey(rb);
	ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
				      sge[sge_no].length, DMA_TO_DEVICE);

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here. However, the
	 * tail iovec may include an XDR pad for the page list, as
	 * well as additional content, and may not reside in the
	 * same page as the head iovec.
	 */
	if (rtype == rpcrdma_readch) {
		len = xdr->tail[0].iov_len;

		/* Do not include the tail if it is only an XDR pad */
		if (len < 4)
			goto out;

		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() has added a pad at the beginning
		 * of the tail iovec. Force the tail's non-pad content
		 * to land at the next XDR position in the Send message.
		 */
		page_base += len & 3;
		len -= len & 3;
		goto map_tail;
	}

	/* If there is a page list present, temporarily DMA map
	 * and prepare an SGE for each page to be sent.
	 */
	if (xdr->page_len) {
		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
		page_base = xdr->page_base & ~PAGE_MASK;
		remaining = xdr->page_len;
		while (remaining) {
			sge_no++;
			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
				goto out_mapping_overflow;

			len = min_t(u32, PAGE_SIZE - page_base, remaining);
			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
							   page_base, len,
							   DMA_TO_DEVICE);
			if (ib_dma_mapping_error(device, sge[sge_no].addr))
				goto out_mapping_err;
			sge[sge_no].length = len;
			sge[sge_no].lkey = lkey;

			req->rl_mapped_sges++;
			ppages++;
			remaining -= len;
			page_base = 0;
		}
	}

	/* The tail iovec is not always constructed in the same
	 * page where the head iovec resides (see, for example,
	 * gss_wrap_req_priv). To neatly accommodate that case,
	 * DMA map it separately.
	 */
	if (xdr->tail[0].iov_len) {
		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
		len = xdr->tail[0].iov_len;

map_tail:
		sge_no++;
		sge[sge_no].addr = ib_dma_map_page(device, page,
						   page_base, len,
						   DMA_TO_DEVICE);
		if (ib_dma_mapping_error(device, sge[sge_no].addr))
			goto out_mapping_err;
		sge[sge_no].length = len;
		sge[sge_no].lkey = lkey;
		req->rl_mapped_sges++;
	}

out:
	req->rl_send_wr.num_sge = sge_no + 1;
	return true;

out_mapping_overflow:
	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
	return false;

out_mapping_err:
	pr_err("rpcrdma: Send mapping error\n");
	return false;
}
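/* A note on the SGE budget above (an observation, not new behavior):
 * sge[0] carries the transport header and sge[1] the head iovec, so
 * the "RPCRDMA_MAX_SEND_SGES - 2" check while walking the page list
 * keeps one slot in reserve for a possible tail SGE.
 */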
bool
rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			  u32 hdrlen, struct xdr_buf *xdr,
			  enum rpcrdma_chunktype rtype)
{
	req->rl_send_wr.num_sge = 0;
	req->rl_mapped_sges = 0;

	if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
		goto out_map;

	if (rtype != rpcrdma_areadch)
		if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
			goto out_map;

	return true;

out_map:
	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
	return false;
}
void
rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_sge *sge;
	int count;

	sge = &req->rl_send_sge[2];
	for (count = req->rl_mapped_sges; count--; sge++)
		ib_dma_unmap_page(device, sge->addr, sge->length,
				  DMA_TO_DEVICE);
	req->rl_mapped_sges = 0;
}
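/* Presumably the unmap walk starts at rl_send_sge[2] because the first
 * two SGEs (transport header and head iovec) are backed by persistently
 * mapped regbufs; only the page-list and tail SGEs counted in
 * rl_mapped_sges were mapped with ib_dma_map_page() and need undoing.
 */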
/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Returns zero on success, otherwise a negative errno.
 */
int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	enum rpcrdma_chunktype rtype, wtype;
	struct rpcrdma_msg *headerp;
	bool ddp_allowed;
	ssize_t hdrlen;
	size_t rpclen;
	__be32 *iptr;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
		return rpcrdma_bc_marshal_reply(rqst);
#endif

	headerp = rdmab_to_msg(req->rl_rdmabuf);
	/* don't byte-swap XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = rdma_msg;

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
			RPCAUTH_AUTH_DATATOUCH);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		rtype = rpcrdma_noch;
		rpclen = rqst->rq_snd_buf.len;
	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		rtype = rpcrdma_readch;
		rpclen = rqst->rq_snd_buf.head[0].iov_len +
			 rqst->rq_snd_buf.tail[0].iov_len;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		headerp->rm_type = htonl(RDMA_NOMSG);
		rtype = rpcrdma_areadch;
		rpclen = 0;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	iptr = headerp->rm_body.rm_chunks;
	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
	if (IS_ERR(iptr))
		goto out_err;
	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_err;
	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_err;
	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
		rqst->rq_task->tk_pid, __func__,
		transfertypes[rtype], transfertypes[wtype],
		hdrlen, rpclen);

	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
				       &rqst->rq_snd_buf, rtype)) {
		iptr = ERR_PTR(-EIO);
		goto out_err;
	}
	return 0;

out_err:
	if (PTR_ERR(iptr) != -ENOBUFS) {
		pr_err("rpcrdma: rpcrdma_marshal_req failed, status %ld\n",
		       PTR_ERR(iptr));
		r_xprt->rx_stats.failed_marshal_count++;
	}
	return PTR_ERR(iptr);
}
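/* Worked examples of the mode selection above (sizes illustrative):
 * a small SETATTR fits inline in both directions, so rtype and wtype
 * are both rpcrdma_noch; a 1MB NFS WRITE moves its payload via a Read
 * chunk (rpcrdma_readch) while the reply stays inline; a 1MB NFS READ
 * sends a small call but provides a Write list (rpcrdma_writech) for
 * the data; a large non-read reply such as READDIR comes back in a
 * Reply chunk (rpcrdma_replych).
 */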
/*
 * Chase down a received write or reply chunklist to get length
 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
 */
static int
rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
{
	unsigned int i, total_len;
	struct rpcrdma_write_chunk *cur_wchunk;
	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

	i = be32_to_cpu(**iptrp);
	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
	total_len = 0;
	while (i--) {
		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
		ifdebug(FACILITY) {
			u64 off;

			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
			dprintk("RPC: %s: chunk %d@0x%016llx:0x%08x\n",
				__func__,
				be32_to_cpu(seg->rs_length),
				(unsigned long long)off,
				be32_to_cpu(seg->rs_handle));
		}
		total_len += be32_to_cpu(seg->rs_length);
		++cur_wchunk;
	}
	/* check and adjust for properly terminated write chunk */
	if (wrchunk) {
		__be32 *w = (__be32 *) cur_wchunk;

		if (*w++ != xdr_zero)
			return -1;
		cur_wchunk = (struct rpcrdma_write_chunk *) w;
	}
	if ((char *)cur_wchunk > base + rep->rr_len)
		return -1;

	*iptrp = (__be32 *) cur_wchunk;
	return total_len;
}
/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
		__func__, srcp, copy_len, curlen);
	srcp += curlen;
	copy_len -= curlen;

	page_base = rqst->rq_rcv_buf.page_base;
	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			dprintk("RPC: %s: page %d"
				" srcp 0x%p len %d curlen %d\n",
				__func__, i, srcp, copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	return fixup_copy_count;
}
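/* In other words, for a reply carrying no page-list data this function
 * can return 0: the head and tail iovecs are simply re-pointed into the
 * receive buffer, and only bytes destined for rq_rcv_buf.pages are ever
 * memcpy'd (and counted in fixup_copy_count).
 */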
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
{
	__be32 *p = (__be32 *)headerp;

	if (headerp->rm_type != rdma_msg)
		return false;
	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
		return false;

	/* sanity */
	if (p[7] != headerp->rm_xid)
		return false;
	/* call direction */
	if (p[8] != cpu_to_be32(RPC_CALL))
		return false;

	return true;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
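/* Why p[7] and p[8]: with all three chunk lists empty, the RPC-over-RDMA
 * header is exactly seven 32-bit words (RPCRDMA_HDRLEN_MIN), so word 7
 * lines up with the embedded RPC XID and word 8 with the RPC
 * call-direction field (0 for a Call).
 */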
/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);
	struct rpcrdma_msg *headerp;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	__be32 *iptr;
	int rdmalen, status, rmerr;
	unsigned long cwnd;

	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);

	if (rep->rr_len == RPCRDMA_BAD_LEN)
		goto out_badstatus;
	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		goto out_shortreply;

	headerp = rdmab_to_msg(rep->rr_rdmabuf);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (rpcrdma_is_bcall(headerp))
		goto out_bcall;
#endif

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock_bh(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	if (!rqst)
		goto out_nomatch;

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply)
		goto out_duplicate;

	/* Sanity checking has passed. We are now committed
	 * to complete this transaction.
	 */
	list_del_init(&rqst->rq_list);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

	/* from here on, the reply is no longer an orphan */
	req->rl_reply = rep;
	xprt->reestablish_timeout = 0;

	if (headerp->rm_vers != rpcrdma_version)
		goto out_badversion;

	/* check for expected message types */
	/* The order of some of these tests is important. */
	switch (headerp->rm_type) {
	case rdma_msg:
		/* never expect read chunks */
		/* never expect reply chunks (two ways to check) */
		/* never expect write chunks without having offered RDMA */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
		     list_empty(&req->rl_registered)))
			goto badheader;
		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
			/* count any expected write chunks in read reply */
			/* start at write chunk array count */
			iptr = &headerp->rm_body.rm_chunks[2];
			rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
			/* check for validity, and no reply chunk after */
			if (rdmalen < 0 || *iptr++ != xdr_zero)
				goto badheader;
			rep->rr_len -=
			    ((unsigned char *)iptr - (unsigned char *)headerp);
			status = rep->rr_len + rdmalen;
			r_xprt->rx_stats.total_rdma_reply += rdmalen;
			/* special case - last chunk may omit padding */
			if (rdmalen &= 3) {
				rdmalen = 4 - rdmalen;
				status += rdmalen;
			}
		} else {
			/* else ordinary inline */
			rdmalen = 0;
			iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
			status = rep->rr_len;
		}

		r_xprt->rx_stats.fixup_copy_count +=
			rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
					     rdmalen);
		break;

	case rdma_nomsg:
		/* never expect read or write chunks, always reply chunks */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
		    headerp->rm_body.rm_chunks[2] != xdr_one ||
		    list_empty(&req->rl_registered))
			goto badheader;
		iptr = (__be32 *)((unsigned char *)headerp +
						RPCRDMA_HDRLEN_MIN);
		rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
		if (rdmalen < 0)
			goto badheader;
		r_xprt->rx_stats.total_rdma_reply += rdmalen;
		/* Reply chunk buffer already is the reply vector - no fixup. */
		status = rdmalen;
		break;

	case rdma_error:
		goto out_rdmaerr;

badheader:
	default:
		dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
			rqst->rq_task->tk_pid, __func__,
			be32_to_cpu(headerp->rm_type));
		status = -EIO;
		r_xprt->rx_stats.bad_reply_count++;
		break;
	}

out:
	/* Invalidate and flush the data payloads before waking the
	 * waiting application. This guarantees the memory region is
	 * properly fenced from the server before the application
	 * accesses the data. It also ensures proper send flow
	 * control: waking the next RPC waits until this RPC has
	 * relinquished all its Send Queue entries.
	 */
	if (!list_empty(&req->rl_registered))
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);

	spin_lock_bh(&xprt->transport_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
		__func__, xprt, rqst, status);
	return;

out_badstatus:
	rpcrdma_recv_buffer_put(rep);
	if (r_xprt->rx_ep.rep_connected == 1) {
		r_xprt->rx_ep.rep_connected = -EIO;
		rpcrdma_conn_func(&r_xprt->rx_ep);
	}
	return;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
out_bcall:
	rpcrdma_bc_receive_call(r_xprt, rep);
	return;
#endif

/* If the incoming reply terminated a pending RPC, the next
 * RPC call will post a replacement receive buffer as it is
 * being marshaled.
 */
out_badversion:
	dprintk("RPC: %s: invalid version %d\n",
		__func__, be32_to_cpu(headerp->rm_vers));
	status = -EIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

out_rdmaerr:
	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
	switch (rmerr) {
	case ERR_VERS:
		pr_err("%s: server reports header version error (%u-%u)\n",
		       __func__,
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
		break;
	case ERR_CHUNK:
		pr_err("%s: server reports header decoding error\n",
		       __func__);
		break;
	default:
		pr_err("%s: server reports unknown error %d\n",
		       __func__, rmerr);
	}
	status = -EREMOTEIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

/* If no pending RPC transaction was matched, post a replacement
 * receive buffer before returning.
 */
out_shortreply:
	dprintk("RPC: %s: short/invalid reply\n", __func__);
	goto repost;

out_nomatch:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
		__func__, be32_to_cpu(headerp->rm_xid),
		rep->rr_len);
	goto repost;

out_duplicate:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: "
		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

repost:
	r_xprt->rx_stats.bad_reply_count++;
	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
		rpcrdma_recv_buffer_put(rep);
}
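/* Flow-control note (an observation on the code above, not a change):
 * the server grants credits in each reply's rm_credit field; those
 * credits are accumulated in rb_credits elsewhere in the transport, and
 * here xprt->cwnd is recomputed as credits << RPC_CWNDSHIFT, so a grant
 * of, say, 32 credits allows up to 32 RPCs in flight on this connection.
 */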