/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/module.h>
#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

#undef SVCRDMA_BACKCHANNEL_DEBUG

int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
			     struct xdr_buf *rcvbuf)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct kvec *dst, *src = &rcvbuf->head[0];
	struct rpc_rqst *req;
	unsigned long cwnd;
	u32 credits;
	size_t len;
	__be32 xid;
	__be32 *p;
	int ret;

	p = (__be32 *)src->iov_base;
	len = src->iov_len;
	xid = rmsgp->rm_xid;

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: xid=%08x, length=%zu\n",
		__func__, be32_to_cpu(xid), len);
	pr_info("%s: RPC/RDMA: %*ph\n",
		__func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
	pr_info("%s: RPC: %*ph\n",
		__func__, (int)len, p);
#endif

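	/* Sanity check: 24 octets is the smallest possible RPC reply
	 * header (XID, message type, reply status, verifier, and
	 * accept status).
	 */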
	ret = -EAGAIN;
	if (src->iov_len < 24)
		goto out_shortreply;

	spin_lock_bh(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req)
		goto out_notfound;

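	/* Copy the reply into the matching request's receive buffer.
	 * Backchannel replies are never chunked, so the payload must
	 * fit entirely in the head kvec.
	 */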
	dst = &req->rq_private_buf.head[0];
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
	if (dst->iov_len < len)
		goto out_unlock;
	memcpy(dst->iov_base, p, len);

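	/* Update the congestion window from the credit value in the
	 * RPC/RDMA header, clamped to at least one credit and at most
	 * the backchannel maximum.
	 */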
	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;

	cwnd = xprt->cwnd;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(req->rq_task);

	ret = 0;
	xprt_complete_rqst(req->rq_task, rcvbuf->len);
	rcvbuf->len = 0;

out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
out:
	return ret;

out_shortreply:
	dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
		xprt, src->iov_len);
	goto out;

out_notfound:
	dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
		xprt, be32_to_cpu(xid));
	goto out_unlock;
}

/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_reply, but takes an rpc_rqst
 * instead, does not support chunks, and avoids blocking memory
 * allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
			      struct rpc_rqst *rqst)
{
	struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
	struct svc_rdma_op_ctxt *ctxt;
	struct svc_rdma_req_map *vec;
	struct ib_send_wr send_wr;
	int ret;

	vec = svc_rdma_get_req_map(rdma);
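	/* Map the marshaled call message for sending. The final
	 * argument is false: a backchannel call carries no write
	 * chunk.
	 */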
	ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
	if (ret)
		goto out_err;

	/* Post a recv buffer to catch the reply to this call */
	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
	if (ret)
		goto out_err;

	ctxt = svc_rdma_get_context(rdma);
	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
	ctxt->count = 1;

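	/* DMA-map the page holding the marshaled call so the adapter
	 * can send it directly from the send buffer.
	 */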
	ctxt->direction = DMA_TO_DEVICE;
	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
	ctxt->sge[0].length = sndbuf->len;
	ctxt->sge[0].addr =
	    ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
			    sndbuf->len, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
		ret = -EIO;
		goto out_unmap;
	}
	svc_rdma_count_mappings(rdma, ctxt);

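	/* Build a single signaled Send WR with one SGE covering the
	 * call message; svc_rdma_wc_send handles its completion.
	 */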
	memset(&send_wr, 0, sizeof(send_wr));
	ctxt->cqe.done = svc_rdma_wc_send;
	send_wr.wr_cqe = &ctxt->cqe;
	send_wr.sg_list = ctxt->sge;
	send_wr.num_sge = 1;
	send_wr.opcode = IB_WR_SEND;
	send_wr.send_flags = IB_SEND_SIGNALED;

	ret = svc_rdma_send(rdma, &send_wr);
	if (ret) {
		ret = -EIO;
		goto out_unmap;
	}

out_err:
	svc_rdma_put_req_map(rdma, vec);
	dprintk("svcrdma: %s returns %d\n", __func__, ret);
	return ret;

out_unmap:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	goto out_err;
}

/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 */
static int
xprt_rdma_bc_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize;
	struct page *page;

	if (size > PAGE_SIZE) {
		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
			  size);
		return -EINVAL;
	}

	/* svc_rdma_sendto releases this page */
	page = alloc_page(RPCRDMA_DEF_GFP);
	if (!page)
		return -ENOMEM;
	rqst->rq_buffer = page_address(page);

	rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, RPCRDMA_DEF_GFP);
	if (!rqst->rq_rbuffer) {
		put_page(page);
		return -ENOMEM;
	}
	return 0;
}

static void
xprt_rdma_bc_free(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;

	kfree(rqst->rq_rbuffer);
}

static int
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	__be32 *p;
	int rc;

	/* Space in the send buffer for an RPC/RDMA header is reserved
	 * via xprt->tsh_size.
	 */
	p = rqst->rq_buffer;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

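	/* The seven words above form the fixed-size RPC/RDMA header:
	 * XID, version, credits, message type, and three empty chunk
	 * lists (read, write, and reply).
	 */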
#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
#endif

	rc = svc_rdma_bc_sendto(rdma, rqst);
	if (rc)
		goto drop_connection;
	return rc;

drop_connection:
	dprintk("svcrdma: failed to send bc call\n");
	xprt_disconnect_done(xprt);
	return -ENOTCONN;
}

/* Send an RPC call on the passive end of a transport
 * connection.
 */
static int
xprt_rdma_bc_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma;
	int ret;

245 dprintk("svcrdma: sending bc call with xid: %08x\n",
246 be32_to_cpu(rqst
->rq_xid
));
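	/* If the connection's mutex is contended, queue this task on
	 * the backchannel wait queue and try once more; give up with
	 * -EAGAIN if the mutex is still unavailable.
	 */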
248 if (!mutex_trylock(&sxprt
->xpt_mutex
)) {
249 rpc_sleep_on(&sxprt
->xpt_bc_pending
, task
, NULL
);
250 if (!mutex_trylock(&sxprt
->xpt_mutex
))
252 rpc_wake_up_queued_task(&sxprt
->xpt_bc_pending
, task
);
	ret = -ENOTCONN;
	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
		ret = rpcrdma_bc_send_request(rdma, rqst);

	mutex_unlock(&sxprt->xpt_mutex);

	if (ret < 0)
		return ret;
	return 0;
}

static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
}

static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

	xprt_free(xprt);
	module_put(THIS_MODULE);
}

static struct rpc_xprt_ops xprt_rdma_bc_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.release_request	= xprt_release_rqst_cong,
	.buf_alloc		= xprt_rdma_bc_allocate,
	.buf_free		= xprt_rdma_bc_free,
	.send_request		= xprt_rdma_bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xprt_rdma_bc_close,
	.destroy		= xprt_rdma_bc_put,
	.print_stats		= xprt_rdma_print_stats
};

static const struct rpc_timeout xprt_rdma_bc_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC: %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
			  RPCRDMA_MAX_BC_REQUESTS,
			  RPCRDMA_MAX_BC_REQUESTS);
	if (!xprt) {
		dprintk("RPC: %s: couldn't allocate rpc_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	xprt->timeout = &xprt_rdma_bc_timeout;
	xprt_set_bound(xprt);
	xprt_set_connected(xprt);
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

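	/* Each send buffer reserves room for the RPC/RDMA transport
	 * header ahead of the RPC call; tsh_size expresses that
	 * reservation in 4-byte XDR words.
	 */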
	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
	xprt->ops = &xprt_rdma_bc_procs;

336 memcpy(&xprt
->addr
, args
->dstaddr
, args
->addrlen
);
337 xprt
->addrlen
= args
->addrlen
;
338 xprt_rdma_format_addresses(xprt
, (struct sockaddr
*)&xprt
->addr
);
	xprt->max_payload = xprt_rdma_max_inline_read;

	new_xprt = rpcx_to_rdmax(xprt);
	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

	xprt_get(xprt);
	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;

	if (!try_module_get(THIS_MODULE))
		goto out_fail;

	/* Final put for backchannel xprt is in __svc_rdma_free */
	xprt_get(xprt);
	return xprt;

out_fail:
	xprt_rdma_free_addresses(xprt);
	args->bc_xprt->xpt_bc_xprt = NULL;
	args->bc_xprt->xpt_bc_xps = NULL;
	xprt_put(xprt);
	xprt_free(xprt);
	return ERR_PTR(-EINVAL);
}

struct xprt_class xprt_rdma_bc = {
	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
	.name			= "rdma backchannel",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_BC_RDMA,
	.setup			= xprt_setup_rdma_bc,
};