// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}
void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}
/**
 * rpcrdma_disconnect_worker - Force a disconnect
 * @work: endpoint to be disconnected
 *
 * Provider callbacks can possibly run in an IRQ context. This function
 * is invoked in a worker thread to guarantee that disconnect wake-up
 * calls are always done in process context.
 */
static void
rpcrdma_disconnect_worker(struct work_struct *work)
{
	struct rpcrdma_ep *ep = container_of(work, struct rpcrdma_ep,
					     rep_disconnect_worker.work);
	struct rpcrdma_xprt *r_xprt =
		container_of(ep, struct rpcrdma_xprt, rx_ep);

	xprt_force_disconnect(&r_xprt->rx_xprt);
}
/**
 * rpcrdma_qp_event_handler - Handle one QP event (error notification)
 * @event: details of the event
 * @context: ep that owns QP where event occurred
 *
 * Called from the RDMA provider (device driver) possibly in an interrupt
 * context.
 */
static void
rpcrdma_qp_event_handler(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);

	trace_xprtrdma_qp_event(r_xprt, event);
	pr_err("rpcrdma: %s on device %s connected to %s:%s\n",
	       ib_event_msg(event->event), event->device->name,
	       rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		schedule_delayed_work(&ep->rep_disconnect_worker, 0);
		wake_up_all(&ep->rep_connect_wait);
	}
}
/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(sc, wc);
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);

	rpcrdma_sendctx_put_locked(sc);
}
/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	trace_xprtrdma_wc_receive(wc);
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

out:
	rpcrdma_reply_handler(rep);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
	goto out;
}
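/* Note: a flushed or failed Receive is still handed to the reply
 * handler, but with a zero-length header, so the rep is recycled
 * through the normal completion path.
 */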
static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}
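/* Note on the negotiation above: the decoded values can only lower
 * this transport's inline thresholds, never raise them, so a peer
 * that advertises nothing leaves the Version One defaults in place.
 */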
/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *r_xprt = id->context;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	trace_xprtrdma_cm_event(r_xprt, event);
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EPROTO;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %s:%s\n",
			ia->ri_device->name,
			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		++xprt->connect_cookie;
		ep->rep_connected = 1;
		rpcrdma_update_connect_private(r_xprt, &event->param.conn);
		wake_up_all(&ep->rep_connect_wait);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->rep_connected = -ENOTCONN;
		goto disconnected;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->rep_connected = -ENETUNREACH;
		goto disconnected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
			rdma_reject_msg(id, event->status));
		ep->rep_connected = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->rep_connected = -EAGAIN;
		goto disconnected;
	case RDMA_CM_EVENT_DISCONNECTED:
		ep->rep_connected = -ECONNABORTED;
disconnected:
		xprt_force_disconnect(xprt);
		wake_up_all(&ep->rep_connect_wait);
		break;
	default:
		break;
	}

	dprintk("RPC: %s: %s:%s on %s/%s: %s\n", __func__,
		rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
		ia->ri_device->name, ia->ri_ops->ro_displayname,
		rdma_event_msg(event->event));
	return 0;
}
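/* Connection state is tracked in ep->rep_connected: 1 while the
 * transport is connected, 0 while a connect is in progress, and a
 * negative errno after a failure or disconnect. Waiters sleep on
 * ep->rep_connect_wait for transitions out of zero.
 */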
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	trace_xprtrdma_conn_start(xprt);

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
			    xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC: %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL,
			       (struct sockaddr *)&xprt->rx_xprt.addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}
/*
 * Exported functions.
 */
/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: transport with IA to (re)initialize
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRWR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}
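/* The memory registration strategy selected above is advisory: when
 * the device cannot support the requested mode, the switch falls
 * through to the next supported mode, and rpcrdma_ia_open() fails
 * only if no mode is usable.
 */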
/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ep->rep_attr.recv_cq = NULL;
	ib_free_cq(ep->rep_attr.send_cq);
	ep->rep_attr.send_cq = NULL;

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_mrs_destroy(buf);
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);

	trace_xprtrdma_remove(r_xprt);
}
/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}
/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_sge;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_send_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge;

	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;

	ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
				   cdata->max_requests >> 2);
	ep->rep_send_count = ep->rep_send_batch;
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_disconnect_worker,
			  rpcrdma_disconnect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     1, IB_POLL_WORKQUEUE);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_WORKQUEUE);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

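	/* The connected peer decodes this private message (the receive
	 * side of the same exchange is rpcrdma_update_connect_private()
	 * above) to learn how large an inline Send it may direct at
	 * this endpoint.
	 */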
	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	ep->rep_remote_cma.responder_resources =
		min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}
/*
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	cancel_delayed_work_sync(&ep->rep_disconnect_worker);

	if (ia->ri_id && ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	if (ep->rep_attr.recv_cq)
		ib_free_cq(ep->rep_attr.recv_cq);
	if (ep->rep_attr.send_cq)
		ib_free_cq(ep->rep_attr.send_cq);
}
/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc, err;

	trace_xprtrdma_reinsert(r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_mrs_create(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}
static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int err, rc;

	trace_xprtrdma_reconnect(r_xprt);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		dprintk("RPC: %s: rdma_create_qp returned %d\n",
			__func__, err);
		goto out_destroy;
	}

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}
/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;
	xprt_clear_connected(xprt);

	rpcrdma_post_recvs(r_xprt, true);

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}
/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
					       rx_ep), rc);

	ib_drain_qp(ia->ri_id->qp);
}
/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */
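/* A rough sketch of the queue discipline described above (layout is
 * illustrative only):
 *
 *	rb_sc_ctxs[]:	[0] [1] [2] ... [rb_sc_last]
 *	rb_sc_head:	advanced only by the consumer (->send_request)
 *	rb_sc_tail:	advanced only by the producer (Send completion)
 *
 * The queue is treated as full when advancing the head would make it
 * equal to the tail, and as drained of completed contexts when the
 * head catches up with the tail.
 */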
/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and ib_drain_qp has flushed all remaining Send
 * requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
	unsigned long i;

	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
}
static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(sizeof(*sc) +
		     ia->ri_max_send_sges * sizeof(struct ib_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_wr.wr_cqe = &sc->sc_cqe;
	sc->sc_wr.sg_list = sc->sc_sges;
	sc->sc_wr.opcode = IB_WR_SEND;
	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}
static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
		if (!sc)
			goto out_destroy;

		sc->sc_xprt = r_xprt;
		buf->rb_sc_ctxs[i] = sc;
	}

	return 0;

out_destroy:
	rpcrdma_sendctxs_destroy(buf);
	return -ENOMEM;
}
/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}
/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @buf: transport buffers from which to acquire an unused context
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer),
 * and provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}
/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer).
 */
static void
rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);

	if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
		smp_mb__after_atomic();
		xprt_write_space(&sc->sc_xprt->rx_xprt);
	}
}
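/* Typical pairing, in outline (illustrative only; the real callers
 * live in rpc_rdma.c and in rpcrdma_wc_send() above):
 *
 *	sc = rpcrdma_sendctx_get_locked(buf);	// before building a Send WR
 *	...
 *	rpcrdma_sendctx_put_locked(sc);		// from the Send completion
 */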
static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < ia->ri_max_segs; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_KERNEL);
		if (!mr)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		mr->mr_xprt = r_xprt;

		list_add(&mr->mr_list, &free);
		list_add(&mr->mr_all, &all);
	}

	spin_lock(&buf->rb_mrlock);
	list_splice(&free, &buf->rb_mrs);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mrlock);
	trace_xprtrdma_createmrs(r_xprt, count);

	xprt_write_space(&r_xprt->rx_xprt);
}
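/* The MR pool is grown on demand: when rpcrdma_mr_get() below finds
 * the free list empty, it kicks rb_refresh_worker, which lands back
 * in rpcrdma_mrs_create() to allocate another batch.
 */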
static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
}
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (IS_ERR(rb)) {
		kfree(req);
		return ERR_PTR(-ENOMEM);
	}
	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
	req->rl_buffer = buffer;
	INIT_LIST_HEAD(&req->rl_registered);

	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	return req;
}
static int
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}
	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
		     rdmab_length(rep->rr_rdmabuf));

	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_list, &buf->rb_recv_bufs);
	spin_unlock(&buf->rb_lock);
	return 0;

out_free:
	kfree(rep);
out:
	dprintk("RPC: %s: reply buffer %d alloc failed\n",
		__func__, rc);
	return rc;
}
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_mrlock);
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);

	rpcrdma_mrs_create(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	buf->rb_credits = 1;
	buf->rb_posted_receives = 0;
	INIT_LIST_HEAD(&buf->rb_recv_bufs);

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}
static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}
static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mr *mr;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mrlock);
	while (!list_empty(&buf->rb_all)) {
		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
		list_del(&mr->mr_all);

		spin_unlock(&buf->rb_mrlock);

		/* Ensure MW is not on any rl_registered list */
		if (!list_empty(&mr->mr_list))
			list_del(&mr->mr_list);

		ia->ri_ops->ro_release_mr(mr);
		count++;
		spin_lock(&buf->rb_mrlock);
	}
	spin_unlock(&buf->rb_mrlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = list_first_entry(&buf->rb_recv_bufs,
				       struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		rpcrdma_destroy_rep(rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_mrs_destroy(buf);
}
/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr = NULL;

	spin_lock(&buf->rb_mrlock);
	if (!list_empty(&buf->rb_mrs))
		mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);

	if (!mr)
		goto out_nomrs;
	return mr;

out_nomrs:
	trace_xprtrdma_nomrs(r_xprt);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}
static void
__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
{
	spin_lock(&buf->rb_mrlock);
	rpcrdma_mr_push(mr, &buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
}
/**
 * rpcrdma_mr_put - Release an rpcrdma_mr object
 * @mr: object to release
 *
 */
void
rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}
/**
 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
 * @mr: object to release
 *
 */
void
rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}
	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
}
/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}
/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @req: object to return
 *
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		if (!rep->rr_temp) {
			list_add(&rep->rr_list, &buffers->rb_recv_bufs);
			rep = NULL;
		}
	}
	spin_unlock(&buffers->rb_lock);
	if (rep)
		rpcrdma_destroy_rep(rep);
}
/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	if (!rep->rr_temp) {
		spin_lock(&buffers->rb_lock);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock(&buffers->rb_lock);
	} else {
		rpcrdma_destroy_rep(rep);
	}
}
/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}
/**
 * __rpcrdma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}
static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}
/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}
/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	int rc;

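	/* An unsignaled Send consumes a Send Queue entry without ever
	 * generating a completion. Signal at least every rep_send_batch
	 * Sends (and whenever TX resources are attached to the request)
	 * so that rpcrdma_wc_send() can reclaim the accumulated send
	 * contexts before the Send Queue fills.
	 */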
	if (!ep->rep_send_count ||
	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = ia->ri_ops->ro_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}
/**
 * rpcrdma_post_recvs - Maybe post some Receive buffers
 * @r_xprt: controlling transport
 * @temp: when true, allocate temp rpcrdma_rep objects
 *
 */
void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct ib_recv_wr *wr, *bad_wr;
	int needed, count, rc;

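	/* Keep roughly one Receive posted per granted credit, plus two
	 * per backchannel request; rb_posted_receives tracks what is
	 * already on the Receive Queue so only the shortfall is posted
	 * below.
	 */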
	rc = 0;
	count = 0;
	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (buf->rb_posted_receives > needed)
		goto out;
	needed -= buf->rb_posted_receives;

	wr = NULL;
	while (needed) {
		struct rpcrdma_regbuf *rb;
		struct rpcrdma_rep *rep;

		spin_lock(&buf->rb_lock);
		rep = list_first_entry_or_null(&buf->rb_recv_bufs,
					       struct rpcrdma_rep, rr_list);
		if (rep)
			list_del(&rep->rr_list);
		spin_unlock(&buf->rb_lock);
		if (!rep) {
			if (rpcrdma_create_rep(r_xprt, temp))
				break;
			continue;
		}

		rb = rep->rr_rdmabuf;
		if (!rpcrdma_regbuf_is_mapped(rb)) {
			if (!__rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, rb)) {
				rpcrdma_recv_buffer_put(rep);
				break;
			}
		}

		trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		++count;
		--needed;
	}
	if (!count)
		goto out;

	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
	if (rc) {
		for (wr = bad_wr; wr; wr = wr->next) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
	buf->rb_posted_receives += count;
out:
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
}