/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"

#define RPCDBG_FACILITY		RPCDBG_SVCXPRT

#undef SVCRDMA_BACKCHANNEL_DEBUG
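/* Handle an RPC reply that arrived on the forward channel but is
 * addressed to the backchannel: copy the inline reply data into the
 * matching rpc_rqst, refresh the congestion window from the RPC/RDMA
 * credit value, and complete the waiting RPC task.
 */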
int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
                             struct xdr_buf *rcvbuf)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct kvec *dst, *src = &rcvbuf->head[0];
        struct rpc_rqst *req;
        unsigned long cwnd;
        u32 credits;
        size_t len;
        __be32 xid;
        __be32 *p;
        int ret;
        p = (__be32 *)src->iov_base;
        len = src->iov_len;
        xid = rmsgp->rm_xid;
#ifdef SVCRDMA_BACKCHANNEL_DEBUG
        pr_info("%s: xid=%08x, length=%zu\n",
                __func__, be32_to_cpu(xid), len);
        pr_info("%s: RPC/RDMA: %*ph\n",
                __func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
        pr_info("%s: RPC: %*ph\n",
                __func__, (int)len, p);
#endif
        ret = -EAGAIN;
        if (src->iov_len < 24)
                goto out_shortreply;
        spin_lock_bh(&xprt->transport_lock);
        req = xprt_lookup_rqst(xprt, xid);
        if (!req)
                goto out_notfound;
        dst = &req->rq_private_buf.head[0];
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
        if (dst->iov_len < len)
                goto out_unlock;
        memcpy(dst->iov_base, p, len);
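        /* Clamp the peer's credit value to the backchannel slot count;
         * a credit of zero would stall the backchannel, so at least
         * one credit is always granted.
         */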
        credits = be32_to_cpu(rmsgp->rm_credit);
        if (credits == 0)
                credits = 1;    /* don't deadlock */
        else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
                credits = r_xprt->rx_buf.rb_bc_max_requests;
        cwnd = xprt->cwnd;
        xprt->cwnd = credits << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
                xprt_release_rqst_cong(req->rq_task);
        ret = 0;
        xprt_complete_rqst(req->rq_task, rcvbuf->len);

out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
out:
        return ret;
out_shortreply:
        dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
                xprt, src->iov_len);
        goto out;
out_notfound:
        dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
                xprt, be32_to_cpu(xid));

        goto out_unlock;
}
/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_reply, but takes an rpc_rqst
 * instead, does not support chunks, and avoids blocking memory
 * allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
                              struct rpc_rqst *rqst)
{
        struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
        struct svc_rdma_op_ctxt *ctxt;
        struct svc_rdma_req_map *vec;
        struct ib_send_wr send_wr;
        int ret;
        vec = svc_rdma_get_req_map(rdma);
        ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
        if (ret)
                goto out_err;

        /* Post a Receive to catch the reply to this backchannel call */
        ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
        if (ret)
                goto out_err;
        ctxt = svc_rdma_get_context(rdma);
        ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
        ctxt->count = 1;
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
        ctxt->sge[0].length = sndbuf->len;
        ctxt->sge[0].addr =
            ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
                            sndbuf->len, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
                ret = -EIO;
                goto out_unmap;
        }
        svc_rdma_count_mappings(rdma, ctxt);
        memset(&send_wr, 0, sizeof(send_wr));
        ctxt->cqe.done = svc_rdma_wc_send;
        send_wr.wr_cqe = &ctxt->cqe;
        send_wr.sg_list = ctxt->sge;
        send_wr.num_sge = 1;
        send_wr.opcode = IB_WR_SEND;
        send_wr.send_flags = IB_SEND_SIGNALED;
        ret = svc_rdma_send(rdma, &send_wr);
        if (ret) {
                ret = -EIO;
                goto out_unmap;
        }
out_err:
        svc_rdma_put_req_map(rdma, vec);
        dprintk("svcrdma: %s returns %d\n", __func__, ret);
        return ret;
out_unmap:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
        goto out_err;
}
/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 */
static int
xprt_rdma_bc_allocate(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;
        size_t size = rqst->rq_callsize;
        struct page *page;

        if (size > PAGE_SIZE) {
                WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
                          size);
                return -EINVAL;
        }

        /* svc_rdma_sendto releases this page */
        page = alloc_page(RPCRDMA_DEF_GFP);
        if (!page)
                return -ENOMEM;
        rqst->rq_buffer = page_address(page);

        rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, RPCRDMA_DEF_GFP);
        if (!rqst->rq_rbuffer) {
                put_page(page);
                return -ENOMEM;
        }
        return 0;
}
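/* Only the receive buffer is released here; the page backing
 * rq_buffer is released by the send path (see the comment in
 * xprt_rdma_bc_allocate).
 */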
static void
xprt_rdma_bc_free(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;

        kfree(rqst->rq_rbuffer);
}
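/* Marshal a backchannel call: build the RPC/RDMA header in the send
 * buffer (no chunk lists), then post it with svc_rdma_bc_sendto.
 * The connection is dropped if the Send cannot be posted.
 */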
static int
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
        int rc;
        /* Space in the send buffer for an RPC/RDMA header is reserved
         * via xprt->tsh_size.
         */
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = rpcrdma_version;
        headerp->rm_credit =
                        cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
        headerp->rm_type = rdma_msg;
        headerp->rm_body.rm_chunks[0] = xdr_zero;
        headerp->rm_body.rm_chunks[1] = xdr_zero;
        headerp->rm_body.rm_chunks[2] = xdr_zero;
#ifdef SVCRDMA_BACKCHANNEL_DEBUG
        pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
#endif
        rc = svc_rdma_bc_sendto(rdma, rqst);
        if (rc)
                goto drop_connection;
        return rc;

drop_connection:
        dprintk("svcrdma: failed to send bc call\n");
        xprt_disconnect_done(xprt);
        return -ENOTCONN;
}
/* Send an RPC call on the passive end of a transport
 * connection.
 */
static int
xprt_rdma_bc_send_request(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;
        struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
        struct svcxprt_rdma *rdma;
        int ret;

        dprintk("svcrdma: sending bc call with xid: %08x\n",
                be32_to_cpu(rqst->rq_xid));
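        /* Serialize with the forward channel, which shares this
         * connection: if the mutex is contended, queue the task on
         * xpt_bc_pending and retry the trylock once before giving
         * up with -EAGAIN.
         */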
        if (!mutex_trylock(&sxprt->xpt_mutex)) {
                rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
                if (!mutex_trylock(&sxprt->xpt_mutex))
                        return -EAGAIN;
                rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
        }
        ret = -ENOTCONN;
        rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
        if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
                ret = rpcrdma_bc_send_request(rdma, rqst);

        mutex_unlock(&sxprt->xpt_mutex);

        if (ret < 0)
                return ret;
        return 0;
}
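/* The backchannel shares the forward channel's connection, so there
 * is no transport state to tear down here.
 */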
static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
        dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
}
static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
        dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

        xprt_free(xprt);
        module_put(THIS_MODULE);
}
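/* The backchannel reuses the congestion-avoidance variants of the
 * generic xprt operations because xprt->cwnd is driven by the
 * RPC/RDMA credit value (see svc_rdma_handle_bc_reply).
 */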
static struct rpc_xprt_ops xprt_rdma_bc_procs = {
        .reserve_xprt           = xprt_reserve_xprt_cong,
        .release_xprt           = xprt_release_xprt_cong,
        .alloc_slot             = xprt_alloc_slot,
        .release_request        = xprt_release_rqst_cong,
        .buf_alloc              = xprt_rdma_bc_allocate,
        .buf_free               = xprt_rdma_bc_free,
        .send_request           = xprt_rdma_bc_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
        .close                  = xprt_rdma_bc_close,
        .destroy                = xprt_rdma_bc_put,
        .print_stats            = xprt_rdma_print_stats
};
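/* Fixed 60-second timeout for backchannel calls */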
static const struct rpc_timeout xprt_rdma_bc_timeout = {
        .to_initval = 60 * HZ,
        .to_maxval = 60 * HZ,
};
/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
        struct rpc_xprt *xprt;
        struct rpcrdma_xprt *new_xprt;
        if (args->addrlen > sizeof(xprt->addr)) {
                dprintk("RPC:       %s: address too large\n", __func__);
                return ERR_PTR(-EBADF);
        }
        xprt = xprt_alloc(args->net, sizeof(*new_xprt),
                          RPCRDMA_MAX_BC_REQUESTS,
                          RPCRDMA_MAX_BC_REQUESTS);
        if (!xprt) {
                dprintk("RPC:       %s: couldn't allocate rpc_xprt\n",
                        __func__);
                return ERR_PTR(-ENOMEM);
        }
        xprt->timeout = &xprt_rdma_bc_timeout;
        xprt_set_bound(xprt);
        xprt_set_connected(xprt);
        xprt->bind_timeout = RPCRDMA_BIND_TO;
        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
        xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
        xprt->prot = XPRT_TRANSPORT_BC_RDMA;
        xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
        xprt->ops = &xprt_rdma_bc_procs;
        memcpy(&xprt->addr, args->dstaddr, args->addrlen);
        xprt->addrlen = args->addrlen;
        xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
        xprt->max_payload = xprt_rdma_max_inline_read;
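        /* rb_bc_max_requests is both the credit value placed in
         * outgoing backchannel calls (rpcrdma_bc_send_request) and
         * the upper bound applied to credits returned in backchannel
         * replies (svc_rdma_handle_bc_reply).
         */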
        new_xprt = rpcx_to_rdmax(xprt);
        new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
        /* Link the backchannel xprt to the forward channel's svc_xprt */
        args->bc_xprt->xpt_bc_xprt = xprt;
        xprt->bc_xprt = args->bc_xprt;

        if (!try_module_get(THIS_MODULE))
                goto out_fail;

        /* Final put for backchannel xprt is in __svc_rdma_free */
        xprt_get(xprt);
        return xprt;
out_fail:
        xprt_rdma_free_addresses(xprt);
        args->bc_xprt->xpt_bc_xprt = NULL;
        args->bc_xprt->xpt_bc_xps = NULL;
        xprt_put(xprt);
        return ERR_PTR(-EINVAL);
}
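/* The RPC client selects this transport class via the
 * XPRT_TRANSPORT_BC_RDMA ident when it creates the backchannel
 * transport for an svcrdma connection.
 */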
struct xprt_class xprt_rdma_bc = {
        .list                   = LIST_HEAD_INIT(xprt_rdma_bc.list),
        .name                   = "rdma backchannel",
        .owner                  = THIS_MODULE,
        .ident                  = XPRT_TRANSPORT_BC_RDMA,
        .setup                  = xprt_setup_rdma_bc,
};