// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/bitops.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/log2.h>

#include <asm/barrier.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc);
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc_node(size_t size, enum dma_data_direction direction,
			  int node);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction);
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);

/* Wait for outstanding transport work to finish. ib_drain_qp
 * handles the drains in the wrong order for us, so open code
 * them here.
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rdma_cm_id *id = ep->re_id;

	/* Wait for rpcrdma_post_recvs() to leave its critical
	 * section.
	 */
	if (atomic_inc_return(&ep->re_receiving) > 1)
		wait_for_completion(&ep->re_done);

	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_rq(id->qp);

	/* Deferred Reply processing might have scheduled
	 * local invalidations.
	 */
	ib_drain_sq(id->qp);

	rpcrdma_ep_put(ep);
}
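
/* The drain above pairs with rpcrdma_post_recvs(): the poster brackets
 * its work with ep->re_receiving, and signals ep->re_done when it sees
 * that a drain is in progress. The wait therefore ensures no Receive
 * WRs are posted after the RQ flush has begun.
 */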

/* Ensure xprt_force_disconnect() is invoked exactly once when a
 * connection is closed or lost. (The important thing is it needs
 * to be invoked "at least" once).
 */
void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
{
	if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
		xprt_force_disconnect(ep->re_xprt);
}

/**
 * rpcrdma_flush_disconnect - Disconnect on flushed completion
 * @r_xprt: transport to disconnect
 * @wc: work completion entry
 *
 * Must be called in process context.
 */
void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
{
	if (wc->status != IB_WC_SUCCESS)
		rpcrdma_force_disconnect(r_xprt->rx_ep);
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue
 * @wc:	WCE for a completed Send WR
 *
 */
static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
	struct rpcrdma_xprt *r_xprt = cq->cq_context;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(wc, &sc->sc_cid);
	rpcrdma_sendctx_put_locked(r_xprt, sc);
	rpcrdma_flush_disconnect(r_xprt, wc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue
 * @wc:	WCE for a completed Receive WR
 *
 */
static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);
	struct rpcrdma_xprt *r_xprt = cq->cq_context;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_receive(wc, &rep->rr_cid);
	--r_xprt->rx_ep->re_receive_count;
	if (wc->status != IB_WC_SUCCESS)
		goto out_flushed;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

	rpcrdma_reply_handler(rep);
	return;

out_flushed:
	rpcrdma_flush_disconnect(r_xprt, wc);
	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
}

static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
				      struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < ep->re_inline_recv)
		ep->re_inline_recv = rsize;
	if (wsize < ep->re_inline_send)
		ep->re_inline_send = wsize;

	rpcrdma_set_max_header_sizes(ep);
}

/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_ep *ep = id->context;

	might_sleep();

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ep->re_async_rc = 0;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ep->re_async_rc = -EPROTO;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ep->re_async_rc = -ENETUNREACH;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_CHANGE:
		ep->re_connect_status = -ENODEV;
		goto disconnected;
	case RDMA_CM_EVENT_ESTABLISHED:
		rpcrdma_ep_get(ep);
		ep->re_connect_status = 1;
		rpcrdma_update_cm_private(ep, &event->param.conn);
		trace_xprtrdma_inline_thresh(ep);
		wake_up_all(&ep->re_connect_wait);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->re_connect_status = -ENOTCONN;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->re_connect_status = -ENETUNREACH;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_REJECTED:
		ep->re_connect_status = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->re_connect_status = -ENOTCONN;
wake_connect_worker:
		wake_up_all(&ep->re_connect_wait);
		return 0;
	case RDMA_CM_EVENT_DISCONNECTED:
		ep->re_connect_status = -ECONNABORTED;
disconnected:
		rpcrdma_force_disconnect(ep);
		return rpcrdma_ep_put(ep);
	default:
		break;
	}

	return 0;
}
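
/* Address and route resolution events complete ep->re_done, waking
 * rpcrdma_create_id(); connection-outcome events record a status in
 * ep->re_connect_status and wake ep->re_connect_wait, which
 * rpcrdma_xprt_connect() is sleeping on.
 */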

static void rpcrdma_ep_removal_done(struct rpcrdma_notification *rn)
{
	struct rpcrdma_ep *ep = container_of(rn, struct rpcrdma_ep, re_rn);

	trace_xprtrdma_device_removal(ep->re_id);
	xprt_force_disconnect(ep->re_xprt);
}

static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
					    struct rpcrdma_ep *ep)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ep->re_done);

	id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
			    RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id))
		return id;

	ep->re_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
	if (rc < 0)
		goto out;

	rc = ep->re_async_rc;
	if (rc)
		goto out;

	ep->re_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
	if (rc < 0)
		goto out;
	rc = ep->re_async_rc;
	if (rc)
		goto out;

	rc = rpcrdma_rn_register(id->device, &ep->re_rn, rpcrdma_ep_removal_done);
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

static void rpcrdma_ep_destroy(struct kref *kref)
{
	struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);

	if (ep->re_id->qp) {
		rdma_destroy_qp(ep->re_id);
		ep->re_id->qp = NULL;
	}

	if (ep->re_attr.recv_cq)
		ib_free_cq(ep->re_attr.recv_cq);
	ep->re_attr.recv_cq = NULL;
	if (ep->re_attr.send_cq)
		ib_free_cq(ep->re_attr.send_cq);
	ep->re_attr.send_cq = NULL;

	if (ep->re_pd)
		ib_dealloc_pd(ep->re_pd);
	ep->re_pd = NULL;

	rpcrdma_rn_unregister(ep->re_id->device, &ep->re_rn);

	kfree(ep);
	module_put(THIS_MODULE);
}

static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
{
	kref_get(&ep->re_kref);
}

/* Returns:
 *     %0 if @ep still has a positive kref count, or
 *     %1 if @ep was destroyed successfully.
 */
static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
{
	return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
}
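
/* The ep's kref has three holders: the transport itself (taken by
 * kref_init in rpcrdma_ep_create, dropped in rpcrdma_xprt_disconnect),
 * the posted Receives (taken in rpcrdma_xprt_connect, dropped in
 * rpcrdma_xprt_drain), and the established connection (taken on
 * RDMA_CM_EVENT_ESTABLISHED, dropped on RDMA_CM_EVENT_DISCONNECTED).
 */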

static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_connect_private *pmsg;
	struct ib_device *device;
	struct rdma_cm_id *id;
	struct rpcrdma_ep *ep;
	int rc;

	ep = kzalloc(sizeof(*ep), XPRTRDMA_GFP_FLAGS);
	if (!ep)
		return -ENOTCONN;
	ep->re_xprt = &r_xprt->rx_xprt;
	kref_init(&ep->re_kref);

	id = rpcrdma_create_id(r_xprt, ep);
	if (IS_ERR(id)) {
		kfree(ep);
		return PTR_ERR(id);
	}
	__module_get(THIS_MODULE);
	device = id->device;
	ep->re_id = id;
	reinit_completion(&ep->re_done);

	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
	ep->re_inline_send = xprt_rdma_max_inline_write;
	ep->re_inline_recv = xprt_rdma_max_inline_read;
	rc = frwr_query_device(ep, device);
	if (rc)
		goto out_destroy;

	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);

	ep->re_attr.srq = NULL;
	ep->re_attr.cap.max_inline_data = 0;
	ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->re_attr.qp_type = IB_QPT_RC;
	ep->re_attr.port_num = ~0;

	ep->re_send_batch = ep->re_max_requests >> 3;
	ep->re_send_count = ep->re_send_batch;
	init_waitqueue_head(&ep->re_connect_wait);

	ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
					      ep->re_attr.cap.max_send_wr,
					      IB_POLL_WORKQUEUE);
	if (IS_ERR(ep->re_attr.send_cq)) {
		rc = PTR_ERR(ep->re_attr.send_cq);
		ep->re_attr.send_cq = NULL;
		goto out_destroy;
	}

	ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
					      ep->re_attr.cap.max_recv_wr,
					      IB_POLL_WORKQUEUE);
	if (IS_ERR(ep->re_attr.recv_cq)) {
		rc = PTR_ERR(ep->re_attr.recv_cq);
		ep->re_attr.recv_cq = NULL;
		goto out_destroy;
	}
	ep->re_receive_count = 0;

	/* Initialize cma parameters */
	memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg = &ep->re_cm_private;
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
	ep->re_remote_cma.private_data = pmsg;
	ep->re_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->re_remote_cma.initiator_depth = 0;
	ep->re_remote_cma.responder_resources =
		min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->re_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->re_remote_cma.flow_control = 0;
	ep->re_remote_cma.rnr_retry_count = 0;

	ep->re_pd = ib_alloc_pd(device, 0);
	if (IS_ERR(ep->re_pd)) {
		rc = PTR_ERR(ep->re_pd);
		ep->re_pd = NULL;
		goto out_destroy;
	}

	rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
	if (rc)
		goto out_destroy;

	r_xprt->rx_ep = ep;
	return 0;

out_destroy:
	rpcrdma_ep_put(ep);
	rdma_destroy_id(id);
	return rc;
}

/**
 * rpcrdma_xprt_connect - Connect an unconnected transport
 * @r_xprt: controlling transport instance
 *
 * Returns 0 on success or a negative errno.
 */
int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_ep *ep;
	int rc;

	rc = rpcrdma_ep_create(r_xprt);
	if (rc)
		return rc;
	ep = r_xprt->rx_ep;

	xprt_clear_connected(xprt);
	rpcrdma_reset_cwnd(r_xprt);

	/* Bump the ep's reference count while there are
	 * outstanding Receives.
	 */
	rpcrdma_ep_get(ep);
	rpcrdma_post_recvs(r_xprt, 1);

	rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
	if (rc)
		goto out;

	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	wait_event_interruptible(ep->re_connect_wait,
				 ep->re_connect_status != 0);
	if (ep->re_connect_status <= 0) {
		rc = ep->re_connect_status;
		goto out;
	}

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc) {
		rc = -ENOTCONN;
		goto out;
	}

	rc = rpcrdma_reqs_setup(r_xprt);
	if (rc) {
		rc = -ENOTCONN;
		goto out;
	}
	rpcrdma_mrs_create(r_xprt);
	frwr_wp_create(r_xprt);

out:
	trace_xprtrdma_connect(r_xprt, rc);
	return rc;
}

/**
 * rpcrdma_xprt_disconnect - Disconnect underlying transport
 * @r_xprt: controlling transport instance
 *
 * Caller serializes. Either the transport send lock is held,
 * or we're being called to destroy the transport.
 *
 * On return, @r_xprt is completely divested of all hardware
 * resources and prepared for the next ->connect operation.
 */
void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rdma_cm_id *id;
	int rc;

	if (!ep)
		return;

	id = ep->re_id;
	rc = rdma_disconnect(id);
	trace_xprtrdma_disconnect(r_xprt, rc);

	rpcrdma_xprt_drain(r_xprt);
	rpcrdma_reps_unmap(r_xprt);
	rpcrdma_reqs_reset(r_xprt);
	rpcrdma_mrs_destroy(r_xprt);
	rpcrdma_sendctxs_destroy(r_xprt);

	if (rpcrdma_ep_put(ep))
		rdma_destroy_id(id);

	r_xprt->rx_ep = NULL;
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */
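
/* For example, with rb_sc_last == 3 the ring cycles through indices
 * 0..3: the consumer advances rb_sc_head in rpcrdma_sendctx_get_locked()
 * and treats next(head) == tail as "empty"; the producer advances
 * rb_sc_tail in rpcrdma_sendctx_put_locked().
 */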

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 * Send requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long i;

	if (!buf->rb_sc_ctxs)
		return;
	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
	buf->rb_sc_ctxs = NULL;
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
		     XPRTRDMA_GFP_FLAGS);
	if (!sc)
		return NULL;

	sc->sc_cqe.done = rpcrdma_wc_send;
	sc->sc_cid.ci_queue_id = ep->re_attr.send_cq->res.id;
	sc->sc_cid.ci_completion_id =
		atomic_inc_return(&ep->re_completion_ids);
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), XPRTRDMA_GFP_FLAGS);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
		if (!sc)
			return -ENOMEM;

		buf->rb_sc_ctxs[i] = sc;
	}

	buf->rb_sc_head = 0;
	buf->rb_sc_tail = 0;
	return 0;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @r_xprt: controlling transport instance
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per transport), and
 * provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @r_xprt: controlling transport instance
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctxt
 * to the queue.
 *
 * The caller serializes calls to this function (per transport).
 */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);

	xprt_write_space(&r_xprt->rx_xprt);
}
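
/* The smp_store_release() above pairs with the READ_ONCE() of
 * rb_sc_tail in rpcrdma_sendctx_get_locked(): a consumer that observes
 * the new tail also observes the unmapped sendctx contents.
 */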

static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct ib_device *device = ep->re_id->device;
	unsigned int count;

	/* Try to allocate enough to perform one full-sized I/O */
	for (count = 0; count < ep->re_max_rdma_segs; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc_node(sizeof(*mr), XPRTRDMA_GFP_FLAGS,
				  ibdev_to_node(device));
		if (!mr)
			break;

		rc = frwr_mr_init(r_xprt, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		spin_lock(&buf->rb_lock);
		rpcrdma_mr_push(mr, &buf->rb_mrs);
		list_add(&mr->mr_all, &buf->rb_all_mrs);
		spin_unlock(&buf->rb_lock);
	}

	r_xprt->rx_stats.mrs_allocated += count;
	trace_xprtrdma_createmrs(r_xprt, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
	xprt_write_space(&r_xprt->rx_xprt);
}

/**
 * rpcrdma_mrs_refresh - Wake the MR refresh worker
 * @r_xprt: controlling transport instance
 *
 */
void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;

	/* If there is no underlying connection, it's no use
	 * to wake the refresh worker.
	 */
	if (ep->re_connect_status != 1)
		return;
	queue_work(system_highpri_wq, &buf->rb_refresh_worker);
}

/**
 * rpcrdma_req_create - Allocate an rpcrdma_req object
 * @r_xprt: controlling r_xprt
 * @size: initial size, in bytes, of send and receive buffers
 *
 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 */
struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt,
				       size_t size)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), XPRTRDMA_GFP_FLAGS);
	if (req == NULL)
		goto out1;

	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE);
	if (!req->rl_sendbuf)
		goto out2;

	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE);
	if (!req->rl_recvbuf)
		goto out3;

	INIT_LIST_HEAD(&req->rl_free_mrs);
	INIT_LIST_HEAD(&req->rl_registered);
	spin_lock(&buffer->rb_lock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_lock);
	return req;

out3:
	rpcrdma_regbuf_free(req->rl_sendbuf);
out2:
	kfree(req);
out1:
	return NULL;
}
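
/* Both buffers are plain kmalloc'd memory: rl_sendbuf is DMA-mapped
 * lazily when a Send is prepared, and rl_recvbuf (DMA_NONE) is never
 * DMA-mapped (see __rpcrdma_regbuf_dma_map).
 */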

/**
 * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req object to set up
 *
 * Returns zero on success, and a negative errno on failure.
 */
int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct rpcrdma_regbuf *rb;
	size_t maxhdrsize;

	/* Compute maximum header buffer size in bytes */
	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
		     r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
	maxhdrsize *= sizeof(__be32);
	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
				  DMA_TO_DEVICE);
	if (!rb)
		goto out;

	if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
		goto out_free;

	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
	return 0;

out_free:
	rpcrdma_regbuf_free(rb);
out:
	return -ENOMEM;
}

/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. Eg. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	int rc;

	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rc = rpcrdma_req_setup(r_xprt, req);
		if (rc)
			return rc;
	}
	return 0;
}

static void rpcrdma_req_reset(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	/* Credits are valid for only one connection */
	req->rl_slot.rq_cong = 0;

	rpcrdma_regbuf_free(req->rl_rdmabuf);
	req->rl_rdmabuf = NULL;

	rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
	rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);

	/* The verbs consumer can't know the state of an MR on the
	 * req->rl_registered list unless a successful completion
	 * has occurred, so they cannot be re-used.
	 */
	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;

		spin_lock(&buf->rb_lock);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_mr_release(mr);
	}
}

/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. Eg. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	list_for_each_entry(req, &buf->rb_allreqs, rl_all)
		rpcrdma_req_reset(req);
}

static noinline
struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct ib_device *device = ep->re_id->device;
	struct rpcrdma_rep *rep;

	rep = kzalloc(sizeof(*rep), XPRTRDMA_GFP_FLAGS);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_regbuf_alloc_node(ep->re_inline_recv,
						    DMA_FROM_DEVICE,
						    ibdev_to_node(device));
	if (!rep->rr_rdmabuf)
		goto out_free;

	rep->rr_cid.ci_completion_id =
		atomic_inc_return(&r_xprt->rx_ep->re_completion_ids);

	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
		     rdmab_length(rep->rr_rdmabuf));
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_all, &buf->rb_all_reps);
	spin_unlock(&buf->rb_lock);
	return rep;

out_free:
	kfree(rep);
out:
	return NULL;
}

static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
{
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
	kfree(rep);
}

static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
	struct llist_node *node;

	/* Calls to llist_del_first are required to be serialized */
	node = llist_del_first(&buf->rb_free_reps);
	if (!node)
		return NULL;
	return llist_entry(node, struct rpcrdma_rep, rr_node);
}

/**
 * rpcrdma_rep_put - Release rpcrdma_rep back to free list
 * @buf: buffer pool
 * @rep: rep to release
 *
 */
void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
{
	llist_add(&rep->rr_node, &buf->rb_free_reps);
}

/* Caller must ensure the QP is quiescent (RQ is drained) before
 * invoking this function, to guarantee rb_all_reps is not
 * changing.
 */
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;

	list_for_each_entry(rep, &buf->rb_all_reps, rr_all)
		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
}

static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	spin_lock(&buf->rb_lock);
	while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
					       struct rpcrdma_rep,
					       rr_all)) != NULL) {
		list_del(&rep->rr_all);
		spin_unlock(&buf->rb_lock);

		rpcrdma_rep_free(rep);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance to (re)initialize
 *
 * Returns zero on success, otherwise a negative errno.
 */
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all_mrs);
	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	INIT_LIST_HEAD(&buf->rb_all_reps);

	rc = -ENOMEM;
	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_req_create(r_xprt,
					 RPCRDMA_V1_DEF_INLINE_SIZE * 2);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	init_llist_head(&buf->rb_free_reps);

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}
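
/* rpcrdma_rep objects are not preallocated here: rpcrdma_post_recvs()
 * creates them on demand and caches free ones on the rb_free_reps
 * llist.
 */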

/**
 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
 * @req: unused object to be destroyed
 *
 * Relies on caller holding the transport send lock to protect
 * removing req->rl_all from buf->rb_all_reqs safely.
 */
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	list_del(&req->rl_all);

	while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;

		spin_lock(&buf->rb_lock);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_mr_release(mr);
	}

	rpcrdma_regbuf_free(req->rl_recvbuf);
	rpcrdma_regbuf_free(req->rl_sendbuf);
	rpcrdma_regbuf_free(req->rl_rdmabuf);
	kfree(req);
}

/**
 * rpcrdma_mrs_destroy - Release all of a transport's MRs
 * @r_xprt: controlling transport instance
 *
 * Relies on caller holding the transport send lock to protect
 * removing mr->mr_list from req->rl_free_mrs safely.
 */
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	cancel_work_sync(&buf->rb_refresh_worker);

	spin_lock(&buf->rb_lock);
	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
					      struct rpcrdma_mr,
					      mr_all)) != NULL) {
		list_del(&mr->mr_list);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_mr_release(mr);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior rpcrdma_xprt_drain :
 * - No more Send or Receive completions can occur
 * - All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	rpcrdma_reps_destroy(buf);

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_lock);
	mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
	return mr;
}

/**
 * rpcrdma_reply_put - Put reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	if (req->rl_reply) {
		rpcrdma_rep_put(buffers, req->rl_reply);
		req->rl_reply = NULL;
	}
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	rpcrdma_reply_put(buffers, req);

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc_node(size_t size, enum dma_data_direction direction,
			  int node)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc_node(sizeof(*rb), XPRTRDMA_GFP_FLAGS, node);
	if (!rb)
		return NULL;
	rb->rg_data = kmalloc_node(size, XPRTRDMA_GFP_FLAGS, node);
	if (!rb->rg_data) {
		kfree(rb);
		return NULL;
	}

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;
	return rb;
}

static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction)
{
	return rpcrdma_regbuf_alloc_node(size, direction, NUMA_NO_NODE);
}

/**
 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
 * @rb: regbuf to reallocate
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns true if reallocation was successful. If false is
 * returned, @rb is left untouched.
 */
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
	void *buf;

	buf = kmalloc(size, flags);
	if (!buf)
		return false;

	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

	rb->rg_data = buf;
	rb->rg_iov.length = size;
	return true;
}

/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
 * @r_xprt: controlling transport instance
 * @rb: regbuf to be mapped
 *
 * Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ep->re_id->device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
					    rdmab_length(rb), rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
	rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
	return true;
}
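
/* The regbuf is mapped with the PD's local_dma_lkey, so inline Send
 * and Receive buffers need no dedicated MR registration.
 */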

static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
			    rb->rg_direction);
	rb->rg_device = NULL;
}

static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	rpcrdma_regbuf_dma_unmap(rb);
	if (rb)
		kfree(rb->rg_data);
	kfree(rb);
}

/**
 * rpcrdma_post_recvs - Refill the Receive Queue
 * @r_xprt: controlling transport instance
 * @needed: current credit grant
 *
 */
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct ib_recv_wr *wr, *bad_wr;
	struct rpcrdma_rep *rep;
	int count, rc;

	rc = 0;
	count = 0;

	if (likely(ep->re_receive_count > needed))
		goto out;
	needed -= ep->re_receive_count;
	needed += RPCRDMA_MAX_RECV_BATCH;

	if (atomic_inc_return(&ep->re_receiving) > 1)
		goto out;

	/* fast path: all needed reps can be found on the free list */
	wr = NULL;
	while (needed) {
		rep = rpcrdma_rep_get_locked(buf);
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt);
		if (!rep)
			break;
		if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) {
			rpcrdma_rep_put(buf, rep);
			break;
		}

		rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
		trace_xprtrdma_post_recv(&rep->rr_cid);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		--needed;
		++count;
	}
	if (!wr)
		goto out;

	rc = ib_post_recv(ep->re_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
	if (rc) {
		trace_xprtrdma_post_recvs_err(r_xprt, rc);
		for (wr = bad_wr; wr;) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			wr = wr->next;
			rpcrdma_rep_put(buf, rep);
			--count;
		}
	}
	if (atomic_dec_return(&ep->re_receiving) > 0)
		complete(&ep->re_done);

out:
	trace_xprtrdma_post_recvs(r_xprt, count);
	ep->re_receive_count += count;
	return;
}
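
/* The atomic_inc/atomic_dec of ep->re_receiving above bracket the
 * posting critical section: rpcrdma_xprt_drain() waits on ep->re_done
 * until a concurrent poster has left this section before it flushes
 * the Receive Queue.
 */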