/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA (server-side).
 */

#include <linux/sunrpc/svc_rdma.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

#undef SVCRDMA_BACKCHANNEL_DEBUG

int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
			     struct xdr_buf *rcvbuf)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct kvec *dst, *src = &rcvbuf->head[0];
	struct rpc_rqst *req;
	unsigned long cwnd;
	u32 credits;
	size_t len;
	__be32 xid;
	__be32 *p;
	int ret;

	p = (__be32 *)src->iov_base;
	len = src->iov_len;
	xid = rmsgp->rm_xid;

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: xid=%08x, length=%zu\n",
		__func__, be32_to_cpu(xid), len);
	pr_info("%s: RPC/RDMA: %*ph\n",
		__func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
	pr_info("%s:      RPC: %*ph\n",
		__func__, (int)len, p);
#endif

	/* Sanity: the reply must be large enough to carry a minimal
	 * RPC reply header.
	 */
	ret = -EAGAIN;
	if (src->iov_len < 24)
		goto out_shortreply;

	spin_lock_bh(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req)
		goto out_notfound;

	/* Mirror rq_rcv_buf into rq_private_buf, then copy the reply
	 * payload into place.
	 */
	dst = &req->rq_private_buf.head[0];
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
	if (dst->iov_len < len)
		goto out_unlock;
	memcpy(dst->iov_base, p, len);

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;

	cwnd = xprt->cwnd;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(req->rq_task);

	ret = 0;
	xprt_complete_rqst(req->rq_task, rcvbuf->len);
	rcvbuf->len = 0;

out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
out:
	return ret;

out_shortreply:
	dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n",
		xprt, src->iov_len);
	goto out;

out_notfound:
	dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n",
		xprt, be32_to_cpu(xid));
	goto out_unlock;
}

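/* Illustrative sketch, not part of the original file: the credit
 * clamping done above, in isolation. The server advertises credits in
 * each RPC/RDMA header; svc_rdma_handle_bc_reply() converts them into
 * a congestion window, never granting zero (which would deadlock) and
 * never more than the provisioned backchannel slots. The helper name
 * is hypothetical.
 */
static unsigned long __maybe_unused
svc_rdma_bc_credits_to_cwnd(u32 credits, u32 bc_max_requests)
{
	if (credits == 0)
		credits = 1;		/* a zero window would deadlock */
	else if (credits > bc_max_requests)
		credits = bc_max_requests; /* cap at provisioned slots */
	return (unsigned long)credits << RPC_CWNDSHIFT;
}
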
/* Send a backwards direction RPC call.
 *
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
 * This is similar to svc_rdma_reply, but takes an rpc_rqst
 * instead, does not support chunks, and avoids blocking memory
 * allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
 * the adapter has a small maximum SQ depth.
 */
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
			      struct rpc_rqst *rqst)
{
	struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
	struct svc_rdma_op_ctxt *ctxt;
	struct svc_rdma_req_map *vec;
	struct ib_send_wr send_wr;
	int ret;

	vec = svc_rdma_get_req_map(rdma);
	ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
	if (ret)
		goto out_err;

	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
	if (ret)
		goto out_err;

	ctxt = svc_rdma_get_context(rdma);
	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
	ctxt->count = 1;

	ctxt->direction = DMA_TO_DEVICE;
	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
	ctxt->sge[0].length = sndbuf->len;
	ctxt->sge[0].addr =
	    ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
			    sndbuf->len, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
		ret = -EIO;
		goto out_unmap;
	}
	atomic_inc(&rdma->sc_dma_used);

	memset(&send_wr, 0, sizeof(send_wr));
	ctxt->cqe.done = svc_rdma_wc_send;
	send_wr.wr_cqe = &ctxt->cqe;
	send_wr.sg_list = ctxt->sge;
	send_wr.num_sge = 1;
	send_wr.opcode = IB_WR_SEND;
	send_wr.send_flags = IB_SEND_SIGNALED;

	ret = svc_rdma_send(rdma, &send_wr);
	if (ret) {
		ret = -EIO;
		goto out_unmap;
	}

out_err:
	svc_rdma_put_req_map(rdma, vec);
	dprintk("svcrdma: %s returns %d\n", __func__, ret);
	return ret;

out_unmap:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	goto out_err;
}

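/* Illustrative sketch, not part of the original file: because the
 * send path above posts a single inline SGE and supports no chunks,
 * a marshaled backchannel call must fit entirely in the one send
 * buffer page. This hypothetical helper states that invariant
 * explicitly.
 */
static bool __maybe_unused
svc_rdma_bc_call_fits_inline(const struct xdr_buf *sndbuf)
{
	/* The whole message must live in the head kvec: no page
	 * list, no tail, no more than one page of data overall.
	 */
	return sndbuf->page_len == 0 && sndbuf->tail[0].iov_len == 0 &&
	       sndbuf->len <= PAGE_SIZE;
}
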
/* Server-side transport endpoint wants a whole page for its send
 * buffer. The client RPC code constructs the RPC header in this
 * buffer before it invokes ->send_request.
 *
 * Returns NULL if there was a temporary allocation failure.
 */
static void *
xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma;
	struct page *page;

	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);

	/* Prevent an infinite loop: try to make this case work */
	if (size > PAGE_SIZE)
		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
			  size);

	page = alloc_page(RPCRDMA_DEF_GFP);
	if (!page)
		return NULL;

	return page_address(page);
}

static void
xprt_rdma_bc_free(void *buffer)
{
	/* No-op: ctxt and page have already been freed. */
}

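/* Illustrative sketch, not part of the original file: why ->buf_free
 * is a no-op. The page handed out by xprt_rdma_bc_allocate() is
 * recovered in svc_rdma_bc_sendto() by the round-trip shown below,
 * becomes ctxt->pages[0], and is released by
 * svc_rdma_put_context(ctxt, 1) once the Send completes, so there is
 * nothing left for ->buf_free to do. The helper name is hypothetical.
 */
static struct page *__maybe_unused
xprt_rdma_bc_buffer_page(void *buffer)
{
	/* Valid only because the buffer came from page_address() of
	 * a freshly allocated page.
	 */
	return virt_to_page(buffer);
}
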
static int
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
	int rc;

	/* Space in the send buffer for an RPC/RDMA header is reserved
	 * via xprt->tsh_size.
	 */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
	headerp->rm_type = rdma_msg;
	headerp->rm_body.rm_chunks[0] = xdr_zero;
	headerp->rm_body.rm_chunks[1] = xdr_zero;
	headerp->rm_body.rm_chunks[2] = xdr_zero;

#ifdef SVCRDMA_BACKCHANNEL_DEBUG
	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
#endif

	rc = svc_rdma_bc_sendto(rdma, rqst);
	if (rc)
		goto drop_connection;
	return rc;

drop_connection:
	dprintk("svcrdma: failed to send bc call\n");
	xprt_disconnect_done(xprt);
	return -ENOTCONN;
}

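/* Illustrative sketch, not part of the original file: the header
 * built above, viewed as raw XDR words. The four fixed words make up
 * RPCRDMA_HDRLEN_MIN; the three xdr_zero words encode the empty read,
 * write, and reply chunk lists. The function name is hypothetical.
 */
static void __maybe_unused
rpcrdma_bc_header_sketch(__be32 *p, __be32 xid, u32 bc_max_requests)
{
	*p++ = xid;				/* rm_xid, from rq_xid */
	*p++ = rpcrdma_version;			/* rm_vers */
	*p++ = cpu_to_be32(bc_max_requests);	/* rm_credit */
	*p++ = rdma_msg;			/* rm_type: inline message */
	*p++ = xdr_zero;			/* no read chunks */
	*p++ = xdr_zero;			/* no write chunks */
	*p = xdr_zero;				/* no reply chunk */
}
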
/* Send an RPC call on the passive end of a transport
 * connection.
 */
static int
xprt_rdma_bc_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
	struct svcxprt_rdma *rdma;
	int ret;

	dprintk("svcrdma: sending bc call with xid: %08x\n",
		be32_to_cpu(rqst->rq_xid));

	if (!mutex_trylock(&sxprt->xpt_mutex)) {
		/* Park the task on the wait queue, then re-check: if
		 * the mutex is still busy, the holder will wake this
		 * task when it unlocks; if the retry wins the mutex,
		 * dequeue ourselves and proceed.
		 */
		rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
		if (!mutex_trylock(&sxprt->xpt_mutex))
			return -EAGAIN;
		rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
	}

	ret = -ENOTCONN;
	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
		ret = rpcrdma_bc_send_request(rdma, rqst);

	mutex_unlock(&sxprt->xpt_mutex);

	if (ret < 0)
		return ret;
	return 0;
}

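/* Illustrative sketch, not part of the original file: the trylock/
 * sleep/retry pattern above, in isolation. ->send_request runs in an
 * async RPC task context that must not block on the mutex directly,
 * so the task queues itself on xpt_bc_pending and retries; the second
 * trylock closes the race where the holder unlocks between the first
 * trylock and rpc_sleep_on(). The function name is hypothetical.
 */
static int __maybe_unused
xprt_rdma_bc_trylock_sketch(struct svc_xprt *sxprt, struct rpc_task *task)
{
	if (mutex_trylock(&sxprt->xpt_mutex))
		return 0;		/* got the mutex: send now */
	rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
	if (!mutex_trylock(&sxprt->xpt_mutex))
		return -EAGAIN;		/* sleep; holder will wake us */
	/* Won the retry after all: remove ourselves from the queue */
	rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
	return 0;
}
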
static void
xprt_rdma_bc_close(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);
}

static void
xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	dprintk("svcrdma: %s: xprt %p\n", __func__, xprt);

	xprt_free(xprt);
	module_put(THIS_MODULE);
}

static struct rpc_xprt_ops xprt_rdma_bc_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.release_request	= xprt_release_rqst_cong,
	.buf_alloc		= xprt_rdma_bc_allocate,
	.buf_free		= xprt_rdma_bc_free,
	.send_request		= xprt_rdma_bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xprt_rdma_bc_close,
	.destroy		= xprt_rdma_bc_put,
	.print_stats		= xprt_rdma_print_stats
};

static const struct rpc_timeout xprt_rdma_bc_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/* It shouldn't matter if the number of backchannel session slots
 * doesn't match the number of RPC/RDMA credits. That just means
 * one or the other will have extra slots that aren't used.
 */
static struct rpc_xprt *
xprt_setup_rdma_bc(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
			  RPCRDMA_MAX_BC_REQUESTS,
			  RPCRDMA_MAX_BC_REQUESTS);
	if (!xprt) {
		dprintk("RPC:       %s: couldn't allocate rpc_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	xprt->timeout = &xprt_rdma_bc_timeout;
	xprt_set_bound(xprt);
	xprt_set_connected(xprt);
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
	xprt->ops = &xprt_rdma_bc_procs;

	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
	xprt->resvport = 0;

	xprt->max_payload = xprt_rdma_max_inline_read;

	new_xprt = rpcx_to_rdmax(xprt);
	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;

	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;

	if (!try_module_get(THIS_MODULE))
		goto out_fail;

	/* Final put for backchannel xprt is in __svc_rdma_free */
	xprt_get(xprt);
	return xprt;

out_fail:
	xprt_rdma_free_addresses(xprt);
	args->bc_xprt->xpt_bc_xprt = NULL;
	xprt->bc_xprt = NULL;
	xprt_put(xprt);
	return ERR_PTR(-EINVAL);
}

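/* Illustrative sketch, not part of the original file: a backchannel
 * transport is created through rpc_create(), which looks up the
 * transport class by ident and calls xprt_setup_rdma_bc() above. All
 * values here are hypothetical placeholders; a real caller (such as
 * nfsd's NFSv4.1 callback setup) also supplies the RPC program,
 * version, and authflavor, and needs <linux/sunrpc/clnt.h>.
 */
static struct rpc_clnt *__maybe_unused
xprt_rdma_bc_create_sketch(struct net *net, struct svc_xprt *bc_xprt,
			   struct sockaddr *addr, size_t addrlen)
{
	struct rpc_create_args create_args = {
		.net		= net,
		.address	= addr,
		.addrsize	= addrlen,
		.protocol	= XPRT_TRANSPORT_BC_RDMA,
		.bc_xprt	= bc_xprt,
	};

	return rpc_create(&create_args);
}
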
struct xprt_class xprt_rdma_bc = {
	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
	.name			= "rdma backchannel",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_BC_RDMA,
	.setup			= xprt_setup_rdma_bc,
};

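/* Illustrative sketch, not part of the original file: the class above
 * has no effect until it is registered with the RPC client. In the
 * real xprtrdma module this happens in the module init path; a
 * minimal version would look like this (function name hypothetical).
 */
static int __init __maybe_unused
xprt_rdma_bc_register_sketch(void)
{
	/* Makes XPRT_TRANSPORT_BC_RDMA resolvable by rpc_create() */
	return xprt_register_transport(&xprt_rdma_bc);
}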