// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2015-2018 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <linux/export.h>

#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
						 struct net *net, int node);
static int svc_rdma_listen_handler(struct rdma_cm_id *cma_id,
				   struct rdma_cm_event *event);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static void svc_rdma_kill_temp_xprt(struct svc_xprt *);

static const struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_result_payload = svc_rdma_result_payload,
	.xpo_release_ctxt = svc_rdma_release_ctxt,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};
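
/* svc_rdma_class is registered with the generic svc_xprt layer by
 * svc_reg_xprt_class(), presumably from svc_rdma.c at module load
 * time; that registration is what makes the "rdma" transport name
 * available when a listener is created.
 */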

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	trace_svcrdma_qp_error(event, (struct sockaddr *)&xprt->xpt_remote);
	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		break;

	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		svc_xprt_deferred_close(xprt);
		break;
	}
}

static struct rdma_cm_id *
svc_rdma_create_listen_id(struct net *net, struct sockaddr *sap,
			  void *context)
{
	struct rdma_cm_id *listen_id;
	int ret;

	listen_id = rdma_create_id(net, svc_rdma_listen_handler, context,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id))
		return listen_id;

	/* Allow both IPv4 and IPv6 sockets to bind a single port
	 * at the same time.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	ret = rdma_set_afonly(listen_id, 1);
	if (ret)
		goto out_destroy;
#endif
	ret = rdma_bind_addr(listen_id, sap);
	if (ret)
		goto out_destroy;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret)
		goto out_destroy;

	return listen_id;

out_destroy:
	rdma_destroy_id(listen_id);
	return ERR_PTR(ret);
}
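
/* Within this file, the helper above is used in two places: when the
 * listener transport is first set up (svc_rdma_create) and when the
 * listener must be re-created after an RDMA_CM_EVENT_ADDR_CHANGE
 * (svc_rdma_listen_handler).
 */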

static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
						 struct net *net, int node)
{
	static struct lock_class_key svcrdma_rwctx_lock;
	static struct lock_class_key svcrdma_sctx_lock;
	static struct lock_class_key svcrdma_dto_lock;
	struct svcxprt_rdma *cma_xprt;

	cma_xprt = kzalloc_node(sizeof(*cma_xprt), GFP_KERNEL, node);
	if (!cma_xprt)
		return NULL;

	svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	init_llist_head(&cma_xprt->sc_send_ctxts);
	init_llist_head(&cma_xprt->sc_recv_ctxts);
	init_llist_head(&cma_xprt->sc_rw_ctxts);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	lockdep_set_class(&cma_xprt->sc_rq_dto_lock, &svcrdma_dto_lock);
	spin_lock_init(&cma_xprt->sc_send_lock);
	lockdep_set_class(&cma_xprt->sc_send_lock, &svcrdma_sctx_lock);
	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
	lockdep_set_class(&cma_xprt->sc_rw_ctxt_lock, &svcrdma_rwctx_lock);

	/*
	 * Note that this implies that the underlying transport support
	 * has some form of congestion control (see RFC 7530 section 3.1
	 * paragraph 2). For now, we assume that all supported RDMA
	 * transports are suitable here.
	 */
	set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		newxprt->sc_snd_w_inv = pmsg->cp_flags &
					RPCRDMA_CMP_F_SND_W_INV_OK;

		dprintk("svcrdma: client send_size %u, recv_size %u "
			"remote inv %ssupported\n",
			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
			newxprt->sc_snd_w_inv ? "" : "un");
	}
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked,
 * it will call the recvfrom method on the listen xprt which will accept the
 * new connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       struct rdma_conn_param *param)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	newxprt = svc_rdma_create_xprt(listen_xprt->sc_xprt.xpt_server,
				       listen_xprt->sc_xprt.xpt_net,
				       ibdev_to_node(new_cma_id->device));
	if (!newxprt)
		return;

	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	svc_rdma_parse_connect_private(newxprt, param);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = param->initiator_depth;

	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	newxprt->sc_xprt.xpt_remotelen = svc_addr_len(sa);
	memcpy(&newxprt->sc_xprt.xpt_remote, sa,
	       newxprt->sc_xprt.xpt_remotelen);
	snprintf(newxprt->sc_xprt.xpt_remotebuf,
		 sizeof(newxprt->sc_xprt.xpt_remotebuf) - 1, "%pISc", sa);

	/* The remote port is arbitrary and not under the control of the
	 * client ULP. Set it to a fixed value so that the DRC continues
	 * to be effective after a reconnect.
	 */
	rpc_set_port((struct sockaddr *)&newxprt->sc_xprt.xpt_remote, 0);

	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}
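
/* A transport queued by handle_connect_req() has no PD, CQs, or QP
 * yet; those resources are provisioned in svc_rdma_accept() when a
 * server thread dequeues the new transport from sc_accept_q.
 */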

/**
 * svc_rdma_listen_handler - Handle CM events generated on a listening endpoint
 * @cma_id: the server's listener rdma_cm_id
 * @event: details of the event
 *
 * Return values:
 *     %0: Do not destroy @cma_id
 *     %1: Destroy @cma_id
 *
 * NB: There is never a DEVICE_REMOVAL event for INADDR_ANY listeners.
 */
static int svc_rdma_listen_handler(struct rdma_cm_id *cma_id,
				   struct rdma_cm_event *event)
{
	struct sockaddr *sap = (struct sockaddr *)&cma_id->route.addr.src_addr;
	struct svcxprt_rdma *cma_xprt = cma_id->context;
	struct svc_xprt *cma_rdma = &cma_xprt->sc_xprt;
	struct rdma_cm_id *listen_id;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		handle_connect_req(cma_id, &event->param.conn);
		break;
	case RDMA_CM_EVENT_ADDR_CHANGE:
		listen_id = svc_rdma_create_listen_id(cma_rdma->xpt_net,
						      sap, cma_xprt);
		if (IS_ERR(listen_id)) {
			pr_err("Listener dead, address change failed for device %s\n",
			       cma_id->device->name);
		} else
			cma_xprt->sc_cm_id = listen_id;
		return 1;
	default:
		break;
	}
	return 0;
}

/**
 * svc_rdma_cma_handler - Handle CM events on client connections
 * @cma_id: the server's listener rdma_cm_id
 * @event: details of the event
 *
 * Return values:
 *     %0: Do not destroy @cma_id
 *     %1: Destroy @cma_id (never returned here)
 */
static int svc_rdma_cma_handler(struct rdma_cm_id *cma_id,
				struct rdma_cm_event *event)
{
	struct svcxprt_rdma *rdma = cma_id->context;
	struct svc_xprt *xprt = &rdma->sc_xprt;

	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);

		/* Handle any requests that were received while
		 * CONN_PENDING was set. */
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		svc_xprt_deferred_close(xprt);
		break;
	default:
		break;
	}
	return 0;
}
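
/* svc_rdma_cma_handler is installed on the new connection's cm_id by
 * svc_rdma_accept(), under rdma_lock_handler(), just before
 * rdma_accept() is called; it replaces the listener's handler that the
 * cm_id inherited when the CONNECT_REQUEST arrived.
 */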

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;

	if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6)
		return ERR_PTR(-EAFNOSUPPORT);
	cma_xprt = svc_rdma_create_xprt(serv, net, NUMA_NO_NODE);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
	strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener");

	listen_id = svc_rdma_create_listen_id(net, sa, cma_xprt);
	if (IS_ERR(listen_id)) {
		kfree(cma_xprt);
		return ERR_CAST(listen_id);
	}
	cma_xprt->sc_cm_id = listen_id;

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;
}

static void svc_rdma_xprt_done(struct rpcrdma_notification *rn)
{
	struct svcxprt_rdma *rdma = container_of(rn, struct svcxprt_rdma,
						 sc_rn);
	struct rdma_cm_id *id = rdma->sc_cm_id;

	trace_svcrdma_device_removal(id);
	svc_xprt_close(&rdma->sc_xprt);
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct rpcrdma_connect_private pmsg;
	struct ib_qp_init_attr qp_attr;
	unsigned int ctxts, rq_depth;
	struct ib_device *dev;
	int ret;
	RPC_IFDEBUG(struct sockaddr *sap);

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dev = newxprt->sc_cm_id->device;
	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;

	if (rpcrdma_rn_register(dev, &newxprt->sc_rn, svc_rdma_xprt_done))
		goto errout;

	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = svcrdma_max_requests;
	newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
	newxprt->sc_recv_batch = RPCRDMA_MAX_RECV_BATCH;
	newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);

	/* Qualify the transport's resource defaults with the
	 * capabilities of this particular device.
	 */

	/* Transport header, head iovec, tail iovec */
	newxprt->sc_max_send_sges = 3;
	/* Add one SGE per page list entry */
	newxprt->sc_max_send_sges += (svcrdma_max_req_size / PAGE_SIZE) + 1;
	if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge)
		newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
	rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests +
		   newxprt->sc_recv_batch + 1 /* drain */;
	if (rq_depth > dev->attrs.max_qp_wr) {
		rq_depth = dev->attrs.max_qp_wr;
		newxprt->sc_recv_batch = 1;
		newxprt->sc_max_requests = rq_depth - 2;
		newxprt->sc_max_bc_requests = 2;
	}
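
	/* The Receive Queue needs one slot for each credit advertised
	 * to the client, the backchannel slots, one full batch of
	 * Receive refills, and a single drain WR. If the device cannot
	 * support that depth, the credit and batch limits are scaled
	 * back instead.
	 */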

	/* Arbitrarily estimate the number of rw_ctxs needed for
	 * this transport. This is enough rw_ctxs to make forward
	 * progress even if the client is using one rkey per page
	 * in each Read chunk.
	 */
	ctxts = 3 * RPCSVC_MAXPAGES;
	newxprt->sc_sq_depth = rq_depth + ctxts;
	if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr)
		newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
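
	/* sc_sq_avail is the Send Queue budget: the send path is
	 * expected to debit it for each WR it posts and to sleep on
	 * sc_send_wait when it is exhausted (see svc_rdma_sendto.c).
	 * svc_rdma_has_wspace() below reports no write space whenever
	 * such waiters exist.
	 */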

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
		trace_svcrdma_pd_err(newxprt, PTR_ERR(newxprt->sc_pd));
		goto errout;
	}
	newxprt->sc_sq_cq = ib_alloc_cq_any(dev, newxprt, newxprt->sc_sq_depth,
					    IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_sq_cq))
		goto errout;
	newxprt->sc_rq_cq =
		ib_alloc_cq_any(dev, newxprt, rq_depth, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_rq_cq))
		goto errout;

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.port_num = newxprt->sc_port_num;
	qp_attr.cap.max_rdma_ctxs = ctxts;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts;
	qp_attr.cap.max_recv_wr = rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_send_sges;
	qp_attr.cap.max_recv_sge = 1;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk(" cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
		qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
	dprintk(" cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
		qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);
	dprintk(" send CQ depth = %u, recv CQ depth = %u\n",
		newxprt->sc_sq_depth, rq_depth);
	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		trace_svcrdma_qp_err(newxprt, ret);
		goto errout;
	}
	newxprt->sc_max_send_sges = qp_attr.cap.max_send_sge;
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		newxprt->sc_snd_w_inv = false;
	if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_port_num)) {
		trace_svcrdma_fabric_err(newxprt, -EINVAL);
		goto errout;
	}

	if (!svc_rdma_post_recvs(newxprt))
		goto errout;

	/* Construct RDMA-CM private message */
	pmsg.cp_magic = rpcrdma_cmp_magic;
	pmsg.cp_version = RPCRDMA_CMP_VERSION;
	pmsg.cp_flags = 0;
	pmsg.cp_send_size = pmsg.cp_recv_size =
		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = min_t(int, newxprt->sc_ord,
					   dev->attrs.max_qp_init_rd_atom);
	if (!conn_param.initiator_depth) {
		ret = -EINVAL;
		trace_svcrdma_initdepth_err(newxprt, ret);
		goto errout;
	}
	conn_param.private_data = &pmsg;
	conn_param.private_data_len = sizeof(pmsg);
	rdma_lock_handler(newxprt->sc_cm_id);
	newxprt->sc_cm_id->event_handler = svc_rdma_cma_handler;
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	rdma_unlock_handler(newxprt->sc_cm_id);
	if (ret) {
		trace_svcrdma_accept_err(newxprt, ret);
		goto errout;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	dprintk("svcrdma: new connection accepted on device %s:\n", dev->name);
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	dprintk("    remote address  : %pIS:%u\n", sap, rpc_get_port(sap));
	dprintk("    max_sge         : %d\n", newxprt->sc_max_send_sges);
	dprintk("    sq_depth        : %d\n", newxprt->sc_sq_depth);
	dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
	dprintk("    ord             : %d\n", conn_param.initiator_depth);
#endif

	return &newxprt->sc_xprt;

 errout:
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	rdma_disconnect(rdma->sc_cm_id);
}

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	struct ib_device *device = rdma->sc_cm_id->device;

	/* This blocks until the Completion Queues are empty */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_drain_qp(rdma->sc_qp);
	flush_workqueue(svcrdma_wq);

	svc_rdma_flush_recv_queues(rdma);

	svc_rdma_destroy_rw_ctxts(rdma);
	svc_rdma_send_ctxts_destroy(rdma);
	svc_rdma_recv_ctxts_destroy(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_free_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_free_cq(rdma->sc_rq_cq);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	rpcrdma_rn_unregister(device, &rdma->sc_rn);
	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	schedule_work(&rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are already waiters on the SQ,
	 * return 0.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
}