/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/module.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

#undef SVCRDMA_BACKCHANNEL_DEBUG
/**
 * svc_rdma_handle_bc_reply - Process incoming backchannel reply
 * @xprt: controlling backchannel transport
 * @rdma_resp: pointer to incoming transport header
 * @rcvbuf: XDR buffer into which to decode the reply
 *
 * Returns:
 *	%0 if @rcvbuf is filled in, xprt_complete_rqst called,
 *	%-EAGAIN if server should call ->recvfrom again.
 */
int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
			     struct xdr_buf *rcvbuf)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct kvec *dst, *src = &rcvbuf->head[0];
	struct rpc_rqst *req;
	unsigned long cwnd;
	u32 credits;
	size_t len;
	__be32 xid;
	__be32 *p;
	int ret;

	p = (__be32 *)src->iov_base;
	len = src->iov_len;
	xid = *rdma_resp;
#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: xid=%08x, length=%zu\n",
		__func__, be32_to_cpu(xid), len);
	pr_info("%s: RPC/RDMA: %*ph\n",
		__func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp);
	pr_info("%s: RPC: %*ph\n",
		__func__, (int)len, p);
#endif
	ret = -EAGAIN;
	if (src->iov_len < 24)
		goto out_shortreply;

	spin_lock(&xprt->recv_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req)
		goto out_notfound;
	dst = &req->rq_private_buf.head[0];
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
	if (dst->iov_len < len)
		goto out_unlock;
	memcpy(dst->iov_base, p, len);
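	/* The RPC/RDMA transport header begins with the XID, the
	 * protocol version, and the credit value, one XDR word each,
	 * so rdma_resp + 2 points at the credits granted by the
	 * client; clamp that value before it sizes the congestion
	 * window.
	 */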
	credits = be32_to_cpup(rdma_resp + 2);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;
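	/* Reflect the client's credit grant in the backchannel
	 * congestion window. If the window grew, the congestion slot
	 * held by this request can be released so that other queued
	 * backchannel calls may proceed.
	 */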
	spin_lock_bh(&xprt->transport_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(req->rq_task);
	spin_unlock_bh(&xprt->transport_lock);
	ret = 0;
	xprt_complete_rqst(req->rq_task, rcvbuf->len);

out_unlock:
	spin_unlock(&xprt->recv_lock);
out:
	return ret;

out_shortreply:
	dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
		xprt, src->iov_len);
	goto out;

out_notfound:
	dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
		xprt, be32_to_cpu(xid));
	goto out_unlock;
}
/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_send_reply_msg, but takes a struct
 * rpc_rqst instead, does not support chunks, and avoids blocking
 * memory allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
			      struct rpc_rqst *rqst)
{
	struct svc_rdma_op_ctxt *ctxt;
	int ret;

	ctxt = svc_rdma_get_context(rdma);

	/* rpcrdma_bc_send_request builds the transport header and
	 * the backchannel RPC message in the same buffer. Thus only
	 * one SGE is needed to send both.
	 */
	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer,
				     rqst->rq_snd_buf.len);
	if (ret < 0)
		goto out_err;

	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
	if (ret)
		goto out_err;

	ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0);
	if (ret)
		goto out_unmap;

out_err:
	dprintk("svcrdma: %s returns %d\n", __func__, ret);
	return ret;

out_unmap:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	ret = -EIO;
	goto out_err;
}
/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 */
static int
xprt_rdma_bc_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize;
	struct page *page;

	if (size > PAGE_SIZE) {
		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
			  size);
		return -EINVAL;
	}

	/* svc_rdma_sendto releases this page */
	page = alloc_page(RPCRDMA_DEF_GFP);
	if (!page)
		return -ENOMEM;
	rqst->rq_buffer = page_address(page);

	rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, RPCRDMA_DEF_GFP);
	if (!rqst->rq_rbuffer) {
		put_page(page);
		return -ENOMEM;
	}
	return 0;
}
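/* Note the split in buffer ownership: the rq_buffer page handed out
 * above is consumed and released by the server's send path (see the
 * "svc_rdma_sendto releases this page" comment), while rq_rbuffer is
 * freed by the ->buf_free method below.
 */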
static void
xprt_rdma_bc_free(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;

	kfree(rqst->rq_rbuffer);
}
static int
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	__be32 *p;
	int rc;

	/* Space in the send buffer for an RPC/RDMA header is reserved
	 * via xprt->tsh_size.
	 */
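	/* That header is the minimal RPC/RDMA header: XID, version,
	 * credit value, and the RDMA_MSG procedure, followed by three
	 * empty chunk lists -- seven XDR words, i.e. RPCRDMA_HDRLEN_MIN
	 * bytes.
	 */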
	p = rqst->rq_buffer;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p   = xdr_zero;

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
#endif

	rc = svc_rdma_bc_sendto(rdma, rqst);
	if (rc)
		goto drop_connection;
	return rc;

drop_connection:
	dprintk("svcrdma: failed to send bc call\n");
	xprt_disconnect_done(xprt);
	return -ENOTCONN;
}
/* Send an RPC call on the passive end of a transport
 * connection.
 */
static int
xprt_rdma_bc_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma;
	int ret;

	dprintk("svcrdma: sending bc call with xid: %08x\n",
		be32_to_cpu(rqst->rq_xid));

	/* An rpc_task must not block on the svc_xprt mutex; if it is
	 * busy, queue on xpt_bc_pending and retry the trylock once.
	 */
	if (!mutex_trylock(&sxprt->xpt_mutex)) {
		rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
		if (!mutex_trylock(&sxprt->xpt_mutex))
			return -EAGAIN;
		rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
	}

	ret = -ENOTCONN;
	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
		ret = rpcrdma_bc_send_request(rdma, rqst);

	mutex_unlock(&sxprt->xpt_mutex);

	if (ret < 0)
		return ret;
	return 0;
}
static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
}
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

	xprt_free(xprt);
	module_put(THIS_MODULE);
}
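/* Backchannel transport methods. The generic congestion-control
 * helpers (xprt_reserve_xprt_cong and friends) are reused here; they
 * pair with the credit-based cwnd update in svc_rdma_handle_bc_reply
 * above.
 */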
static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.release_request	= xprt_release_rqst_cong,
	.buf_alloc		= xprt_rdma_bc_allocate,
	.buf_free		= xprt_rdma_bc_free,
	.send_request		= xprt_rdma_bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xprt_rdma_bc_close,
	.destroy		= xprt_rdma_bc_put,
	.print_stats		= xprt_rdma_print_stats
};
static const struct rpc_timeout xprt_rdma_bc_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};
/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
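/* The slot count is fixed at RPCRDMA_MAX_BC_REQUESTS when the xprt is
 * allocated below, while the credit value advertised to the client is
 * rb_bc_max_requests, taken from xprt->max_reqs.
 */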
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC: %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
			  RPCRDMA_MAX_BC_REQUESTS,
			  RPCRDMA_MAX_BC_REQUESTS);
	if (!xprt) {
		dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	xprt->timeout = &xprt_rdma_bc_timeout;
	xprt_set_bound(xprt);
	xprt_set_connected(xprt);
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
	xprt->ops = &xprt_rdma_bc_procs;

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);

	xprt->max_payload = xprt_rdma_max_inline_read;

	new_xprt = rpcx_to_rdmax(xprt);
	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;

	if (!try_module_get(THIS_MODULE))
		goto out_fail;

	/* Final put for backchannel xprt is in __svc_rdma_free */
	xprt_get(xprt);
	return xprt;

out_fail:
	xprt_rdma_free_addresses(xprt);
	args->bc_xprt->xpt_bc_xprt = NULL;
	args->bc_xprt->xpt_bc_xps = NULL;
	xprt_put(xprt);
	xprt_free(xprt);
	return ERR_PTR(-EINVAL);
}
struct xprt_class xprt_rdma_bc = {
	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
	.name			= "rdma backchannel",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_BC_RDMA,
	.setup			= xprt_setup_rdma_bc,
};