// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015-2018 Oracle. All rights reserved.
 *
 * Support for reverse-direction RPCs on RPC/RDMA (server-side).
 */
8 #include <linux/sunrpc/svc_rdma.h>
10 #include "xprt_rdma.h"
11 #include <trace/events/rpcrdma.h>
14 * svc_rdma_handle_bc_reply - Process incoming backchannel Reply
15 * @rqstp: resources for handling the Reply
16 * @rctxt: Received message
19 void svc_rdma_handle_bc_reply(struct svc_rqst
*rqstp
,
20 struct svc_rdma_recv_ctxt
*rctxt
)
22 struct svc_xprt
*sxprt
= rqstp
->rq_xprt
;
23 struct rpc_xprt
*xprt
= sxprt
->xpt_bc_xprt
;
24 struct rpcrdma_xprt
*r_xprt
= rpcx_to_rdmax(xprt
);
25 struct xdr_buf
*rcvbuf
= &rqstp
->rq_arg
;
26 struct kvec
*dst
, *src
= &rcvbuf
->head
[0];
27 __be32
*rdma_resp
= rctxt
->rc_recv_buf
;
31 spin_lock(&xprt
->queue_lock
);
32 req
= xprt_lookup_rqst(xprt
, *rdma_resp
);
36 dst
= &req
->rq_private_buf
.head
[0];
37 memcpy(&req
->rq_private_buf
, &req
->rq_rcv_buf
, sizeof(struct xdr_buf
));
38 if (dst
->iov_len
< src
->iov_len
)
40 memcpy(dst
->iov_base
, src
->iov_base
, src
->iov_len
);
42 spin_unlock(&xprt
->queue_lock
);
44 credits
= be32_to_cpup(rdma_resp
+ 2);
46 credits
= 1; /* don't deadlock */
47 else if (credits
> r_xprt
->rx_buf
.rb_bc_max_requests
)
48 credits
= r_xprt
->rx_buf
.rb_bc_max_requests
;
49 spin_lock(&xprt
->transport_lock
);
50 xprt
->cwnd
= credits
<< RPC_CWNDSHIFT
;
51 spin_unlock(&xprt
->transport_lock
);
53 spin_lock(&xprt
->queue_lock
);
54 xprt_complete_rqst(req
->rq_task
, rcvbuf
->len
);
59 spin_unlock(&xprt
->queue_lock
);
62 /* Send a reverse-direction RPC Call.
64 * Caller holds the connection's mutex and has already marshaled
65 * the RPC/RDMA request.
67 * This is similar to svc_rdma_send_reply_msg, but takes a struct
68 * rpc_rqst instead, does not support chunks, and avoids blocking
71 * XXX: There is still an opportunity to block in svc_rdma_send()
72 * if there are no SQ entries to post the Send. This may occur if
73 * the adapter has a small maximum SQ depth.
75 static int svc_rdma_bc_sendto(struct svcxprt_rdma
*rdma
,
76 struct rpc_rqst
*rqst
,
77 struct svc_rdma_send_ctxt
*sctxt
)
79 struct svc_rdma_pcl empty_pcl
;
83 ret
= svc_rdma_map_reply_msg(rdma
, sctxt
, &empty_pcl
, &empty_pcl
,
88 /* Bump page refcnt so Send completion doesn't release
89 * the rq_buffer before all retransmits are complete.
91 get_page(virt_to_page(rqst
->rq_buffer
));
92 sctxt
->sc_send_wr
.opcode
= IB_WR_SEND
;
93 return svc_rdma_post_send(rdma
, sctxt
);
96 /* Server-side transport endpoint wants a whole page for its send
97 * buffer. The client RPC code constructs the RPC header in this
98 * buffer before it invokes ->send_request.
101 xprt_rdma_bc_allocate(struct rpc_task
*task
)
103 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
104 size_t size
= rqst
->rq_callsize
;
107 if (size
> PAGE_SIZE
) {
108 WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
113 page
= alloc_page(GFP_NOIO
| __GFP_NOWARN
);
116 rqst
->rq_buffer
= page_address(page
);
118 rqst
->rq_rbuffer
= kmalloc(rqst
->rq_rcvsize
, GFP_NOIO
| __GFP_NOWARN
);
119 if (!rqst
->rq_rbuffer
) {
127 xprt_rdma_bc_free(struct rpc_task
*task
)
129 struct rpc_rqst
*rqst
= task
->tk_rqstp
;
131 put_page(virt_to_page(rqst
->rq_buffer
));
132 kfree(rqst
->rq_rbuffer
);
136 rpcrdma_bc_send_request(struct svcxprt_rdma
*rdma
, struct rpc_rqst
*rqst
)
138 struct rpc_xprt
*xprt
= rqst
->rq_xprt
;
139 struct rpcrdma_xprt
*r_xprt
= rpcx_to_rdmax(xprt
);
140 struct svc_rdma_send_ctxt
*ctxt
;
144 ctxt
= svc_rdma_send_ctxt_get(rdma
);
146 goto drop_connection
;
148 p
= xdr_reserve_space(&ctxt
->sc_stream
, RPCRDMA_HDRLEN_MIN
);
152 *p
++ = rpcrdma_version
;
153 *p
++ = cpu_to_be32(r_xprt
->rx_buf
.rb_bc_max_requests
);
159 rqst
->rq_xtime
= ktime_get();
160 rc
= svc_rdma_bc_sendto(rdma
, rqst
, ctxt
);
166 svc_rdma_send_ctxt_put(rdma
, ctxt
);
173 * xprt_rdma_bc_send_request - Send a reverse-direction Call
174 * @rqst: rpc_rqst containing Call message to be sent
177 * %0 if the message was sent successfully
178 * %ENOTCONN if the message was not sent
180 static int xprt_rdma_bc_send_request(struct rpc_rqst
*rqst
)
182 struct svc_xprt
*sxprt
= rqst
->rq_xprt
->bc_xprt
;
183 struct svcxprt_rdma
*rdma
=
184 container_of(sxprt
, struct svcxprt_rdma
, sc_xprt
);
187 if (test_bit(XPT_DEAD
, &sxprt
->xpt_flags
))
190 ret
= rpcrdma_bc_send_request(rdma
, rqst
);
191 if (ret
== -ENOTCONN
)
192 svc_xprt_close(sxprt
);
197 xprt_rdma_bc_close(struct rpc_xprt
*xprt
)
199 xprt_disconnect_done(xprt
);
200 xprt
->cwnd
= RPC_CWNDSHIFT
;
/* ->destroy method: release the formatted address strings, then the
 * rpc_xprt itself.
 *
 * NOTE(review): the xprt_free() call is reconstructed -- confirm
 * against upstream.
 */
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	xprt_rdma_free_addresses(xprt);
	xprt_free(xprt);
}
210 static const struct rpc_xprt_ops xprt_rdma_bc_procs
= {
211 .reserve_xprt
= xprt_reserve_xprt_cong
,
212 .release_xprt
= xprt_release_xprt_cong
,
213 .alloc_slot
= xprt_alloc_slot
,
214 .free_slot
= xprt_free_slot
,
215 .release_request
= xprt_release_rqst_cong
,
216 .buf_alloc
= xprt_rdma_bc_allocate
,
217 .buf_free
= xprt_rdma_bc_free
,
218 .send_request
= xprt_rdma_bc_send_request
,
219 .wait_for_reply_request
= xprt_wait_for_reply_request_def
,
220 .close
= xprt_rdma_bc_close
,
221 .destroy
= xprt_rdma_bc_put
,
222 .print_stats
= xprt_rdma_print_stats
225 static const struct rpc_timeout xprt_rdma_bc_timeout
= {
226 .to_initval
= 60 * HZ
,
227 .to_maxval
= 60 * HZ
,
230 /* It shouldn't matter if the number of backchannel session slots
231 * doesn't match the number of RPC/RDMA credits. That just means
232 * one or the other will have extra slots that aren't used.
234 static struct rpc_xprt
*
235 xprt_setup_rdma_bc(struct xprt_create
*args
)
237 struct rpc_xprt
*xprt
;
238 struct rpcrdma_xprt
*new_xprt
;
240 if (args
->addrlen
> sizeof(xprt
->addr
))
241 return ERR_PTR(-EBADF
);
243 xprt
= xprt_alloc(args
->net
, sizeof(*new_xprt
),
244 RPCRDMA_MAX_BC_REQUESTS
,
245 RPCRDMA_MAX_BC_REQUESTS
);
247 return ERR_PTR(-ENOMEM
);
249 xprt
->timeout
= &xprt_rdma_bc_timeout
;
250 xprt_set_bound(xprt
);
251 xprt_set_connected(xprt
);
252 xprt
->bind_timeout
= 0;
253 xprt
->reestablish_timeout
= 0;
254 xprt
->idle_timeout
= 0;
256 xprt
->prot
= XPRT_TRANSPORT_BC_RDMA
;
257 xprt
->ops
= &xprt_rdma_bc_procs
;
259 memcpy(&xprt
->addr
, args
->dstaddr
, args
->addrlen
);
260 xprt
->addrlen
= args
->addrlen
;
261 xprt_rdma_format_addresses(xprt
, (struct sockaddr
*)&xprt
->addr
);
264 xprt
->max_payload
= xprt_rdma_max_inline_read
;
266 new_xprt
= rpcx_to_rdmax(xprt
);
267 new_xprt
->rx_buf
.rb_bc_max_requests
= xprt
->max_reqs
;
270 args
->bc_xprt
->xpt_bc_xprt
= xprt
;
271 xprt
->bc_xprt
= args
->bc_xprt
;
273 /* Final put for backchannel xprt is in __svc_rdma_free */
278 struct xprt_class xprt_rdma_bc
= {
279 .list
= LIST_HEAD_INIT(xprt_rdma_bc
.list
),
280 .name
= "rdma backchannel",
281 .owner
= THIS_MODULE
,
282 .ident
= XPRT_TRANSPORT_BC_RDMA
,
283 .setup
= xprt_setup_rdma_bc
,