// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015-2018 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/module.h>

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

#undef SVCRDMA_BACKCHANNEL_DEBUG
/**
 * svc_rdma_handle_bc_reply - Process incoming backchannel reply
 * @xprt: controlling backchannel transport
 * @rdma_resp: pointer to incoming transport header
 * @rcvbuf: XDR buffer into which to decode the reply
 *
 * Returns:
 *	%0 if @rcvbuf is filled in, xprt_complete_rqst called,
 *	%-EAGAIN if server should call ->recvfrom again.
 */
int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
			     struct xdr_buf *rcvbuf)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct kvec *dst, *src = &rcvbuf->head[0];
	struct rpc_rqst *req;
	unsigned long cwnd;
	u32 credits;
	size_t len;
	__be32 xid;
	__be32 *p;
	int ret;

	p = (__be32 *)src->iov_base;
	len = src->iov_len;
	xid = *rdma_resp;
#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: xid=%08x, length=%zu\n",
		__func__, be32_to_cpu(xid), len);
	pr_info("%s: RPC/RDMA: %*ph\n",
		__func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp);
	pr_info("%s: RPC: %*ph\n",
		__func__, (int)len, p);
#endif

	ret = -EAGAIN;
	if (src->iov_len < 24)
		goto out_shortreply;
	spin_lock(&xprt->queue_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req)
		goto out_notfound;

	dst = &req->rq_private_buf.head[0];
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
	if (dst->iov_len < len)
		goto out_unlock;
	memcpy(dst->iov_base, p, len);
	credits = be32_to_cpup(rdma_resp + 2);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;
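
	/* The server's credit grant caps how many backchannel calls
	 * can be in flight; reflect it in the rpc_xprt's congestion
	 * window below.
	 */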
	spin_lock_bh(&xprt->transport_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(req->rq_task);
	spin_unlock_bh(&xprt->transport_lock);

	ret = 0;
	xprt_complete_rqst(req->rq_task, rcvbuf->len);

out_unlock:
	spin_unlock(&xprt->queue_lock);
out:
	return ret;

out_shortreply:
	dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
		xprt, src->iov_len);
	goto out;

out_notfound:
	dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
		xprt, be32_to_cpu(xid));
	goto out_unlock;
}
/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_send_reply_msg, but takes a struct
 * rpc_rqst instead, does not support chunks, and avoids blocking
 * memory allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
			      struct rpc_rqst *rqst,
			      struct svc_rdma_send_ctxt *ctxt)
{
	int ret;

	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
	if (ret < 0)
		return -EIO;

	/* Bump page refcnt so Send completion doesn't release
	 * the rq_buffer before all retransmits are complete.
	 */
	get_page(virt_to_page(rqst->rq_buffer));
	ctxt->sc_send_wr.opcode = IB_WR_SEND;
	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
}
/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 */
static int
xprt_rdma_bc_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize;
	struct page *page;

	if (size > PAGE_SIZE) {
		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
			  size);
		return -EINVAL;
	}

	page = alloc_page(RPCRDMA_DEF_GFP);
	if (!page)
		return -ENOMEM;
	rqst->rq_buffer = page_address(page);

	rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, RPCRDMA_DEF_GFP);
	if (!rqst->rq_rbuffer) {
		put_page(page);
		return -ENOMEM;
	}
	return 0;
}
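
/* Release the send page and receive buffer allocated by
 * xprt_rdma_bc_allocate().
 */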
static void
xprt_rdma_bc_free(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;

	put_page(virt_to_page(rqst->rq_buffer));
	kfree(rqst->rq_rbuffer);
}
static int
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct svc_rdma_send_ctxt *ctxt;
	__be32 *p;
	int rc;

	ctxt = svc_rdma_send_ctxt_get(rdma);
	if (!ctxt)
		goto drop_connection;
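
	/* Build the RPC/RDMA transport header (XID, version, credit
	 * grant, message type, and empty chunk lists) directly in the
	 * Send context's transport buffer.
	 */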
	p = ctxt->sc_xprt_buf;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p   = xdr_zero;
	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN);

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
#endif

	rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
	if (rc) {
		svc_rdma_send_ctxt_put(rdma, ctxt);
		goto drop_connection;
	}
	return rc;

drop_connection:
	dprintk("svcrdma: failed to send bc call\n");
	xprt_disconnect_done(xprt);
	return -ENOTCONN;
}
/* Send an RPC call on the passive end of a transport
 * connection.
 */
static int
xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
{
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma;
	int ret;

	dprintk("svcrdma: sending bc call with xid: %08x\n",
		be32_to_cpu(rqst->rq_xid));
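
	/* Hold xpt_mutex so the Send is not posted while the transport
	 * is being torn down; XPT_DEAD is checked under the same mutex.
	 */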
	mutex_lock(&sxprt->xpt_mutex);

	ret = -ENOTCONN;
	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
		ret = rpcrdma_bc_send_request(rdma, rqst);

	mutex_unlock(&sxprt->xpt_mutex);

	if (ret < 0)
		return ret;
	return 0;
}
static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
	xprt->cwnd = RPC_CWNDSHIFT;
}
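
/* Drop the module reference taken in xprt_setup_rdma_bc() when the
 * backchannel rpc_xprt is destroyed.
 */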
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

	xprt_free(xprt);
	module_put(THIS_MODULE);
}
static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.free_slot		= xprt_free_slot,
	.release_request	= xprt_release_rqst_cong,
	.buf_alloc		= xprt_rdma_bc_allocate,
	.buf_free		= xprt_rdma_bc_free,
	.send_request		= xprt_rdma_bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xprt_rdma_bc_close,
	.destroy		= xprt_rdma_bc_put,
	.print_stats		= xprt_rdma_print_stats
};
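
/* Backchannel calls use a fixed 60-second retransmit timeout
 * (no exponential backoff, since to_initval equals to_maxval).
 */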
static const struct rpc_timeout xprt_rdma_bc_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};
/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC: %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}
	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
			  RPCRDMA_MAX_BC_REQUESTS,
			  RPCRDMA_MAX_BC_REQUESTS);
	if (!xprt) {
		dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}
	xprt->timeout = &xprt_rdma_bc_timeout;
	xprt_set_bound(xprt);
	xprt_set_connected(xprt);
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
	xprt->ops = &xprt_rdma_bc_procs;

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
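
	/* Backward direction calls never carry chunks, so the payload
	 * is limited to what fits inline.
	 */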
	xprt->max_payload = xprt_rdma_max_inline_read;

	new_xprt = rpcx_to_rdmax(xprt);
	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;

	if (!try_module_get(THIS_MODULE))
		goto out_fail;
	/* Final put for backchannel xprt is in __svc_rdma_free */
	xprt_get(xprt);
	return xprt;

out_fail:
	xprt_rdma_free_addresses(xprt);
	args->bc_xprt->xpt_bc_xprt = NULL;
	args->bc_xprt->xpt_bc_xps = NULL;
	xprt_put(xprt);
	xprt_free(xprt);
	return ERR_PTR(-EINVAL);
}
struct xprt_class xprt_rdma_bc = {
	.list		= LIST_HEAD_INIT(xprt_rdma_bc.list),
	.name		= "rdma backchannel",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_BC_RDMA,
	.setup		= xprt_setup_rdma_bc,
};