// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle.  All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	unsigned int		rw_first_sgl_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[];
};

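/* Return the first R/W context queued on @list, or NULL if @list
 * is empty.
 */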
static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

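/* Get an unused R/W context from the transport's cache, or allocate
 * a new one sized to the device's max_send_sge. Returns NULL, after
 * recording a trace event, if no context can be obtained.
 */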
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_sgl_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_sgl_nents = first_sgl_nents;
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl,
				   first_sgl_nents))
		goto out_free;
	return ctxt;

out_free:
	kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, sges);
	return NULL;
}

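/* Release a context's sg_table and return the context to a free
 * llist. svc_rdma_put_rw_ctxt() returns it directly to the
 * transport's sc_rw_ctxts cache.
 */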
static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * Returns the number of WQEs that will be needed on the Send Queue
 * on success, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle,
				enum dma_data_direction direction)
{
	int ret;

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, handle, direction);
	if (unlikely(ret < 0)) {
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}

/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;
	LLIST_HEAD(free);

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		__svc_rdma_put_rw_ctxt(ctxt, &free);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
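	/* Hand the unmapped contexts back to the transport's cache in
	 * one llist operation.
	 */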
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}

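/* Allocate a write_info to track I/O for one Write chunk. The
 * allocation is placed on the RDMA device's NUMA node, and the
 * embedded chunk context's completion handler is set to
 * svc_rdma_write_done().
 */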
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

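/* Releasing Write chunk I/O resources involves DMA unmapping and
 * returning rdma_rw contexts to the transport, so the free is
 * deferred to the svcrdma_wq workqueue rather than done directly
 * in the Write completion handler.
 */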
static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}

/**
 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;

	if (!cc->cc_sqecount)
		return;
	svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
}

/**
 * svc_rdma_reply_done - Reply chunk Write completion handler
 * @cq: controlling Completion Queue
 * @wc: Work Completion report
 *
 * Pages under I/O are released by a subsequent Send completion.
 */
static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cq->cq_context;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_reply(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
	}

	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		svc_xprt_deferred_close(&rdma->sc_xprt);

	svc_rdma_write_info_free(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}

/*
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
				    struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

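	/* Reserve cc_sqecount Send Queue entries before posting. If the
	 * SQ does not have enough space, return the credits and wait for
	 * Send completions to replenish sc_sq_avail, then retry.
	 */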
	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma, &cc->cc_cid);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		page++;
		sge_no++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = info->wi_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

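	/* Each pass below maps at most one segment of the Write chunk:
	 * the @constructor callback (kvec- or pagelist-based) fills the
	 * scatterlist, then an rdma_rw context is initialized for that
	 * segment's R_key and added to the chunk context's WR chain.
	 */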
	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     iov->iov_len);
}

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns the number of bytes consumed
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}

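/* Post RDMA Writes for the payload of a single Write chunk. The
 * payload is carved out of @xdr with xdr_buf_subsegment(). On
 * failure the write_info is freed here; on success it is freed by
 * svc_rdma_write_done() once the Write WRs complete.
 */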
static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
				     const struct svc_rdma_chunk *chunk,
				     const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct xdr_buf payload;
	int ret;

	if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
			       chunk->ch_payload_length))
		return -EMSGSIZE;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(&payload, info);
	if (ret != payload.len)
		goto out_err;

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	if (ret < 0)
		goto out_err;
	return 0;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_send_write_list - Send all chunks on the Write list
 * @rdma: controlling RDMA transport
 * @rctxt: Write list provisioned by the client
 * @xdr: xdr_buf containing an RPC Reply message
 *
 * Returns zero on success, or a negative errno if one or more
 * Write chunks could not be sent.
 */
int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
			     const struct svc_rdma_recv_ctxt *rctxt,
			     const struct xdr_buf *xdr)
{
	struct svc_rdma_chunk *chunk;
	int ret;

	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
		if (!chunk->ch_payload_length)
			break;
		ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
		if (ret < 0)
			return ret;
	}
	return 0;
}

/**
 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
 * @rdma: controlling RDMA transport
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *   %-E2BIG if the payload was larger than the Reply chunk,
 *   %-EINVAL if client provided too many segments,
 *   %-ENOMEM if rdma_rw context pool was exhausted,
 *   %-ENOTCONN if posting failed (connection is lost),
 *   %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
				 const struct svc_rdma_pcl *write_pcl,
				 const struct svc_rdma_pcl *reply_pcl,
				 struct svc_rdma_send_ctxt *sctxt,
				 const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct ib_send_wr *first_wr;
	struct list_head *pos;
	struct ib_cqe *cqe;
	int ret;

	info->wi_rdma = rdma;
	info->wi_chunk = pcl_first_chunk(reply_pcl);
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_cc.cc_cqe.done = svc_rdma_reply_done;

	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		return ret;

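	/* Chain the Reply chunk's Write WRs ahead of the Send WR chain
	 * so that the whole sequence is posted with a single
	 * ib_post_send() later on.
	 */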
	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each(pos, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *rwc;

		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}
	sctxt->sc_wr_chain = first_wr;
	sctxt->sc_sqecount += cc->cc_sqecount;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	return xdr->len;
}

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating a local resource failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *head,
				       const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int sge_no, seg_len, len;
	struct svc_rdma_rw_ctxt *ctxt;
	struct scatterlist *sg;
	int ret;

	len = segment->rs_length;
	sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = sge_no;

	sg = ctxt->rw_sg_table.sgl;
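	/* Build one sg entry per page of the Read sink buffer, walking
	 * through rqstp->rq_pages. rc_pageoff carries the in-page offset
	 * across segments so that consecutive segments land contiguously.
	 */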
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		sg_set_page(sg, rqstp->rq_pages[head->rc_curpage],
			    seg_len, head->rc_pageoff);
		sg = sg_next(sg);

		head->rc_pageoff += seg_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		len -= seg_len;

		if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages)))
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
	return -EINVAL;
}

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating a local resource failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(rqstp, head, segment);
		if (ret < 0)
			break;
		head->rc_readbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head,
				      unsigned int offset,
				      unsigned int remaining)
{
	unsigned char *dst, *src = head->rc_recv_buf;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[head->rc_curpage]);
		memcpy(dst + head->rc_curpage, src + offset, page_len);

		head->rc_readbytes += page_len;
		head->rc_pageoff += page_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int
svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	return svc_rdma_copy_inline_range(rqstp, head, start, length);
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
				   struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_build_read_chunk(rqstp, head,
					 pcl_first_chunk(&head->rc_read_pcl));
}

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

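		/* Describe only the part of this segment that falls
		 * inside the requested range, then pull it as if it
		 * were a complete segment.
		 */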
		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
		if (ret < 0)
			break;

		head->rc_readbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
				    struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_chunk *call_chunk =
			pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(rqstp, head, call_chunk);

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					 start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
					  struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_read_call_chunk(rqstp, head);
}

/* Pages under I/O have been copied to head->rc_pages. Ensure that
 * svc_xprt_release() does not put them when svc_rdma_recvfrom()
 * returns. This has to be done after all Read WRs are constructed
 * to properly handle a page that happens to be part of I/O on behalf
 * of two different RDMA segments.
 *
 * Note: if the subsequent post_send fails, these pages have already
 * been moved to head->rc_pages and thus will be cleaned up by
 * svc_rdma_recv_ctxt_put().
 */
static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head)
{
	unsigned int i;

	for (i = 0; i < head->rc_page_count; i++) {
		head->rc_pages[i] = rqstp->rq_pages[i];
		rqstp->rq_pages[i] = NULL;
	}
}

/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	int ret;

	cc->cc_cqe.done = svc_rdma_wc_read_done;
	cc->cc_sqecount = 0;
	head->rc_pageoff = 0;
	head->rc_curpage = 0;
	head->rc_readbytes = 0;

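	/* With no Position Zero Read chunk, the Read list carries only
	 * data item payloads; a single chunk takes the simpler path.
	 * A Call chunk (Long Message) is pulled by svc_rdma_read_special().
	 */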
	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(rqstp, head);
		else
			ret = svc_rdma_read_multiple_chunks(rqstp, head);
	} else
		ret = svc_rdma_read_special(rqstp, head);
	svc_rdma_clear_rqst_pages(rqstp, head);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	return ret < 0 ? ret : 1;
}