// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015-2018 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
/**
 * svc_rdma_handle_bc_reply - Process incoming backchannel Reply
 * @rqstp: resources for handling the Reply
 * @rctxt: Received message
 */
void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *rctxt)
{
	struct svc_xprt *sxprt = rqstp->rq_xprt;
	struct rpc_xprt *xprt = sxprt->xpt_bc_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct xdr_buf *rcvbuf = &rqstp->rq_arg;
	struct kvec *dst, *src = &rcvbuf->head[0];
	__be32 *rdma_resp = rctxt->rc_recv_buf;
	struct rpc_rqst *req;
	u32 credits;
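
	/* Note: rc_recv_buf holds the already-received RPC/RDMA
	 * transport header, so *rdma_resp is the XID of the original
	 * backchannel Call; it is used below to locate the waiting
	 * rpc_rqst.
	 */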
	spin_lock(&xprt->queue_lock);
	req = xprt_lookup_rqst(xprt, *rdma_resp);
	if (!req)
		goto out_unlock;

	dst = &req->rq_private_buf.head[0];
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
	if (dst->iov_len < src->iov_len)
		goto out_unlock;
	memcpy(dst->iov_base, src->iov_base, src->iov_len);
	xprt_pin_rqst(req);
	spin_unlock(&xprt->queue_lock);
	credits = be32_to_cpup(rdma_resp + 2);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;
	spin_lock(&xprt->transport_lock);
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	spin_unlock(&xprt->transport_lock);

	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(req->rq_task, rcvbuf->len);
	xprt_unpin_rqst(req);
	rcvbuf->len = 0;

out_unlock:
	spin_unlock(&xprt->queue_lock);
}
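
/* Context note: this handler is invoked from the svc_rdma receive
 * path (svc_rdma_recvfrom) when an incoming message's XID matches an
 * outstanding backchannel Call, rather than being dispatched as a
 * new forward-channel RPC.
 */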

/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_send_reply_msg, but takes a struct
 * rpc_rqst instead, does not support chunks, and avoids blocking
 * memory allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
			      struct rpc_rqst *rqst,
			      struct svc_rdma_send_ctxt *sctxt)
{
	struct svc_rdma_recv_ctxt *rctxt;
	int ret;

	rctxt = svc_rdma_recv_ctxt_get(rdma);
	if (!rctxt)
		return -EIO;

	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf);
	svc_rdma_recv_ctxt_put(rdma, rctxt);
	if (ret < 0)
		return -EIO;

	/* Bump page refcnt so Send completion doesn't release
	 * the rq_buffer before all retransmits are complete.
	 */
	get_page(virt_to_page(rqst->rq_buffer));
	sctxt->sc_send_wr.opcode = IB_WR_SEND;
	return svc_rdma_send(rdma, sctxt);
}
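
/* Lifetime note: each transmission takes its own page reference,
 * consumed when the Send completes; the original reference from
 * alloc_page() in xprt_rdma_bc_allocate() is dropped only in
 * xprt_rdma_bc_free(), so the marshaled Call survives retransmits.
 */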

/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 */
static int
xprt_rdma_bc_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize;
	struct page *page;

	if (size > PAGE_SIZE) {
		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
			  size);
		return -EINVAL;
	}

	page = alloc_page(RPCRDMA_DEF_GFP);
	if (!page)
		return -ENOMEM;
	rqst->rq_buffer = page_address(page);

	rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, RPCRDMA_DEF_GFP);
	if (!rqst->rq_rbuffer) {
		put_page(page);
		return -ENOMEM;
	}
	return 0;
}
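
/* Buffer roles: rq_buffer (one page) holds the marshaled backchannel
 * Call; rq_rbuffer receives the Reply. Backchannel messages are
 * expected to be small (hence the WARN on requests above PAGE_SIZE),
 * so a single page and a plain kmalloc() suffice here.
 */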

static void
xprt_rdma_bc_free(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;

	put_page(virt_to_page(rqst->rq_buffer));
	kfree(rqst->rq_rbuffer);
}

static int
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct svc_rdma_send_ctxt *ctxt;
	__be32 *p;
	int rc;

	ctxt = svc_rdma_send_ctxt_get(rdma);
	if (!ctxt)
		goto drop_connection;

	p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
	if (!p)
		goto put_ctxt;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p   = xdr_zero;

	rqst->rq_xtime = ktime_get();
	rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
	if (rc)
		goto put_ctxt;
	return 0;

put_ctxt:
	svc_rdma_send_ctxt_put(rdma, ctxt);

drop_connection:
	return -ENOTCONN;
}
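
/* The seven XDR words reserved above form a minimal RPC/RDMA header
 * (RPCRDMA_HDRLEN_MIN):
 *
 *	word 0:    rdma_xid    - mirrors the RPC XID
 *	word 1:    rdma_vers   - always rpcrdma_version (one)
 *	word 2:    rdma_credit - credits granted to the peer
 *	word 3:    rdma_proc   - rdma_msg: the RPC message follows inline
 *	words 4-6: read list, write list, reply chunk - encoded empty
 *	           (xdr_zero), since backchannel Calls never use chunks
 */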

/**
 * xprt_rdma_bc_send_request - Send a reverse-direction Call
 * @rqst: rpc_rqst containing Call message to be sent
 *
 * Return values:
 *   %0 if the message was sent successfully
 *   %ENOTCONN if the message was not sent
 */
static int xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
{
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma =
		container_of(sxprt, struct svcxprt_rdma, sc_xprt);
	int ret;

	if (test_bit(XPT_DEAD, &sxprt->xpt_flags))
		return -ENOTCONN;

	ret = rpcrdma_bc_send_request(rdma, rqst);
	if (ret == -ENOTCONN)
		svc_close_xprt(sxprt);
	return ret;
}
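
/* On -ENOTCONN, svc_close_xprt() tears down the server-side
 * connection; any subsequent backchannel send then fails the
 * XPT_DEAD test above instead of queueing on a dead transport.
 */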

static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	xprt_disconnect_done(xprt);
	xprt->cwnd = RPC_CWNDSHIFT;
}

static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	xprt_rdma_free_addresses(xprt);
	xprt_free(xprt);
}

static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.free_slot		= xprt_free_slot,
	.release_request	= xprt_release_rqst_cong,
	.buf_alloc		= xprt_rdma_bc_allocate,
	.buf_free		= xprt_rdma_bc_free,
	.send_request		= xprt_rdma_bc_send_request,
	.wait_for_reply_request	= xprt_wait_for_reply_request_def,
	.close			= xprt_rdma_bc_close,
	.destroy		= xprt_rdma_bc_put,
	.print_stats		= xprt_rdma_print_stats
};
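
/* The _cong variants hook the backchannel into the RPC client's
 * congestion-window accounting, which is what makes the cwnd update
 * in svc_rdma_handle_bc_reply() effective at limiting the number of
 * in-flight backchannel Calls.
 */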

static const struct rpc_timeout xprt_rdma_bc_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;

	if (args->addrlen > sizeof(xprt->addr))
		return ERR_PTR(-EBADF);

	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
			  RPCRDMA_MAX_BC_REQUESTS,
			  RPCRDMA_MAX_BC_REQUESTS);
	if (!xprt)
		return ERR_PTR(-ENOMEM);

	xprt->timeout = &xprt_rdma_bc_timeout;
	xprt_set_bound(xprt);
	xprt_set_connected(xprt);
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
	xprt->ops = &xprt_rdma_bc_procs;

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
	xprt->resvport = 0;

	xprt->max_payload = xprt_rdma_max_inline_read;

	new_xprt = rpcx_to_rdmax(xprt);
	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;
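
	/* The assignments above cross-link the two transports: the
	 * connection's svc_xprt can now find the backchannel rpc_xprt
	 * to route incoming Replies, and the rpc_xprt can find the
	 * connection on which to post its Sends.
	 */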

	/* Final put for backchannel xprt is in __svc_rdma_free */
	xprt_get(xprt);
	return xprt;
}

struct xprt_class xprt_rdma_bc = {
	.list		= LIST_HEAD_INIT(xprt_rdma_bc.list),
	.name		= "rdma backchannel",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_BC_RDMA,
	.setup		= xprt_setup_rdma_bc,
};
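
/* This class is selected by its ident, XPRT_TRANSPORT_BC_RDMA. A
 * minimal sketch of how a consumer such as the NFSv4 callback client
 * might reach it through the generic transport-creation path (field
 * names follow struct xprt_create; the snippet is illustrative, not
 * lifted from a particular caller):
 *
 *	struct xprt_create args = {
 *		.ident	 = XPRT_TRANSPORT_BC_RDMA,
 *		.net	 = net,
 *		.dstaddr = (struct sockaddr *)&addr,
 *		.addrlen = addrlen,
 *		.bc_xprt = serv_xprt,
 *	};
 *	bc_xprt = xprt_create_transport(&args);
 */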