// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015-2020, Oracle and/or its affiliates.
 *
 * Support for reverse-direction RPCs on RPC/RDMA.
 */

#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#undef RPCRDMA_BACKCHANNEL_DEBUG
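
/* Note: defining RPCRDMA_BACKCHANNEL_DEBUG (in place of the #undef
 * above) enables the pr_info() dump of each incoming backchannel
 * Call in rpcrdma_bc_receive_call() below.
 */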

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

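	/* Each backchannel request consumes both a Receive and a Send
	 * work request from the backward-direction budget, hence the
	 * advertised limit is half of RPCRDMA_BACKWARD_WRS. (This
	 * rationale is inferred from the >> 1 below, not stated in
	 * the original source.)
	 */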
	r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
	trace_xprtrdma_cb_setup(r_xprt, reqs);
	return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	size_t maxmsg;

	maxmsg = min_t(unsigned int, ep->re_inline_send, ep->re_inline_recv);
	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
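	/* Backchannel messages travel inline in both directions, so the
	 * usable payload is the smaller inline threshold, capped at one
	 * page, less the fixed RPC/RDMA header.
	 */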
	return maxmsg - RPCRDMA_HDRLEN_MIN;
}
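
/* The slot count matches the limit advertised by xprt_rdma_bc_setup():
 * half the backward-direction WR budget.
 */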
unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
{
	return RPCRDMA_BACKWARD_WRS >> 1;
}

static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	__be32 *p;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
			rdmab_data(req->rl_rdmabuf), rqst);

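	/* 28 octets = seven XDR words: XID, RPC/RDMA version, credit
	 * value, the rdma_msg procedure, and three empty chunk lists.
	 */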
	p = xdr_reserve_space(&req->rl_stream, 28);
	if (unlikely(!p))
		return -EIO;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

	if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
				      &rqst->rq_snd_buf, rpcrdma_noch_pullup))
		return -EIO;

	trace_xprtrdma_cb_reply(r_xprt, rqst);
	return 0;
}

/**
 * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
 * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EIO if a permanent error occurred and the request was not
 *		sent. Do not try to send this message again.
 */
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	int rc;

	if (!xprt_connected(xprt))
		return -ENOTCONN;

	if (!xprt_request_get_cong(xprt, rqst))
		return -EBADSLT;

	rc = rpcrdma_bc_marshal_reply(rqst);
	if (rc < 0)
		goto failed_marshal;

	if (frwr_send(r_xprt, req))
		goto drop_connection;
	return 0;

failed_marshal:
	return rc;

drop_connection:
	xprt_rdma_close(xprt);
	return -ENOTCONN;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpc_rqst *rqst, *tmp;

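	/* bc_pa_lock is dropped and retaken around each destroy so that
	 * rpcrdma_req_destroy() is never invoked while the spinlock is
	 * held. (Rationale inferred from the locking pattern below.)
	 */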
	spin_lock(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock(&xprt->bc_pa_lock);

		rpcrdma_req_destroy(rpcr_to_rdmar(rqst));

		spin_lock(&xprt->bc_pa_lock);
	}
	spin_unlock(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_rep *rep = req->rl_reply;
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
	req->rl_reply = NULL;

	spin_lock(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);
	xprt_put(xprt);
}

static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	size_t size;

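	/* Reuse a preallocated rqst from bc_pa_list when one is
	 * available; otherwise create a new one on demand, subject to
	 * the RPCRDMA_BACKWARD_WRS cap enforced below.
	 */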
	spin_lock(&xprt->bc_pa_lock);
	rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
					rq_bc_pa_list);
	if (!rqst)
		goto create_req;
	list_del(&rqst->rq_bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);
	return rqst;

create_req:
	spin_unlock(&xprt->bc_pa_lock);

	/* Set a limit to prevent a remote from overrunning our resources.
	 */
	if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
		return NULL;

	size = min_t(size_t, r_xprt->rx_ep->re_inline_recv, PAGE_SIZE);
	req = rpcrdma_req_create(r_xprt, size);
	if (!req)
		return NULL;
	if (rpcrdma_req_setup(r_xprt, req)) {
		rpcrdma_req_destroy(req);
		return NULL;
	}

	xprt->bc_alloc_count++;
	rqst = &req->rl_slot;
	rqst->rq_xprt = xprt;
	__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
	xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
	return rqst;
}

/**
 * rpcrdma_bc_receive_call - Handle a reverse-direction Call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
			     struct rpcrdma_rep *rep)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct svc_serv *bc_serv;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct xdr_buf *buf;
	size_t size;
	__be32 *p;

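	/* A zero-length xdr_inline_decode() does not consume any data;
	 * it returns the current decode position, i.e. a pointer to the
	 * start of the backchannel Call message.
	 */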
	p = xdr_inline_decode(&rep->rr_stream, 0);
	size = xdr_stream_remaining(&rep->rr_stream);

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
	pr_info("RPC: %s: callback XID %08x, length=%u\n",
		__func__, be32_to_cpup(p), size);
	pr_info("RPC: %s: %*ph\n", __func__, size, p);
#endif

	rqst = rpcrdma_bc_rqst_get(r_xprt);
	if (!rqst)
		goto out_overflow;

	rqst->rq_reply_bytes_recvd = 0;
	rqst->rq_xid = *p;

	rqst->rq_private_buf.len = size;

	buf = &rqst->rq_rcv_buf;
	memset(buf, 0, sizeof(*buf));
	buf->head[0].iov_base = p;
	buf->head[0].iov_len = size;
	buf->len = size;

	/* The receive buffer has to be hooked to the rpcrdma_req
	 * so that it is not released while the req is pointing
	 * to its buffer, and so that it can be reposted after
	 * the Upper Layer is done decoding it.
	 */
	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;
	trace_xprtrdma_cb_call(r_xprt, rqst);

	/* Queue rqst for ULP's callback service */
	bc_serv = xprt->bc_serv;
	xprt_get(xprt);
	lwq_enqueue(&rqst->rq_bc_list, &bc_serv->sv_cb_list);

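	/* Only the first svc pool is woken; backchannel Calls are not
	 * spread across pools. (Inference from the use of sv_pools[0]
	 * below.)
	 */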
	svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);

	r_xprt->rx_stats.bcall_count++;
	return;

out_overflow:
	pr_warn("RPC/RDMA backchannel overflow\n");
	xprt_force_disconnect(xprt);
	/* This receive buffer gets reposted automatically
	 * when the connection is re-established.
	 */
}