// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

/* Operation
 *
 * The main entry point is svc_rdma_sendto. This is called by the
 * RPC server when an RPC Reply is ready to be transmitted to a client.
 *
 * The passed-in svc_rqst contains a struct xdr_buf which holds an
 * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
 * transport header, post all Write WRs needed for this Reply, then post
 * a Send WR conveying the transport header and the RPC message itself to
 * the client.
 *
 * svc_rdma_sendto must fully transmit the Reply before returning, as
 * the svc_rqst will be recycled as soon as sendto returns. Remaining
 * resources referred to by the svc_rqst are also recycled at that time.
 * Therefore any resources that must remain longer must be detached
 * from the svc_rqst and released later.
 *
 * Page Management
 *
 * The I/O that performs Reply transmission is asynchronous, and may
 * complete well after sendto returns. Thus pages under I/O must be
 * removed from the svc_rqst before sendto returns.
 *
 * The logic here depends on Send Queue and completion ordering. Since
 * the Send WR is always posted last, it will always complete last. Thus
 * when it completes, it is guaranteed that all previous Write WRs have
 * also completed.
 *
 * Write WRs are constructed and posted. Each Write segment gets its own
 * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
 * DMA-unmap the pages under I/O for that Write segment. The Write
 * completion handler does not release any pages.
 *
 * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt.
 * Ownership of all of the Reply's pages is transferred into that
 * ctxt, the Send WR is posted, and sendto returns.
 *
 * The svc_rdma_send_ctxt is presented to the Send completion handler
 * when the Send WR completes. That handler finally releases the Reply's
 * pages.
 *
 * This mechanism also assumes that completions on the transport's Send
 * Completion Queue do not run in parallel. Otherwise a Write completion
 * and Send completion running at the same time could release pages that
 * are still DMA-mapped.
 *
 * Error Handling
 *
 * - If the Send WR is posted successfully, it will either complete
 *   successfully, or get flushed. Either way, the Send completion
 *   handler releases the Reply's pages.
 * - If the Send WR cannot be posted, the forward path releases the
 *   Reply's pages.
 *
 * This handles the case, without the use of page reference counting,
 * where two different Write segments send portions of the same page.
 */

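/* Overview of the forward path as implemented in this file (a rough
 * sketch; svc_rdma_send_write_list() and svc_rdma_prepare_reply_chunk()
 * live in svc_rdma_rw.c):
 *
 *	svc_rdma_sendto()
 *	  svc_rdma_send_ctxt_get()
 *	  svc_rdma_send_write_list()
 *	  svc_rdma_prepare_reply_chunk()   (only if a Reply chunk is present)
 *	  svc_rdma_encode_read_list()
 *	  svc_rdma_encode_write_list()
 *	  svc_rdma_encode_reply_chunk()
 *	  svc_rdma_send_reply_msg()
 *	    svc_rdma_map_reply_msg()
 *	    svc_rdma_save_io_pages()
 *	    svc_rdma_post_send()
 *
 * The Send completion handler, svc_rdma_wc_send(), then returns the
 * send_ctxt and its pages via svc_rdma_send_ctxt_put().
 */
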
#include <linux/spinlock.h>
#include <linux/unaligned.h>

#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);

static struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
{
	int node = ibdev_to_node(rdma->sc_cm_id->device);
	struct svc_rdma_send_ctxt *ctxt;
	dma_addr_t addr;
	void *buffer;
	int i;

	ctxt = kzalloc_node(struct_size(ctxt, sc_sges, rdma->sc_max_send_sges),
			    GFP_KERNEL, node);
	if (!ctxt)
		goto fail0;
	buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
	if (!buffer)
		goto fail1;
	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
				 rdma->sc_max_req_size, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
		goto fail2;

	svc_rdma_send_cid_init(rdma, &ctxt->sc_cid);

	ctxt->sc_rdma = rdma;
	ctxt->sc_send_wr.next = NULL;
	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
	ctxt->sc_cqe.done = svc_rdma_wc_send;
	ctxt->sc_xprt_buf = buffer;
	xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
		     rdma->sc_max_req_size);
	ctxt->sc_sges[0].addr = addr;

	for (i = 0; i < rdma->sc_max_send_sges; i++)
		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
	return ctxt;

fail2:
	kfree(buffer);
fail1:
	kfree(ctxt);
fail0:
	return NULL;
}

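/* Note: each send_ctxt carries a persistently DMA-mapped buffer of
 * sc_max_req_size bytes for the RPC-over-RDMA transport header. Only
 * sc_sges[0] refers to that buffer; the remaining SGEs are mapped per
 * Send by svc_rdma_map_reply_msg() and unmapped again by
 * svc_rdma_send_ctxt_release().
 */
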
/**
 * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
 * @rdma: svcxprt_rdma being torn down
 *
 */
void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_send_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_send_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
		ib_dma_unmap_single(rdma->sc_pd->device,
				    ctxt->sc_sges[0].addr,
				    rdma->sc_max_req_size,
				    DMA_TO_DEVICE);
		kfree(ctxt->sc_xprt_buf);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_send_ctxt_get - Get a free send_ctxt
 * @rdma: controlling svcxprt_rdma
 *
 * Returns a ready-to-use send_ctxt, or NULL if none are
 * available and a fresh one cannot be allocated.
 */
struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_send_ctxt *ctxt;
	struct llist_node *node;

	spin_lock(&rdma->sc_send_lock);
	node = llist_del_first(&rdma->sc_send_ctxts);
	spin_unlock(&rdma->sc_send_lock);
	if (!node)
		goto out_empty;

	ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);

out:
	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
			ctxt->sc_xprt_buf, NULL);

	svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc);
	ctxt->sc_send_wr.num_sge = 0;
	ctxt->sc_cur_sge_no = 0;
	ctxt->sc_page_count = 0;
	ctxt->sc_wr_chain = &ctxt->sc_send_wr;
	ctxt->sc_sqecount = 1;

	return ctxt;

out_empty:
	ctxt = svc_rdma_send_ctxt_alloc(rdma);
	if (!ctxt)
		return NULL;
	goto out;
}

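/* A sketch of typical forward-path usage, based on svc_rdma_sendto()
 * below (error labels are illustrative only):
 *
 *	sctxt = svc_rdma_send_ctxt_get(rdma);
 *	if (!sctxt)
 *		goto drop_connection;
 *	p = xdr_reserve_space(&sctxt->sc_stream, ...);
 *	...
 *	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
 *	if (ret < 0)
 *		svc_rdma_send_ctxt_put(rdma, sctxt);
 *
 * Once the Send WR has been posted, ownership of the send_ctxt passes
 * to the Send completion handler.
 */
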
static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
				       struct svc_rdma_send_ctxt *ctxt)
{
	struct ib_device *device = rdma->sc_cm_id->device;
	unsigned int i;

	svc_rdma_reply_chunk_release(rdma, ctxt);

	if (ctxt->sc_page_count)
		release_pages(ctxt->sc_pages, ctxt->sc_page_count);

	/* The first SGE contains the transport header, which
	 * remains mapped until @ctxt is destroyed.
	 */
	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
		trace_svcrdma_dma_unmap_page(&ctxt->sc_cid,
					     ctxt->sc_sges[i].addr,
					     ctxt->sc_sges[i].length);
		ib_dma_unmap_page(device,
				  ctxt->sc_sges[i].addr,
				  ctxt->sc_sges[i].length,
				  DMA_TO_DEVICE);
	}

	llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts);
}

static void svc_rdma_send_ctxt_put_async(struct work_struct *work)
{
	struct svc_rdma_send_ctxt *ctxt;

	ctxt = container_of(work, struct svc_rdma_send_ctxt, sc_work);
	svc_rdma_send_ctxt_release(ctxt->sc_rdma, ctxt);
}

/**
 * svc_rdma_send_ctxt_put - Return send_ctxt to free list
 * @rdma: controlling svcxprt_rdma
 * @ctxt: object to return to the free list
 *
 * Pages left in sc_pages are DMA unmapped and released.
 */
void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
			    struct svc_rdma_send_ctxt *ctxt)
{
	INIT_WORK(&ctxt->sc_work, svc_rdma_send_ctxt_put_async);
	queue_work(svcrdma_wq, &ctxt->sc_work);
}

/**
 * svc_rdma_wake_send_waiters - manage Send Queue accounting
 * @rdma: controlling transport
 * @avail: Number of additional SQEs that are now available
 *
 */
void svc_rdma_wake_send_waiters(struct svcxprt_rdma *rdma, int avail)
{
	atomic_add(avail, &rdma->sc_sq_avail);
	smp_mb__after_atomic();
	if (unlikely(waitqueue_active(&rdma->sc_send_wait)))
		wake_up(&rdma->sc_send_wait);
}

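/* One reading of the barrier above: it orders the sc_sq_avail update
 * before the lockless waitqueue_active() check, pairing with the barrier
 * implied on the sleeping side of wait_event() in svc_rdma_post_send().
 */
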
/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: Completion Queue context
 * @wc: Work Completion object
 *
 * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
 * the Send completion handler could be running.
 */
static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_send_ctxt *ctxt =
		container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);

	svc_rdma_wake_send_waiters(rdma, ctxt->sc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		goto flushed;

	trace_svcrdma_wc_send(&ctxt->sc_cid);
	svc_rdma_send_ctxt_put(rdma, ctxt);
	return;

flushed:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		trace_svcrdma_wc_send_err(wc, &ctxt->sc_cid);
	else
		trace_svcrdma_wc_send_flush(wc, &ctxt->sc_cid);
	svc_rdma_send_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_post_send - Post a WR chain to the Send Queue
 * @rdma: transport context
 * @ctxt: WR chain to post
 *
 * Copy fields in @ctxt to stack variables in order to guarantee
 * that these values remain available after the ib_post_send() call.
 * In some error flow cases, svc_rdma_wc_send() releases @ctxt.
 *
 * Note there is potential for starvation when the Send Queue is
 * full because there is no order to when waiting threads are
 * awoken. The transport is typically provisioned with a deep
 * enough Send Queue that SQ exhaustion should be a rare event.
 *
 * Return values:
 *   %0: @ctxt's WR chain was posted successfully
 *   %-ENOTCONN: The connection was lost
 */
int svc_rdma_post_send(struct svcxprt_rdma *rdma,
		       struct svc_rdma_send_ctxt *ctxt)
{
	struct ib_send_wr *first_wr = ctxt->sc_wr_chain;
	struct ib_send_wr *send_wr = &ctxt->sc_send_wr;
	const struct ib_send_wr *bad_wr = first_wr;
	struct rpc_rdma_cid cid = ctxt->sc_cid;
	int ret, sqecount = ctxt->sc_sqecount;

	might_sleep();

	/* Sync the transport header buffer */
	ib_dma_sync_single_for_device(rdma->sc_pd->device,
				      send_wr->sg_list[0].addr,
				      send_wr->sg_list[0].length,
				      DMA_TO_DEVICE);

	/* If the SQ is full, wait until an SQ entry is available */
	while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) {
		if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
			svc_rdma_wake_send_waiters(rdma, sqecount);

			/* When the transport is torn down, assume
			 * ib_drain_sq() will trigger enough Send
			 * completions to wake us. The XPT_CLOSE test
			 * above should then cause the while loop to
			 * exit.
			 */
			percpu_counter_inc(&svcrdma_stat_sq_starve);
			trace_svcrdma_sq_full(rdma, &cid);
			wait_event(rdma->sc_send_wait,
				   atomic_read(&rdma->sc_sq_avail) > 0);
			trace_svcrdma_sq_retry(rdma, &cid);
			continue;
		}

		trace_svcrdma_post_send(ctxt);
		ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
		if (ret) {
			trace_svcrdma_sq_post_err(rdma, &cid, ret);
			svc_xprt_deferred_close(&rdma->sc_xprt);

			/* If even one WR was posted, there will be a
			 * Send completion that bumps sc_sq_avail.
			 */
			if (bad_wr == first_wr) {
				svc_rdma_wake_send_waiters(rdma, sqecount);
				break;
			}
		}
		return 0;
	}
	return -ENOTCONN;
}

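/* A worked example of the Send Queue accounting above, assuming an
 * initial sc_sq_avail of 32: posting a chain of one Write WR plus the
 * Send WR debits 2, leaving 30. If enough threads post concurrently
 * that atomic_sub_return() goes negative, the poster credits its SQEs
 * back, bumps svcrdma_stat_sq_starve, and sleeps on sc_send_wait until
 * a completion calls svc_rdma_wake_send_waiters().
 */
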
/**
 * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
 * @sctxt: Send context for the RPC Reply
 *
 * Return values:
 *   On success, returns length in bytes of the Reply XDR buffer
 *   that was consumed by the Reply Read list
 *   %-EMSGSIZE on XDR buffer overflow
 */
static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
{
	/* RPC-over-RDMA version 1 replies never have a Read list. */
	return xdr_stream_encode_item_absent(&sctxt->sc_stream);
}

/**
 * svc_rdma_encode_write_segment - Encode one Write segment
 * @sctxt: Send context for the RPC Reply
 * @chunk: Write chunk to push
 * @remaining: remaining bytes of the payload left in the Write chunk
 * @segno: which segment in the chunk
 *
 * Return values:
 *   On success, returns length in bytes of the Reply XDR buffer
 *   that was consumed by the Write segment, and updates @remaining
 *   %-EMSGSIZE on XDR buffer overflow
 */
static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
					     const struct svc_rdma_chunk *chunk,
					     u32 *remaining, unsigned int segno)
{
	const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
	const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
	u32 length;
	__be32 *p;

	p = xdr_reserve_space(&sctxt->sc_stream, len);
	if (!p)
		return -EMSGSIZE;

	length = min_t(u32, *remaining, segment->rs_length);
	*remaining -= length;
	xdr_encode_rdma_segment(p, segment->rs_handle, length,
				segment->rs_offset);
	trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
				  segment->rs_offset);
	return len;
}

/**
 * svc_rdma_encode_write_chunk - Encode one Write chunk
 * @sctxt: Send context for the RPC Reply
 * @chunk: Write chunk to push
 *
 * Copy a Write chunk from the Call transport header to the
 * Reply transport header. Update each segment's length field
 * to reflect the number of bytes written in that segment.
 *
 * Return values:
 *   On success, returns length in bytes of the Reply XDR buffer
 *   that was consumed by the Write chunk
 *   %-EMSGSIZE on XDR buffer overflow
 */
static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
					   const struct svc_rdma_chunk *chunk)
{
	u32 remaining = chunk->ch_payload_length;
	unsigned int segno;
	ssize_t len, ret;

	len = 0;
	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
	if (ret < 0)
		return ret;
	len += ret;

	ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
	if (ret < 0)
		return ret;
	len += ret;

	for (segno = 0; segno < chunk->ch_segcount; segno++) {
		ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
		if (ret < 0)
			return ret;
		len += ret;
	}

	return len;
}

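/* On-the-wire footprint of the encoding above: one "present"
 * discriminator word, one segment-count word, then ch_segcount segments
 * of rpcrdma_segment_maxsz (4) words each. For example, a Write chunk
 * with two segments consumes 4 + 4 + 2 * 16 = 40 bytes of the Reply
 * transport header.
 */
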
/**
 * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
 * @rctxt: Reply context with information about the RPC Call
 * @sctxt: Send context for the RPC Reply
 *
 * Return values:
 *   On success, returns length in bytes of the Reply XDR buffer
 *   that was consumed by the Reply's Write list
 *   %-EMSGSIZE on XDR buffer overflow
 */
static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
					  struct svc_rdma_send_ctxt *sctxt)
{
	struct svc_rdma_chunk *chunk;
	ssize_t len, ret;

	len = 0;
	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
		ret = svc_rdma_encode_write_chunk(sctxt, chunk);
		if (ret < 0)
			return ret;
		len += ret;
	}

	/* Terminate the Write list */
	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
	if (ret < 0)
		return ret;

	return len + ret;
}

/**
 * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
 * @rctxt: Reply context with information about the RPC Call
 * @sctxt: Send context for the RPC Reply
 * @length: size in bytes of the payload in the Reply chunk
 *
 * Return values:
 *   On success, returns length in bytes of the Reply XDR buffer
 *   that was consumed by the Reply's Reply chunk
 *   %-EMSGSIZE on XDR buffer overflow
 *   %-E2BIG if the RPC message is larger than the Reply chunk
 */
static ssize_t
svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
			    struct svc_rdma_send_ctxt *sctxt,
			    unsigned int length)
{
	struct svc_rdma_chunk *chunk;

	if (pcl_is_empty(&rctxt->rc_reply_pcl))
		return xdr_stream_encode_item_absent(&sctxt->sc_stream);

	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
	if (length > chunk->ch_length)
		return -E2BIG;

	chunk->ch_payload_length = length;
	return svc_rdma_encode_write_chunk(sctxt, chunk);
}

struct svc_rdma_map_data {
	struct svcxprt_rdma		*md_rdma;
	struct svc_rdma_send_ctxt	*md_ctxt;
};

/**
 * svc_rdma_page_dma_map - DMA map one page
 * @data: pointer to arguments
 * @page: struct page to DMA map
 * @offset: offset into the page
 * @len: number of bytes to map
 *
 * Returns:
 *   %0 if DMA mapping was successful
 *   %-EIO if the page cannot be DMA mapped
 */
static int svc_rdma_page_dma_map(void *data, struct page *page,
				 unsigned long offset, unsigned int len)
{
	struct svc_rdma_map_data *args = data;
	struct svcxprt_rdma *rdma = args->md_rdma;
	struct svc_rdma_send_ctxt *ctxt = args->md_ctxt;
	struct ib_device *dev = rdma->sc_cm_id->device;
	dma_addr_t dma_addr;

	++ctxt->sc_cur_sge_no;

	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(dev, dma_addr))
		goto out_maperr;

	trace_svcrdma_dma_map_page(&ctxt->sc_cid, dma_addr, len);
	ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr;
	ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len;
	ctxt->sc_send_wr.num_sge++;
	return 0;

out_maperr:
	trace_svcrdma_dma_map_err(&ctxt->sc_cid, dma_addr, len);
	return -EIO;
}

/**
 * svc_rdma_iov_dma_map - DMA map an iovec
 * @data: pointer to arguments
 * @iov: kvec to DMA map
 *
 * ib_dma_map_page() is used here because svc_rdma_dma_unmap()
 * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively.
 *
 * Returns:
 *   %0 if DMA mapping was successful
 *   %-EIO if the iovec cannot be DMA mapped
 */
static int svc_rdma_iov_dma_map(void *data, const struct kvec *iov)
{
	if (!iov->iov_len)
		return 0;
	return svc_rdma_page_dma_map(data, virt_to_page(iov->iov_base),
				     offset_in_page(iov->iov_base),
				     iov->iov_len);
}

/**
 * svc_rdma_xb_dma_map - DMA map all segments of an xdr_buf
 * @xdr: xdr_buf containing portion of an RPC message to transmit
 * @data: pointer to arguments
 *
 * Returns:
 *   %0 if DMA mapping was successful
 *   %-EIO if DMA mapping failed
 *
 * On failure, any DMA mappings that have been already done must be
 * unmapped by the caller.
 */
static int svc_rdma_xb_dma_map(const struct xdr_buf *xdr, void *data)
{
	unsigned int len, remaining;
	unsigned long pageoff;
	struct page **ppages;
	int ret;

	ret = svc_rdma_iov_dma_map(data, &xdr->head[0]);
	if (ret < 0)
		return ret;

	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	pageoff = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		len = min_t(u32, PAGE_SIZE - pageoff, remaining);

		ret = svc_rdma_page_dma_map(data, *ppages++, pageoff, len);
		if (ret < 0)
			return ret;

		remaining -= len;
		pageoff = 0;
	}

	ret = svc_rdma_iov_dma_map(data, &xdr->tail[0]);
	if (ret < 0)
		return ret;

	return xdr->len;
}

struct svc_rdma_pullup_data {
	u8		*pd_dest;
	unsigned int	pd_length;
	unsigned int	pd_num_sges;
};

/**
 * svc_rdma_xb_count_sges - Count how many SGEs will be needed
 * @xdr: xdr_buf containing portion of an RPC message to transmit
 * @data: pointer to arguments
 *
 * Returns:
 *   Number of SGEs needed to Send the contents of @xdr inline
 */
static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
				  void *data)
{
	struct svc_rdma_pullup_data *args = data;
	unsigned int remaining;
	unsigned long offset;

	if (xdr->head[0].iov_len)
		++args->pd_num_sges;

	offset = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		++args->pd_num_sges;
		remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
		offset = 0;
	}

	if (xdr->tail[0].iov_len)
		++args->pd_num_sges;

	args->pd_length += xdr->len;
	return 0;
}

/**
 * svc_rdma_pull_up_needed - Determine whether to use pull-up
 * @rdma: controlling transport
 * @sctxt: send_ctxt for the Send WR
 * @write_pcl: Write chunk list provided by client
 * @xdr: xdr_buf containing RPC message to transmit
 *
 * Returns:
 *   %true if pull-up must be used
 *   %false otherwise
 */
static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
				    const struct svc_rdma_send_ctxt *sctxt,
				    const struct svc_rdma_pcl *write_pcl,
				    const struct xdr_buf *xdr)
{
	/* Resources needed for the transport header */
	struct svc_rdma_pullup_data args = {
		.pd_length	= sctxt->sc_hdrbuf.len,
		.pd_num_sges	= 1,
	};
	int ret;

	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_count_sges, &args);
	if (ret < 0)
		return false;

	if (args.pd_length < RPCRDMA_PULLUP_THRESH)
		return true;
	return args.pd_num_sges >= rdma->sc_max_send_sges;
}

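/* Rationale, as read from the checks above: pull-up is used either when
 * the transport header plus the non-payload portion of the Reply is
 * small (below RPCRDMA_PULLUP_THRESH), where a memcpy is presumed
 * cheaper than DMA-mapping several SGEs, or when sending the message in
 * place would need more SGEs than the device's sc_max_send_sges allows.
 */
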
/**
 * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
 * @xdr: xdr_buf containing portion of an RPC message to copy
 * @data: pointer to arguments
 *
 * Returns:
 *   Always zero.
 */
static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
				 void *data)
{
	struct svc_rdma_pullup_data *args = data;
	unsigned int len, remaining;
	unsigned long pageoff;
	struct page **ppages;

	if (xdr->head[0].iov_len) {
		memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
		args->pd_dest += xdr->head[0].iov_len;
	}

	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	pageoff = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
		memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
		remaining -= len;
		args->pd_dest += len;
		pageoff = 0;
		ppages++;
	}

	if (xdr->tail[0].iov_len) {
		memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
		args->pd_dest += xdr->tail[0].iov_len;
	}

	args->pd_length += xdr->len;
	return 0;
}

/**
 * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
 * @rdma: controlling transport
 * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
 * @write_pcl: Write chunk list provided by client
 * @xdr: prepared xdr_buf containing RPC message
 *
 * The device is not capable of sending the reply directly.
 * Assemble the elements of @xdr into the transport header buffer.
 *
 * Assumptions:
 *	pull_up_needed has determined that @xdr will fit in the buffer.
 *
 * Returns:
 *	%0 if pull-up was successful
 *	%-EMSGSIZE if a buffer manipulation problem occurred
 */
static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
				      struct svc_rdma_send_ctxt *sctxt,
				      const struct svc_rdma_pcl *write_pcl,
				      const struct xdr_buf *xdr)
{
	struct svc_rdma_pullup_data args = {
		.pd_dest = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
	};
	int ret;

	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_linearize, &args);
	if (ret < 0)
		return ret;

	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
	trace_svcrdma_send_pullup(sctxt, args.pd_length);
	return 0;
}

/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
 * @rdma: controlling transport
 * @sctxt: send_ctxt for the Send WR
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @xdr: prepared xdr_buf containing RPC message
 *
 * Returns:
 *   %0 if DMA mapping was successful.
 *   %-EMSGSIZE if a buffer manipulation problem occurred
 *   %-EIO if DMA mapping failed
 *
 * The Send WR's num_sge field is set in all cases.
 */
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
			   struct svc_rdma_send_ctxt *sctxt,
			   const struct svc_rdma_pcl *write_pcl,
			   const struct svc_rdma_pcl *reply_pcl,
			   const struct xdr_buf *xdr)
{
	struct svc_rdma_map_data args = {
		.md_rdma	= rdma,
		.md_ctxt	= sctxt,
	};

	/* Set up the (persistently-mapped) transport header SGE. */
	sctxt->sc_send_wr.num_sge = 1;
	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;

	/* If there is a Reply chunk, nothing follows the transport
	 * header, so there is nothing to map.
	 */
	if (!pcl_is_empty(reply_pcl))
		return 0;

	/* For pull-up, svc_rdma_send() will sync the transport header.
	 * No additional DMA mapping is necessary.
	 */
	if (svc_rdma_pull_up_needed(rdma, sctxt, write_pcl, xdr))
		return svc_rdma_pull_up_reply_msg(rdma, sctxt, write_pcl, xdr);

	return pcl_process_nonpayloads(write_pcl, xdr,
				       svc_rdma_xb_dma_map, &args);
}

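/* To summarize the cases above: if the client provided a Reply chunk,
 * only the transport header is Sent; if pull-up is needed, the message
 * has been copied into the header buffer and sc_sges[0] covers all of
 * it; otherwise each kvec and page of @xdr gets its own DMA-mapped SGE.
 */
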
/* The svc_rqst and all resources it owns are released as soon as
 * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
 * so they are released by the Send completion handler.
 */
static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
				   struct svc_rdma_send_ctxt *ctxt)
{
	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;

	ctxt->sc_page_count += pages;
	for (i = 0; i < pages; i++) {
		ctxt->sc_pages[i] = rqstp->rq_respages[i];
		rqstp->rq_respages[i] = NULL;
	}

	/* Prevent svc_xprt_release from releasing pages in rq_pages */
	rqstp->rq_next_page = rqstp->rq_respages;
}

/* Prepare the portion of the RPC Reply that will be transmitted
 * via RDMA Send. The RPC-over-RDMA transport header is prepared
 * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
 *
 * Depending on whether a Write list or Reply chunk is present,
 * the server may Send all, a portion of, or none of the xdr_buf.
 * In the latter case, only the transport header (sc_sges[0]) is
 * transmitted.
 *
 * Assumptions:
 * - The Reply's transport header will never be larger than a page.
 */
static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
				   struct svc_rdma_send_ctxt *sctxt,
				   const struct svc_rdma_recv_ctxt *rctxt,
				   struct svc_rqst *rqstp)
{
	struct ib_send_wr *send_wr = &sctxt->sc_send_wr;
	int ret;

	ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl,
				     &rctxt->rc_reply_pcl, &rqstp->rq_res);
	if (ret < 0)
		return ret;

	/* Transfer pages involved in RDMA Writes to the sctxt's
	 * page array. Completion handling releases these pages.
	 */
	svc_rdma_save_io_pages(rqstp, sctxt);

	if (rctxt->rc_inv_rkey) {
		send_wr->opcode = IB_WR_SEND_WITH_INV;
		send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey;
	} else {
		send_wr->opcode = IB_WR_SEND;
	}

	return svc_rdma_post_send(rdma, sctxt);
}

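/* Note on Remote Invalidation: when the client has advertised an rkey
 * in rc_inv_rkey, IB_WR_SEND_WITH_INV asks the device to invalidate
 * that rkey at the peer as part of the Send, sparing the client a
 * separate Local Invalidate (general RPC-over-RDMA behavior).
 */
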
/**
 * svc_rdma_send_error_msg - Send an RPC/RDMA v1 error response
 * @rdma: controlling transport context
 * @sctxt: Send context for the response
 * @rctxt: Receive context for incoming bad message
 * @status: negative errno indicating error that occurred
 *
 * Given the client-provided Read, Write, and Reply chunks, the
 * server was not able to parse the Call or form a complete Reply.
 * Return an RDMA_ERROR message so the client can retire the RPC
 * transaction.
 *
 * The caller does not have to release @sctxt. It is released by
 * Send completion, or by this function on error.
 */
void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
			     struct svc_rdma_send_ctxt *sctxt,
			     struct svc_rdma_recv_ctxt *rctxt,
			     int status)
{
	__be32 *rdma_argp = rctxt->rc_recv_buf;
	__be32 *p;

	rpcrdma_set_xdrlen(&sctxt->sc_hdrbuf, 0);
	xdr_init_encode(&sctxt->sc_stream, &sctxt->sc_hdrbuf,
			sctxt->sc_xprt_buf, NULL);

	p = xdr_reserve_space(&sctxt->sc_stream,
			      rpcrdma_fixed_maxsz * sizeof(*p));
	if (!p)
		goto put_ctxt;

	*p++ = *rdma_argp;
	*p++ = *(rdma_argp + 1);
	*p++ = rdma->sc_fc_credits;
	*p = rdma_error;

	switch (status) {
	case -EPROTONOSUPPORT:
		p = xdr_reserve_space(&sctxt->sc_stream, 3 * sizeof(*p));
		if (!p)
			goto put_ctxt;

		*p++ = err_vers;
		*p++ = rpcrdma_version;
		*p = rpcrdma_version;
		trace_svcrdma_err_vers(*rdma_argp);
		break;
	default:
		p = xdr_reserve_space(&sctxt->sc_stream, sizeof(*p));
		if (!p)
			goto put_ctxt;

		*p = err_chunk;
		trace_svcrdma_err_chunk(*rdma_argp);
	}

	/* Remote Invalidation is skipped for simplicity. */
	sctxt->sc_send_wr.num_sge = 1;
	sctxt->sc_send_wr.opcode = IB_WR_SEND;
	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
	if (svc_rdma_post_send(rdma, sctxt))
		goto put_ctxt;
	return;

put_ctxt:
	svc_rdma_send_ctxt_put(rdma, sctxt);
}

/**
 * svc_rdma_sendto - Transmit an RPC reply
 * @rqstp: processed RPC request, reply XDR already in ::rq_res
 *
 * Any resources still associated with @rqstp are released upon return.
 * If no reply message was possible, the connection is closed.
 *
 * Returns:
 *	%0 if an RPC reply has been successfully posted,
 *	%-ENOMEM if a resource shortage occurred (connection is lost),
 *	%-ENOTCONN if posting failed (connection is lost).
 */
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
	__be32 *rdma_argp = rctxt->rc_recv_buf;
	struct svc_rdma_send_ctxt *sctxt;
	unsigned int rc_size;
	__be32 *p;
	int ret;

	ret = -ENOTCONN;
	if (svc_xprt_is_dead(xprt))
		goto drop_connection;

	ret = -ENOMEM;
	sctxt = svc_rdma_send_ctxt_get(rdma);
	if (!sctxt)
		goto drop_connection;

	ret = -EMSGSIZE;
	p = xdr_reserve_space(&sctxt->sc_stream,
			      rpcrdma_fixed_maxsz * sizeof(*p));
	if (!p)
		goto put_ctxt;

	ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res);
	if (ret < 0)
		goto put_ctxt;

	rc_size = 0;
	if (!pcl_is_empty(&rctxt->rc_reply_pcl)) {
		ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl,
						   &rctxt->rc_reply_pcl, sctxt,
						   &rqstp->rq_res);
		if (ret < 0)
			goto reply_chunk;
		rc_size = ret;
	}

	*p++ = *rdma_argp;
	*p++ = *(rdma_argp + 1);
	*p++ = rdma->sc_fc_credits;
	*p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;

	ret = svc_rdma_encode_read_list(sctxt);
	if (ret < 0)
		goto put_ctxt;
	ret = svc_rdma_encode_write_list(rctxt, sctxt);
	if (ret < 0)
		goto put_ctxt;
	ret = svc_rdma_encode_reply_chunk(rctxt, sctxt, rc_size);
	if (ret < 0)
		goto put_ctxt;

	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
	if (ret < 0)
		goto put_ctxt;
	return 0;

reply_chunk:
	if (ret != -E2BIG && ret != -EINVAL)
		goto put_ctxt;

	/* Send completion releases payload pages that were part
	 * of previously posted RDMA Writes.
	 */
	svc_rdma_save_io_pages(rqstp, sctxt);
	svc_rdma_send_error_msg(rdma, sctxt, rctxt, ret);
	return 0;

put_ctxt:
	svc_rdma_send_ctxt_put(rdma, sctxt);
drop_connection:
	trace_svcrdma_send_err(rqstp, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);
	return -ENOTCONN;
}

/**
 * svc_rdma_result_payload - special processing for a result payload
 * @rqstp: RPC transaction context
 * @offset: payload's byte offset in @rqstp->rq_res
 * @length: size of payload, in bytes
 *
 * Assign the passed-in result payload to the current Write chunk,
 * and advance rc_cur_result_payload to the next Write chunk, if
 * there is one.
 *
 * Return values:
 *   %0 if successful or nothing needed to be done
 *   %-E2BIG if the payload was larger than the Write chunk
 */
int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
			    unsigned int length)
{
	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
	struct svc_rdma_chunk *chunk;

	chunk = rctxt->rc_cur_result_payload;
	if (!length || !chunk)
		return 0;
	rctxt->rc_cur_result_payload =
		pcl_next_chunk(&rctxt->rc_write_pcl, chunk);

	if (length > chunk->ch_length)
		return -E2BIG;
	chunk->ch_position = offset;
	chunk->ch_payload_length = length;
	return 0;
}