Avoid beyond bounds copy while caching ACL
[zen-stable.git] / net / sunrpc / xprtrdma / verbs.c
blob28236bab57f929e1edadd8245e5851a2fb925bc2
1 /*
2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the BSD-type
8 * license below:
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
12 * are met:
14 * Redistributions of source code must retain the above copyright
15 * notice, this list of conditions and the following disclaimer.
17 * Redistributions in binary form must reproduce the above
18 * copyright notice, this list of conditions and the following
19 * disclaimer in the documentation and/or other materials provided
20 * with the distribution.
22 * Neither the name of the Network Appliance, Inc. nor the names of
23 * its contributors may be used to endorse or promote products
24 * derived from this software without specific prior written
25 * permission.
27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41 * verbs.c
43 * Encapsulates the major functions managing:
44 * o adapters
45 * o endpoints
46 * o connections
47 * o buffer memory
50 #include <linux/interrupt.h>
51 #include <linux/pci.h> /* for Tavor hack below */
52 #include <linux/slab.h>
54 #include "xprt_rdma.h"
57 * Globals/Macros
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY RPCDBG_TRANS
62 #endif
65 * internal functions
69 * handle replies in tasklet context, using a single, global list
70 * rdma tasklet function -- just turn around and call the func
71 * for all replies on the list
/* Lock taken (irqsave) around every access to rpcrdma_tasklets_g. */
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
/* Replies appended by rpcrdma_schedule_tasklet() and drained by rpcrdma_run_tasklet(). */
75 static LIST_HEAD(rpcrdma_tasklets_g);
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
80 struct rpcrdma_rep *rep;
81 void (*func)(struct rpcrdma_rep *);
82 unsigned long flags;
84 data = data;
85 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86 while (!list_empty(&rpcrdma_tasklets_g)) {
87 rep = list_entry(rpcrdma_tasklets_g.next,
88 struct rpcrdma_rep, rr_list);
89 list_del(&rep->rr_list);
90 func = rep->rr_func;
91 rep->rr_func = NULL;
92 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
94 if (func)
95 func(rep);
96 else
97 rpcrdma_recv_buffer_put(rep);
99 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
101 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
/* Tasklet bound to rpcrdma_run_tasklet(), declared with a data argument of 0. */
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
109 unsigned long flags;
111 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112 list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114 tasklet_schedule(&rpcrdma_tasklet_g);
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
120 struct rpcrdma_ep *ep = context;
122 dprintk("RPC: %s: QP error %X on device %s ep %p\n",
123 __func__, event->event, event->device->name, context);
124 if (ep->rep_connected == 1) {
125 ep->rep_connected = -EIO;
126 ep->rep_func(ep);
127 wake_up_all(&ep->rep_connect_wait);
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
134 struct rpcrdma_ep *ep = context;
136 dprintk("RPC: %s: CQ error %X on device %s ep %p\n",
137 __func__, event->event, event->device->name, context);
138 if (ep->rep_connected == 1) {
139 ep->rep_connected = -EIO;
140 ep->rep_func(ep);
141 wake_up_all(&ep->rep_connect_wait);
145 static inline
146 void rpcrdma_event_process(struct ib_wc *wc)
148 struct rpcrdma_mw *frmr;
149 struct rpcrdma_rep *rep =
150 (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
152 dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
153 __func__, rep, wc->status, wc->opcode, wc->byte_len);
155 if (!rep) /* send or bind completion that we don't care about */
156 return;
158 if (IB_WC_SUCCESS != wc->status) {
159 dprintk("RPC: %s: WC opcode %d status %X, connection lost\n",
160 __func__, wc->opcode, wc->status);
161 rep->rr_len = ~0U;
162 if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
163 rpcrdma_schedule_tasklet(rep);
164 return;
167 switch (wc->opcode) {
168 case IB_WC_FAST_REG_MR:
169 frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
170 frmr->r.frmr.state = FRMR_IS_VALID;
171 break;
172 case IB_WC_LOCAL_INV:
173 frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
174 frmr->r.frmr.state = FRMR_IS_INVALID;
175 break;
176 case IB_WC_RECV:
177 rep->rr_len = wc->byte_len;
178 ib_dma_sync_single_for_cpu(
179 rdmab_to_ia(rep->rr_buffer)->ri_id->device,
180 rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
181 /* Keep (only) the most recent credits, after check validity */
182 if (rep->rr_len >= 16) {
183 struct rpcrdma_msg *p =
184 (struct rpcrdma_msg *) rep->rr_base;
185 unsigned int credits = ntohl(p->rm_credit);
186 if (credits == 0) {
187 dprintk("RPC: %s: server"
188 " dropped credits to 0!\n", __func__);
189 /* don't deadlock */
190 credits = 1;
191 } else if (credits > rep->rr_buffer->rb_max_requests) {
192 dprintk("RPC: %s: server"
193 " over-crediting: %d (%d)\n",
194 __func__, credits,
195 rep->rr_buffer->rb_max_requests);
196 credits = rep->rr_buffer->rb_max_requests;
198 atomic_set(&rep->rr_buffer->rb_credits, credits);
200 /* fall through */
201 case IB_WC_BIND_MW:
202 rpcrdma_schedule_tasklet(rep);
203 break;
204 default:
205 dprintk("RPC: %s: unexpected WC event %X\n",
206 __func__, wc->opcode);
207 break;
211 static inline int
212 rpcrdma_cq_poll(struct ib_cq *cq)
214 struct ib_wc wc;
215 int rc;
217 for (;;) {
218 rc = ib_poll_cq(cq, 1, &wc);
219 if (rc < 0) {
220 dprintk("RPC: %s: ib_poll_cq failed %i\n",
221 __func__, rc);
222 return rc;
224 if (rc == 0)
225 break;
227 rpcrdma_event_process(&wc);
230 return 0;
234 * rpcrdma_cq_event_upcall
236 * This upcall handles recv, send, bind and unbind events.
237 * It is reentrant but processes single events in order to maintain
238 * ordering of receives to keep server credits.
240 * It is the responsibility of the scheduled tasklet to return
241 * recv buffers to the pool. NOTE: this affects synchronization of
242 * connection shutdown. That is, the structures required for
243 * the completion of the reply handler must remain intact until
244 * all memory has been reclaimed.
246 * Note that send events are suppressed and do not result in an upcall.
248 static void
249 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
251 int rc;
253 rc = rpcrdma_cq_poll(cq);
254 if (rc)
255 return;
257 rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
258 if (rc) {
259 dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
260 __func__, rc);
261 return;
264 rpcrdma_cq_poll(cq);
/*
 * Debug-only names for connection-manager events.  rpcrdma_conn_upcall()
 * indexes this table with event->event for codes 0 through 11, so the
 * table must keep exactly twelve entries in event-code order.
 */
267 #ifdef RPC_DEBUG
268 static const char * const conn[] = {
269 "address resolved",
270 "address error",
271 "route resolved",
272 "route error",
273 "connect request",
274 "connect response",
275 "connect error",
276 "unreachable",
277 "rejected",
278 "established",
279 "disconnected",
280 "device removal"
282 #endif
/*
 * Connection-manager event callback registered by rpcrdma_create_id().
 *
 * Address/route resolution events store their outcome in ia->ri_async_rc
 * and complete ia->ri_done.  Connection state events record the new state
 * in ep->rep_connected (1 when established, a negative errno otherwise),
 * reset the credit count to 1, invoke ep->rep_func and wake everybody
 * sleeping on ep->rep_connect_wait.
 *
 * Always returns 0.
 */
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#ifdef RPC_DEBUG
	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
#endif
	struct ib_qp_attr attr;
	struct ib_qp_init_attr iattr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		/*
		 * The queried values are only reported in debug output.  If
		 * the query fails, report zeroes instead of reading
		 * uninitialized stack memory.
		 */
		if (ib_query_qp(ia->ri_id->qp, &attr,
				IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
				&iattr)) {
			attr.max_rd_atomic = 0;
			attr.max_dest_rd_atomic = 0;
		}
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		/* Bound the name lookup by the table itself, not a literal. */
		dprintk("RPC: %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
			__func__,
			(event->event < ARRAY_SIZE(conn)) ? conn[event->event] :
						"unknown connection error",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			ep, event->event);
		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
		dprintk("RPC: %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		ep->rep_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		break;
	default:
		dprintk("RPC: %s: unexpected CM event %d\n",
			__func__, event->event);
		break;
	}

#ifdef RPC_DEBUG
	if (connstate == 1) {
		int ird = attr.max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;
		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
			"on %s, memreg %d slots %d ird %d%s\n",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			ia->ri_id->device->name,
			ia->ri_memreg_strategy,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			connstate);
	}
#endif

	return 0;
}
382 static struct rdma_cm_id *
383 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
384 struct rpcrdma_ia *ia, struct sockaddr *addr)
386 struct rdma_cm_id *id;
387 int rc;
389 init_completion(&ia->ri_done);
391 id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
392 if (IS_ERR(id)) {
393 rc = PTR_ERR(id);
394 dprintk("RPC: %s: rdma_create_id() failed %i\n",
395 __func__, rc);
396 return id;
399 ia->ri_async_rc = -ETIMEDOUT;
400 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
401 if (rc) {
402 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
403 __func__, rc);
404 goto out;
406 wait_for_completion_interruptible_timeout(&ia->ri_done,
407 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
408 rc = ia->ri_async_rc;
409 if (rc)
410 goto out;
412 ia->ri_async_rc = -ETIMEDOUT;
413 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
414 if (rc) {
415 dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
416 __func__, rc);
417 goto out;
419 wait_for_completion_interruptible_timeout(&ia->ri_done,
420 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
421 rc = ia->ri_async_rc;
422 if (rc)
423 goto out;
425 return id;
427 out:
428 rdma_destroy_id(id);
429 return ERR_PTR(rc);
433 * Drain any cq, prior to teardown.
435 static void
436 rpcrdma_clean_cq(struct ib_cq *cq)
438 struct ib_wc wc;
439 int count = 0;
441 while (1 == ib_poll_cq(cq, 1, &wc))
442 ++count;
444 if (count)
445 dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
446 __func__, count, wc.opcode);
450 * Exported functions.
454 * Open and initialize an Interface Adapter.
455 * o initializes fields of struct rpcrdma_ia, including
456 * interface and provider attributes and protection zone.
459 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
461 int rc, mem_priv;
462 struct ib_device_attr devattr;
463 struct rpcrdma_ia *ia = &xprt->rx_ia;
465 ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
466 if (IS_ERR(ia->ri_id)) {
467 rc = PTR_ERR(ia->ri_id);
468 goto out1;
471 ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
472 if (IS_ERR(ia->ri_pd)) {
473 rc = PTR_ERR(ia->ri_pd);
474 dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
475 __func__, rc);
476 goto out2;
480 * Query the device to determine if the requested memory
481 * registration strategy is supported. If it isn't, set the
482 * strategy to a globally supported model.
484 rc = ib_query_device(ia->ri_id->device, &devattr);
485 if (rc) {
486 dprintk("RPC: %s: ib_query_device failed %d\n",
487 __func__, rc);
488 goto out2;
491 if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
492 ia->ri_have_dma_lkey = 1;
493 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
496 switch (memreg) {
497 case RPCRDMA_MEMWINDOWS:
498 case RPCRDMA_MEMWINDOWS_ASYNC:
499 if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
500 dprintk("RPC: %s: MEMWINDOWS registration "
501 "specified but not supported by adapter, "
502 "using slower RPCRDMA_REGISTER\n",
503 __func__);
504 memreg = RPCRDMA_REGISTER;
506 break;
507 case RPCRDMA_MTHCAFMR:
508 if (!ia->ri_id->device->alloc_fmr) {
509 #if RPCRDMA_PERSISTENT_REGISTRATION
510 dprintk("RPC: %s: MTHCAFMR registration "
511 "specified but not supported by adapter, "
512 "using riskier RPCRDMA_ALLPHYSICAL\n",
513 __func__);
514 memreg = RPCRDMA_ALLPHYSICAL;
515 #else
516 dprintk("RPC: %s: MTHCAFMR registration "
517 "specified but not supported by adapter, "
518 "using slower RPCRDMA_REGISTER\n",
519 __func__);
520 memreg = RPCRDMA_REGISTER;
521 #endif
523 break;
524 case RPCRDMA_FRMR:
525 /* Requires both frmr reg and local dma lkey */
526 if ((devattr.device_cap_flags &
527 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
528 (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
529 #if RPCRDMA_PERSISTENT_REGISTRATION
530 dprintk("RPC: %s: FRMR registration "
531 "specified but not supported by adapter, "
532 "using riskier RPCRDMA_ALLPHYSICAL\n",
533 __func__);
534 memreg = RPCRDMA_ALLPHYSICAL;
535 #else
536 dprintk("RPC: %s: FRMR registration "
537 "specified but not supported by adapter, "
538 "using slower RPCRDMA_REGISTER\n",
539 __func__);
540 memreg = RPCRDMA_REGISTER;
541 #endif
543 break;
547 * Optionally obtain an underlying physical identity mapping in
548 * order to do a memory window-based bind. This base registration
549 * is protected from remote access - that is enabled only by binding
550 * for the specific bytes targeted during each RPC operation, and
551 * revoked after the corresponding completion similar to a storage
552 * adapter.
554 switch (memreg) {
555 case RPCRDMA_BOUNCEBUFFERS:
556 case RPCRDMA_REGISTER:
557 case RPCRDMA_FRMR:
558 break;
559 #if RPCRDMA_PERSISTENT_REGISTRATION
560 case RPCRDMA_ALLPHYSICAL:
561 mem_priv = IB_ACCESS_LOCAL_WRITE |
562 IB_ACCESS_REMOTE_WRITE |
563 IB_ACCESS_REMOTE_READ;
564 goto register_setup;
565 #endif
566 case RPCRDMA_MEMWINDOWS_ASYNC:
567 case RPCRDMA_MEMWINDOWS:
568 mem_priv = IB_ACCESS_LOCAL_WRITE |
569 IB_ACCESS_MW_BIND;
570 goto register_setup;
571 case RPCRDMA_MTHCAFMR:
572 if (ia->ri_have_dma_lkey)
573 break;
574 mem_priv = IB_ACCESS_LOCAL_WRITE;
575 register_setup:
576 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
577 if (IS_ERR(ia->ri_bind_mem)) {
578 printk(KERN_ALERT "%s: ib_get_dma_mr for "
579 "phys register failed with %lX\n\t"
580 "Will continue with degraded performance\n",
581 __func__, PTR_ERR(ia->ri_bind_mem));
582 memreg = RPCRDMA_REGISTER;
583 ia->ri_bind_mem = NULL;
585 break;
586 default:
587 printk(KERN_ERR "%s: invalid memory registration mode %d\n",
588 __func__, memreg);
589 rc = -EINVAL;
590 goto out2;
592 dprintk("RPC: %s: memory registration strategy is %d\n",
593 __func__, memreg);
595 /* Else will do memory reg/dereg for each chunk */
596 ia->ri_memreg_strategy = memreg;
598 return 0;
599 out2:
600 rdma_destroy_id(ia->ri_id);
601 ia->ri_id = NULL;
602 out1:
603 return rc;
607 * Clean up/close an IA.
608 * o if event handles and PD have been initialized, free them.
609 * o close the IA
611 void
612 rpcrdma_ia_close(struct rpcrdma_ia *ia)
614 int rc;
616 dprintk("RPC: %s: entering\n", __func__);
617 if (ia->ri_bind_mem != NULL) {
618 rc = ib_dereg_mr(ia->ri_bind_mem);
619 dprintk("RPC: %s: ib_dereg_mr returned %i\n",
620 __func__, rc);
622 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
623 if (ia->ri_id->qp)
624 rdma_destroy_qp(ia->ri_id);
625 rdma_destroy_id(ia->ri_id);
626 ia->ri_id = NULL;
628 if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
629 rc = ib_dealloc_pd(ia->ri_pd);
630 dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
631 __func__, rc);
636 * Create unconnected endpoint.
639 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
640 struct rpcrdma_create_data_internal *cdata)
642 struct ib_device_attr devattr;
643 int rc, err;
645 rc = ib_query_device(ia->ri_id->device, &devattr);
646 if (rc) {
647 dprintk("RPC: %s: ib_query_device failed %d\n",
648 __func__, rc);
649 return rc;
652 /* check provider's send/recv wr limits */
653 if (cdata->max_requests > devattr.max_qp_wr)
654 cdata->max_requests = devattr.max_qp_wr;
656 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
657 ep->rep_attr.qp_context = ep;
658 /* send_cq and recv_cq initialized below */
659 ep->rep_attr.srq = NULL;
660 ep->rep_attr.cap.max_send_wr = cdata->max_requests;
661 switch (ia->ri_memreg_strategy) {
662 case RPCRDMA_FRMR:
663 /* Add room for frmr register and invalidate WRs.
664 * 1. FRMR reg WR for head
665 * 2. FRMR invalidate WR for head
666 * 3. FRMR reg WR for pagelist
667 * 4. FRMR invalidate WR for pagelist
668 * 5. FRMR reg WR for tail
669 * 6. FRMR invalidate WR for tail
670 * 7. The RDMA_SEND WR
672 ep->rep_attr.cap.max_send_wr *= 7;
673 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
674 cdata->max_requests = devattr.max_qp_wr / 7;
675 if (!cdata->max_requests)
676 return -EINVAL;
677 ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7;
679 break;
680 case RPCRDMA_MEMWINDOWS_ASYNC:
681 case RPCRDMA_MEMWINDOWS:
682 /* Add room for mw_binds+unbinds - overkill! */
683 ep->rep_attr.cap.max_send_wr++;
684 ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
685 if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
686 return -EINVAL;
687 break;
688 default:
689 break;
691 ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
692 ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
693 ep->rep_attr.cap.max_recv_sge = 1;
694 ep->rep_attr.cap.max_inline_data = 0;
695 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
696 ep->rep_attr.qp_type = IB_QPT_RC;
697 ep->rep_attr.port_num = ~0;
699 dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
700 "iovs: send %d recv %d\n",
701 __func__,
702 ep->rep_attr.cap.max_send_wr,
703 ep->rep_attr.cap.max_recv_wr,
704 ep->rep_attr.cap.max_send_sge,
705 ep->rep_attr.cap.max_recv_sge);
707 /* set trigger for requesting send completion */
708 ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /* - 1*/;
709 switch (ia->ri_memreg_strategy) {
710 case RPCRDMA_MEMWINDOWS_ASYNC:
711 case RPCRDMA_MEMWINDOWS:
712 ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
713 break;
714 default:
715 break;
717 if (ep->rep_cqinit <= 2)
718 ep->rep_cqinit = 0;
719 INIT_CQCOUNT(ep);
720 ep->rep_ia = ia;
721 init_waitqueue_head(&ep->rep_connect_wait);
724 * Create a single cq for receive dto and mw_bind (only ever
725 * care about unbind, really). Send completions are suppressed.
726 * Use single threaded tasklet upcalls to maintain ordering.
728 ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
729 rpcrdma_cq_async_error_upcall, NULL,
730 ep->rep_attr.cap.max_recv_wr +
731 ep->rep_attr.cap.max_send_wr + 1, 0);
732 if (IS_ERR(ep->rep_cq)) {
733 rc = PTR_ERR(ep->rep_cq);
734 dprintk("RPC: %s: ib_create_cq failed: %i\n",
735 __func__, rc);
736 goto out1;
739 rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
740 if (rc) {
741 dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
742 __func__, rc);
743 goto out2;
746 ep->rep_attr.send_cq = ep->rep_cq;
747 ep->rep_attr.recv_cq = ep->rep_cq;
749 /* Initialize cma parameters */
751 /* RPC/RDMA does not use private data */
752 ep->rep_remote_cma.private_data = NULL;
753 ep->rep_remote_cma.private_data_len = 0;
755 /* Client offers RDMA Read but does not initiate */
756 ep->rep_remote_cma.initiator_depth = 0;
757 if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
758 ep->rep_remote_cma.responder_resources = 0;
759 else if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */
760 ep->rep_remote_cma.responder_resources = 32;
761 else
762 ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
764 ep->rep_remote_cma.retry_count = 7;
765 ep->rep_remote_cma.flow_control = 0;
766 ep->rep_remote_cma.rnr_retry_count = 0;
768 return 0;
770 out2:
771 err = ib_destroy_cq(ep->rep_cq);
772 if (err)
773 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
774 __func__, err);
775 out1:
776 return rc;
780 * rpcrdma_ep_destroy
782 * Disconnect and destroy endpoint. After this, the only
783 * valid operations on the ep are to free it (if dynamically
784 * allocated) or re-create it.
786 * The caller's error handling must be sure to not leak the endpoint
787 * if this function fails.
790 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
792 int rc;
794 dprintk("RPC: %s: entering, connected is %d\n",
795 __func__, ep->rep_connected);
797 if (ia->ri_id->qp) {
798 rc = rpcrdma_ep_disconnect(ep, ia);
799 if (rc)
800 dprintk("RPC: %s: rpcrdma_ep_disconnect"
801 " returned %i\n", __func__, rc);
802 rdma_destroy_qp(ia->ri_id);
803 ia->ri_id->qp = NULL;
806 /* padding - could be done in rpcrdma_buffer_destroy... */
807 if (ep->rep_pad_mr) {
808 rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
809 ep->rep_pad_mr = NULL;
812 rpcrdma_clean_cq(ep->rep_cq);
813 rc = ib_destroy_cq(ep->rep_cq);
814 if (rc)
815 dprintk("RPC: %s: ib_destroy_cq returned %i\n",
816 __func__, rc);
818 return rc;
822 * Connect unconnected endpoint.
825 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
/*
 * Connect (or reconnect) the endpoint.
 *
 * When rep_connected is non-zero on entry, or when a retry is requested
 * further down, the existing connection is torn down first and a fresh
 * cm_id replaces ia->ri_id.  A new QP is then created, rdma_connect() is
 * issued and the caller sleeps until rep_connected becomes non-zero.
 *
 * Returns 0 on success or a negative errno; a non-zero result is also
 * stored in ep->rep_connected.
 */
827 struct rdma_cm_id *id;
828 int rc = 0;
829 int retry_count = 0;
831 if (ep->rep_connected != 0) {
832 struct rpcrdma_xprt *xprt;
833 retry:
/* Reconnect path: disconnect, drain the CQ, then resolve a new cm_id
 * for the address stored in the transport. */
834 rc = rpcrdma_ep_disconnect(ep, ia);
835 if (rc && rc != -ENOTCONN)
836 dprintk("RPC: %s: rpcrdma_ep_disconnect"
837 " status %i\n", __func__, rc);
838 rpcrdma_clean_cq(ep->rep_cq);
840 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
841 id = rpcrdma_create_id(xprt, ia,
842 (struct sockaddr *)&xprt->rx_data.addr);
843 if (IS_ERR(id)) {
844 rc = PTR_ERR(id);
845 goto out;
847 /* TEMP TEMP TEMP - fail if new device:
848 * Deregister/remarshal *all* requests!
849 * Close and recreate adapter, pd, etc!
850 * Re-determine all attributes still sane!
851 * More stuff I haven't thought of!
852 * Rrrgh!
854 if (ia->ri_id->device != id->device) {
855 printk("RPC: %s: can't reconnect on "
856 "different device!\n", __func__);
857 rdma_destroy_id(id);
858 rc = -ENETDOWN;
859 goto out;
861 /* END TEMP */
862 rdma_destroy_qp(ia->ri_id);
863 rdma_destroy_id(ia->ri_id);
864 ia->ri_id = id;
/* From here on ia->ri_id is the id to connect with; it needs a new QP. */
867 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
868 if (rc) {
869 dprintk("RPC: %s: rdma_create_qp failed %i\n",
870 __func__, rc);
871 goto out;
874 /* XXX Tavor device performs badly with 2K MTU! */
875 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
876 struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
877 if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
878 (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
879 pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
880 struct ib_qp_attr attr = {
881 .path_mtu = IB_MTU_1024
883 rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
/* NOTE(review): the rc from ib_modify_qp() is overwritten by
 * rdma_connect() below without being examined. */
887 ep->rep_connected = 0;
889 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
890 if (rc) {
891 dprintk("RPC: %s: rdma_connect() failed with %i\n",
892 __func__, rc);
893 goto out;
/* NOTE(review): the return value of wait_event_interruptible() is not
 * examined; if the sleep can end with rep_connected still 0, the code
 * below may return 0 without a connection -- confirm. */
896 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
899 * Check state. A non-peer reject indicates no listener
900 * (ECONNREFUSED), which may be a transient state. All
901 * others indicate a transport condition which has already
902 * undergone a best-effort.
904 if (ep->rep_connected == -ECONNREFUSED &&
905 ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
906 dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
907 goto retry;
909 if (ep->rep_connected <= 0) {
910 /* Sometimes, the only way to reliably connect to remote
911 * CMs is to use same nonzero values for ORD and IRD. */
912 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
913 (ep->rep_remote_cma.responder_resources == 0 ||
914 ep->rep_remote_cma.initiator_depth !=
915 ep->rep_remote_cma.responder_resources)) {
916 if (ep->rep_remote_cma.responder_resources == 0)
917 ep->rep_remote_cma.responder_resources = 1;
918 ep->rep_remote_cma.initiator_depth =
919 ep->rep_remote_cma.responder_resources;
920 goto retry;
922 rc = ep->rep_connected;
923 } else {
924 dprintk("RPC: %s: connected\n", __func__);
/* Common exit: any non-zero rc is recorded as the connection state. */
927 out:
928 if (rc)
929 ep->rep_connected = rc;
930 return rc;
934 * rpcrdma_ep_disconnect
936 * This is separate from destroy to facilitate the ability
937 * to reconnect without recreating the endpoint.
939 * This call is not reentrant, and must not be made in parallel
940 * on the same endpoint.
943 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
945 int rc;
947 rpcrdma_clean_cq(ep->rep_cq);
948 rc = rdma_disconnect(ia->ri_id);
949 if (!rc) {
950 /* returns without wait if not connected */
951 wait_event_interruptible(ep->rep_connect_wait,
952 ep->rep_connected != 1);
953 dprintk("RPC: %s: after wait, %sconnected\n", __func__,
954 (ep->rep_connected == 1) ? "still " : "dis");
955 } else {
956 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
957 ep->rep_connected = rc;
959 return rc;
963 * Initialize buffer memory
966 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
967 struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
969 char *p;
970 size_t len;
971 int i, rc;
972 struct rpcrdma_mw *r;
974 buf->rb_max_requests = cdata->max_requests;
975 spin_lock_init(&buf->rb_lock);
976 atomic_set(&buf->rb_credits, 1);
978 /* Need to allocate:
979 * 1. arrays for send and recv pointers
980 * 2. arrays of struct rpcrdma_req to fill in pointers
981 * 3. array of struct rpcrdma_rep for replies
982 * 4. padding, if any
983 * 5. mw's, fmr's or frmr's, if any
984 * Send/recv buffers in req/rep need to be registered
987 len = buf->rb_max_requests *
988 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
989 len += cdata->padding;
990 switch (ia->ri_memreg_strategy) {
991 case RPCRDMA_FRMR:
992 len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
993 sizeof(struct rpcrdma_mw);
994 break;
995 case RPCRDMA_MTHCAFMR:
996 /* TBD we are perhaps overallocating here */
997 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
998 sizeof(struct rpcrdma_mw);
999 break;
1000 case RPCRDMA_MEMWINDOWS_ASYNC:
1001 case RPCRDMA_MEMWINDOWS:
1002 len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1003 sizeof(struct rpcrdma_mw);
1004 break;
1005 default:
1006 break;
1009 /* allocate 1, 4 and 5 in one shot */
1010 p = kzalloc(len, GFP_KERNEL);
1011 if (p == NULL) {
1012 dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1013 __func__, len);
1014 rc = -ENOMEM;
1015 goto out;
1017 buf->rb_pool = p; /* for freeing it later */
1019 buf->rb_send_bufs = (struct rpcrdma_req **) p;
1020 p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1021 buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1022 p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1025 * Register the zeroed pad buffer, if any.
1027 if (cdata->padding) {
1028 rc = rpcrdma_register_internal(ia, p, cdata->padding,
1029 &ep->rep_pad_mr, &ep->rep_pad);
1030 if (rc)
1031 goto out;
1033 p += cdata->padding;
1036 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1037 * We "cycle" the mw's in order to minimize rkey reuse,
1038 * and also reduce unbind-to-bind collision.
1040 INIT_LIST_HEAD(&buf->rb_mws);
1041 r = (struct rpcrdma_mw *)p;
1042 switch (ia->ri_memreg_strategy) {
1043 case RPCRDMA_FRMR:
1044 for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1045 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1046 RPCRDMA_MAX_SEGS);
1047 if (IS_ERR(r->r.frmr.fr_mr)) {
1048 rc = PTR_ERR(r->r.frmr.fr_mr);
1049 dprintk("RPC: %s: ib_alloc_fast_reg_mr"
1050 " failed %i\n", __func__, rc);
1051 goto out;
1053 r->r.frmr.fr_pgl =
1054 ib_alloc_fast_reg_page_list(ia->ri_id->device,
1055 RPCRDMA_MAX_SEGS);
1056 if (IS_ERR(r->r.frmr.fr_pgl)) {
1057 rc = PTR_ERR(r->r.frmr.fr_pgl);
1058 dprintk("RPC: %s: "
1059 "ib_alloc_fast_reg_page_list "
1060 "failed %i\n", __func__, rc);
1061 goto out;
1063 list_add(&r->mw_list, &buf->rb_mws);
1064 ++r;
1066 break;
1067 case RPCRDMA_MTHCAFMR:
1068 /* TBD we are perhaps overallocating here */
1069 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1070 static struct ib_fmr_attr fa =
1071 { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1072 r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1073 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1074 &fa);
1075 if (IS_ERR(r->r.fmr)) {
1076 rc = PTR_ERR(r->r.fmr);
1077 dprintk("RPC: %s: ib_alloc_fmr"
1078 " failed %i\n", __func__, rc);
1079 goto out;
1081 list_add(&r->mw_list, &buf->rb_mws);
1082 ++r;
1084 break;
1085 case RPCRDMA_MEMWINDOWS_ASYNC:
1086 case RPCRDMA_MEMWINDOWS:
1087 /* Allocate one extra request's worth, for full cycling */
1088 for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1089 r->r.mw = ib_alloc_mw(ia->ri_pd);
1090 if (IS_ERR(r->r.mw)) {
1091 rc = PTR_ERR(r->r.mw);
1092 dprintk("RPC: %s: ib_alloc_mw"
1093 " failed %i\n", __func__, rc);
1094 goto out;
1096 list_add(&r->mw_list, &buf->rb_mws);
1097 ++r;
1099 break;
1100 default:
1101 break;
1105 * Allocate/init the request/reply buffers. Doing this
1106 * using kmalloc for now -- one for each buf.
1108 for (i = 0; i < buf->rb_max_requests; i++) {
1109 struct rpcrdma_req *req;
1110 struct rpcrdma_rep *rep;
1112 len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1113 /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1114 /* Typical ~2400b, so rounding up saves work later */
1115 if (len < 4096)
1116 len = 4096;
1117 req = kmalloc(len, GFP_KERNEL);
1118 if (req == NULL) {
1119 dprintk("RPC: %s: request buffer %d alloc"
1120 " failed\n", __func__, i);
1121 rc = -ENOMEM;
1122 goto out;
1124 memset(req, 0, sizeof(struct rpcrdma_req));
1125 buf->rb_send_bufs[i] = req;
1126 buf->rb_send_bufs[i]->rl_buffer = buf;
1128 rc = rpcrdma_register_internal(ia, req->rl_base,
1129 len - offsetof(struct rpcrdma_req, rl_base),
1130 &buf->rb_send_bufs[i]->rl_handle,
1131 &buf->rb_send_bufs[i]->rl_iov);
1132 if (rc)
1133 goto out;
1135 buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1137 len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1138 rep = kmalloc(len, GFP_KERNEL);
1139 if (rep == NULL) {
1140 dprintk("RPC: %s: reply buffer %d alloc failed\n",
1141 __func__, i);
1142 rc = -ENOMEM;
1143 goto out;
1145 memset(rep, 0, sizeof(struct rpcrdma_rep));
1146 buf->rb_recv_bufs[i] = rep;
1147 buf->rb_recv_bufs[i]->rr_buffer = buf;
1148 init_waitqueue_head(&rep->rr_unbind);
1150 rc = rpcrdma_register_internal(ia, rep->rr_base,
1151 len - offsetof(struct rpcrdma_rep, rr_base),
1152 &buf->rb_recv_bufs[i]->rr_handle,
1153 &buf->rb_recv_bufs[i]->rr_iov);
1154 if (rc)
1155 goto out;
1158 dprintk("RPC: %s: max_requests %d\n",
1159 __func__, buf->rb_max_requests);
1160 /* done */
1161 return 0;
1162 out:
1163 rpcrdma_buffer_destroy(buf);
1164 return rc;
1168 * Unregister and destroy buffer memory. Need to deal with
1169 * partial initialization, so it's callable from failed create.
1170 * Must be called before destroying endpoint, as registrations
1171 * reference it.
1173 void
1174 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1176 int rc, i;
1177 struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1178 struct rpcrdma_mw *r;
1180 /* clean up in reverse order from create
1181 * 1. recv mr memory (mr free, then kfree)
1182 * 1a. bind mw memory
1183 * 2. send mr memory (mr free, then kfree)
1184 * 3. padding (if any) [moved to rpcrdma_ep_destroy]
1185 * 4. arrays
1187 dprintk("RPC: %s: entering\n", __func__);
1189 for (i = 0; i < buf->rb_max_requests; i++) {
1190 if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1191 rpcrdma_deregister_internal(ia,
1192 buf->rb_recv_bufs[i]->rr_handle,
1193 &buf->rb_recv_bufs[i]->rr_iov);
1194 kfree(buf->rb_recv_bufs[i]);
1196 if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1197 while (!list_empty(&buf->rb_mws)) {
1198 r = list_entry(buf->rb_mws.next,
1199 struct rpcrdma_mw, mw_list);
1200 list_del(&r->mw_list);
1201 switch (ia->ri_memreg_strategy) {
1202 case RPCRDMA_FRMR:
1203 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1204 if (rc)
1205 dprintk("RPC: %s:"
1206 " ib_dereg_mr"
1207 " failed %i\n",
1208 __func__, rc);
1209 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1210 break;
1211 case RPCRDMA_MTHCAFMR:
1212 rc = ib_dealloc_fmr(r->r.fmr);
1213 if (rc)
1214 dprintk("RPC: %s:"
1215 " ib_dealloc_fmr"
1216 " failed %i\n",
1217 __func__, rc);
1218 break;
1219 case RPCRDMA_MEMWINDOWS_ASYNC:
1220 case RPCRDMA_MEMWINDOWS:
1221 rc = ib_dealloc_mw(r->r.mw);
1222 if (rc)
1223 dprintk("RPC: %s:"
1224 " ib_dealloc_mw"
1225 " failed %i\n",
1226 __func__, rc);
1227 break;
1228 default:
1229 break;
1232 rpcrdma_deregister_internal(ia,
1233 buf->rb_send_bufs[i]->rl_handle,
1234 &buf->rb_send_bufs[i]->rl_iov);
1235 kfree(buf->rb_send_bufs[i]);
1239 kfree(buf->rb_pool);
1243 * Get a set of request/reply buffers.
1245 * Reply buffer (if needed) is attached to send buffer upon return.
1246 * Rule:
1247 * rb_send_index and rb_recv_index MUST always be pointing to the
1248 * *next* available buffer (non-NULL). They are incremented after
1249 * removing buffers, and decremented *before* returning them.
1251 struct rpcrdma_req *
1252 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1254 struct rpcrdma_req *req;
1255 unsigned long flags;
1256 int i;
1257 struct rpcrdma_mw *r;
1259 spin_lock_irqsave(&buffers->rb_lock, flags);
1260 if (buffers->rb_send_index == buffers->rb_max_requests) {
1261 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1262 dprintk("RPC: %s: out of request buffers\n", __func__);
1263 return ((struct rpcrdma_req *)NULL);
1266 req = buffers->rb_send_bufs[buffers->rb_send_index];
1267 if (buffers->rb_send_index < buffers->rb_recv_index) {
1268 dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
1269 __func__,
1270 buffers->rb_recv_index - buffers->rb_send_index);
1271 req->rl_reply = NULL;
1272 } else {
1273 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1274 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1276 buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1277 if (!list_empty(&buffers->rb_mws)) {
1278 i = RPCRDMA_MAX_SEGS - 1;
1279 do {
1280 r = list_entry(buffers->rb_mws.next,
1281 struct rpcrdma_mw, mw_list);
1282 list_del(&r->mw_list);
1283 req->rl_segments[i].mr_chunk.rl_mw = r;
1284 } while (--i >= 0);
1286 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1287 return req;
1291 * Put request/reply buffers back into pool.
1292 * Pre-decrement counter/array index.
1294 void
1295 rpcrdma_buffer_put(struct rpcrdma_req *req)
1297 struct rpcrdma_buffer *buffers = req->rl_buffer;
1298 struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1299 int i;
1300 unsigned long flags;
1302 BUG_ON(req->rl_nchunks != 0);
1303 spin_lock_irqsave(&buffers->rb_lock, flags);
1304 buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1305 req->rl_niovs = 0;
1306 if (req->rl_reply) {
1307 buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1308 init_waitqueue_head(&req->rl_reply->rr_unbind);
1309 req->rl_reply->rr_func = NULL;
1310 req->rl_reply = NULL;
1312 switch (ia->ri_memreg_strategy) {
1313 case RPCRDMA_FRMR:
1314 case RPCRDMA_MTHCAFMR:
1315 case RPCRDMA_MEMWINDOWS_ASYNC:
1316 case RPCRDMA_MEMWINDOWS:
1318 * Cycle mw's back in reverse order, and "spin" them.
1319 * This delays and scrambles reuse as much as possible.
1321 i = 1;
1322 do {
1323 struct rpcrdma_mw **mw;
1324 mw = &req->rl_segments[i].mr_chunk.rl_mw;
1325 list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1326 *mw = NULL;
1327 } while (++i < RPCRDMA_MAX_SEGS);
1328 list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1329 &buffers->rb_mws);
1330 req->rl_segments[0].mr_chunk.rl_mw = NULL;
1331 break;
1332 default:
1333 break;
1335 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1339 * Recover reply buffers from pool.
1340 * This happens when recovering from error conditions.
1341 * Post-increment counter/array index.
1343 void
1344 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1346 struct rpcrdma_buffer *buffers = req->rl_buffer;
1347 unsigned long flags;
1349 if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */
1350 buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1351 spin_lock_irqsave(&buffers->rb_lock, flags);
1352 if (buffers->rb_recv_index < buffers->rb_max_requests) {
1353 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1354 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1356 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1360 * Put reply buffers back into pool when not attached to
1361 * request. This happens in error conditions, and when
1362 * aborting unbinds. Pre-decrement counter/array index.
1364 void
1365 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1367 struct rpcrdma_buffer *buffers = rep->rr_buffer;
1368 unsigned long flags;
1370 rep->rr_func = NULL;
1371 spin_lock_irqsave(&buffers->rb_lock, flags);
1372 buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1373 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1377 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1381 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1382 struct ib_mr **mrp, struct ib_sge *iov)
1384 struct ib_phys_buf ipb;
1385 struct ib_mr *mr;
1386 int rc;
1389 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1391 iov->addr = ib_dma_map_single(ia->ri_id->device,
1392 va, len, DMA_BIDIRECTIONAL);
1393 iov->length = len;
1395 if (ia->ri_have_dma_lkey) {
1396 *mrp = NULL;
1397 iov->lkey = ia->ri_dma_lkey;
1398 return 0;
1399 } else if (ia->ri_bind_mem != NULL) {
1400 *mrp = NULL;
1401 iov->lkey = ia->ri_bind_mem->lkey;
1402 return 0;
1405 ipb.addr = iov->addr;
1406 ipb.size = iov->length;
1407 mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1408 IB_ACCESS_LOCAL_WRITE, &iov->addr);
1410 dprintk("RPC: %s: phys convert: 0x%llx "
1411 "registered 0x%llx length %d\n",
1412 __func__, (unsigned long long)ipb.addr,
1413 (unsigned long long)iov->addr, len);
1415 if (IS_ERR(mr)) {
1416 *mrp = NULL;
1417 rc = PTR_ERR(mr);
1418 dprintk("RPC: %s: failed with %i\n", __func__, rc);
1419 } else {
1420 *mrp = mr;
1421 iov->lkey = mr->lkey;
1422 rc = 0;
1425 return rc;
1429 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1430 struct ib_mr *mr, struct ib_sge *iov)
1432 int rc;
1434 ib_dma_unmap_single(ia->ri_id->device,
1435 iov->addr, iov->length, DMA_BIDIRECTIONAL);
1437 if (NULL == mr)
1438 return 0;
1440 rc = ib_dereg_mr(mr);
1441 if (rc)
1442 dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
1443 return rc;
1447 * Wrappers for chunk registration, shared by read/write chunk code.
1450 static void
1451 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1453 seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1454 seg->mr_dmalen = seg->mr_len;
1455 if (seg->mr_page)
1456 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1457 seg->mr_page, offset_in_page(seg->mr_offset),
1458 seg->mr_dmalen, seg->mr_dir);
1459 else
1460 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1461 seg->mr_offset,
1462 seg->mr_dmalen, seg->mr_dir);
1463 if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1464 dprintk("RPC: %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1465 __func__,
1466 (unsigned long long)seg->mr_dma,
1467 seg->mr_offset, seg->mr_dmalen);
1471 static void
1472 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1474 if (seg->mr_page)
1475 ib_dma_unmap_page(ia->ri_id->device,
1476 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1477 else
1478 ib_dma_unmap_single(ia->ri_id->device,
1479 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1482 static int
1483 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1484 int *nsegs, int writing, struct rpcrdma_ia *ia,
1485 struct rpcrdma_xprt *r_xprt)
1487 struct rpcrdma_mr_seg *seg1 = seg;
1488 struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1490 u8 key;
1491 int len, pageoff;
1492 int i, rc;
1494 pageoff = offset_in_page(seg1->mr_offset);
1495 seg1->mr_offset -= pageoff; /* start of page */
1496 seg1->mr_len += pageoff;
1497 len = -pageoff;
1498 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1499 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1500 for (i = 0; i < *nsegs;) {
1501 rpcrdma_map_one(ia, seg, writing);
1502 seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1503 len += seg->mr_len;
1504 BUG_ON(seg->mr_len > PAGE_SIZE);
1505 ++seg;
1506 ++i;
1507 /* Check for holes */
1508 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1509 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1510 break;
1512 dprintk("RPC: %s: Using frmr %p to map %d segments\n",
1513 __func__, seg1->mr_chunk.rl_mw, i);
1515 if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1516 dprintk("RPC: %s: frmr %x left valid, posting invalidate.\n",
1517 __func__,
1518 seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1519 /* Invalidate before using. */
1520 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1521 invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1522 invalidate_wr.next = &frmr_wr;
1523 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1524 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1525 invalidate_wr.ex.invalidate_rkey =
1526 seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1527 DECR_CQCOUNT(&r_xprt->rx_ep);
1528 post_wr = &invalidate_wr;
1529 } else
1530 post_wr = &frmr_wr;
1532 /* Bump the key */
1533 key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1534 ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1536 /* Prepare FRMR WR */
1537 memset(&frmr_wr, 0, sizeof frmr_wr);
1538 frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1539 frmr_wr.opcode = IB_WR_FAST_REG_MR;
1540 frmr_wr.send_flags = IB_SEND_SIGNALED;
1541 frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1542 frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1543 frmr_wr.wr.fast_reg.page_list_len = i;
1544 frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1545 frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1546 BUG_ON(frmr_wr.wr.fast_reg.length < len);
1547 frmr_wr.wr.fast_reg.access_flags = (writing ?
1548 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1549 IB_ACCESS_REMOTE_READ);
1550 frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1551 DECR_CQCOUNT(&r_xprt->rx_ep);
1553 rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1555 if (rc) {
1556 dprintk("RPC: %s: failed ib_post_send for register,"
1557 " status %i\n", __func__, rc);
1558 while (i--)
1559 rpcrdma_unmap_one(ia, --seg);
1560 } else {
1561 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1562 seg1->mr_base = seg1->mr_dma + pageoff;
1563 seg1->mr_nsegs = i;
1564 seg1->mr_len = len;
1566 *nsegs = i;
1567 return rc;
1570 static int
1571 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1572 struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1574 struct rpcrdma_mr_seg *seg1 = seg;
1575 struct ib_send_wr invalidate_wr, *bad_wr;
1576 int rc;
1578 while (seg1->mr_nsegs--)
1579 rpcrdma_unmap_one(ia, seg++);
1581 memset(&invalidate_wr, 0, sizeof invalidate_wr);
1582 invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1583 invalidate_wr.opcode = IB_WR_LOCAL_INV;
1584 invalidate_wr.send_flags = IB_SEND_SIGNALED;
1585 invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1586 DECR_CQCOUNT(&r_xprt->rx_ep);
1588 rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1589 if (rc)
1590 dprintk("RPC: %s: failed ib_post_send for invalidate,"
1591 " status %i\n", __func__, rc);
1592 return rc;
1595 static int
1596 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1597 int *nsegs, int writing, struct rpcrdma_ia *ia)
1599 struct rpcrdma_mr_seg *seg1 = seg;
1600 u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1601 int len, pageoff, i, rc;
1603 pageoff = offset_in_page(seg1->mr_offset);
1604 seg1->mr_offset -= pageoff; /* start of page */
1605 seg1->mr_len += pageoff;
1606 len = -pageoff;
1607 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1608 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1609 for (i = 0; i < *nsegs;) {
1610 rpcrdma_map_one(ia, seg, writing);
1611 physaddrs[i] = seg->mr_dma;
1612 len += seg->mr_len;
1613 ++seg;
1614 ++i;
1615 /* Check for holes */
1616 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1617 offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1618 break;
1620 rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1621 physaddrs, i, seg1->mr_dma);
1622 if (rc) {
1623 dprintk("RPC: %s: failed ib_map_phys_fmr "
1624 "%u@0x%llx+%i (%d)... status %i\n", __func__,
1625 len, (unsigned long long)seg1->mr_dma,
1626 pageoff, i, rc);
1627 while (i--)
1628 rpcrdma_unmap_one(ia, --seg);
1629 } else {
1630 seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1631 seg1->mr_base = seg1->mr_dma + pageoff;
1632 seg1->mr_nsegs = i;
1633 seg1->mr_len = len;
1635 *nsegs = i;
1636 return rc;
1639 static int
1640 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1641 struct rpcrdma_ia *ia)
1643 struct rpcrdma_mr_seg *seg1 = seg;
1644 LIST_HEAD(l);
1645 int rc;
1647 list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1648 rc = ib_unmap_fmr(&l);
1649 while (seg1->mr_nsegs--)
1650 rpcrdma_unmap_one(ia, seg++);
1651 if (rc)
1652 dprintk("RPC: %s: failed ib_unmap_fmr,"
1653 " status %i\n", __func__, rc);
1654 return rc;
1657 static int
1658 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1659 int *nsegs, int writing, struct rpcrdma_ia *ia,
1660 struct rpcrdma_xprt *r_xprt)
1662 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1663 IB_ACCESS_REMOTE_READ);
1664 struct ib_mw_bind param;
1665 int rc;
1667 *nsegs = 1;
1668 rpcrdma_map_one(ia, seg, writing);
1669 param.mr = ia->ri_bind_mem;
1670 param.wr_id = 0ULL; /* no send cookie */
1671 param.addr = seg->mr_dma;
1672 param.length = seg->mr_len;
1673 param.send_flags = 0;
1674 param.mw_access_flags = mem_priv;
1676 DECR_CQCOUNT(&r_xprt->rx_ep);
1677 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1678 if (rc) {
1679 dprintk("RPC: %s: failed ib_bind_mw "
1680 "%u@0x%llx status %i\n",
1681 __func__, seg->mr_len,
1682 (unsigned long long)seg->mr_dma, rc);
1683 rpcrdma_unmap_one(ia, seg);
1684 } else {
1685 seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1686 seg->mr_base = param.addr;
1687 seg->mr_nsegs = 1;
1689 return rc;
1692 static int
1693 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1694 struct rpcrdma_ia *ia,
1695 struct rpcrdma_xprt *r_xprt, void **r)
1697 struct ib_mw_bind param;
1698 LIST_HEAD(l);
1699 int rc;
1701 BUG_ON(seg->mr_nsegs != 1);
1702 param.mr = ia->ri_bind_mem;
1703 param.addr = 0ULL; /* unbind */
1704 param.length = 0;
1705 param.mw_access_flags = 0;
1706 if (*r) {
1707 param.wr_id = (u64) (unsigned long) *r;
1708 param.send_flags = IB_SEND_SIGNALED;
1709 INIT_CQCOUNT(&r_xprt->rx_ep);
1710 } else {
1711 param.wr_id = 0ULL;
1712 param.send_flags = 0;
1713 DECR_CQCOUNT(&r_xprt->rx_ep);
1715 rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1716 rpcrdma_unmap_one(ia, seg);
1717 if (rc)
1718 dprintk("RPC: %s: failed ib_(un)bind_mw,"
1719 " status %i\n", __func__, rc);
1720 else
1721 *r = NULL; /* will upcall on completion */
1722 return rc;
1725 static int
1726 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1727 int *nsegs, int writing, struct rpcrdma_ia *ia)
1729 int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1730 IB_ACCESS_REMOTE_READ);
1731 struct rpcrdma_mr_seg *seg1 = seg;
1732 struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1733 int len, i, rc = 0;
1735 if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1736 *nsegs = RPCRDMA_MAX_DATA_SEGS;
1737 for (len = 0, i = 0; i < *nsegs;) {
1738 rpcrdma_map_one(ia, seg, writing);
1739 ipb[i].addr = seg->mr_dma;
1740 ipb[i].size = seg->mr_len;
1741 len += seg->mr_len;
1742 ++seg;
1743 ++i;
1744 /* Check for holes */
1745 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1746 offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1747 break;
1749 seg1->mr_base = seg1->mr_dma;
1750 seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1751 ipb, i, mem_priv, &seg1->mr_base);
1752 if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1753 rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1754 dprintk("RPC: %s: failed ib_reg_phys_mr "
1755 "%u@0x%llx (%d)... status %i\n",
1756 __func__, len,
1757 (unsigned long long)seg1->mr_dma, i, rc);
1758 while (i--)
1759 rpcrdma_unmap_one(ia, --seg);
1760 } else {
1761 seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1762 seg1->mr_nsegs = i;
1763 seg1->mr_len = len;
1765 *nsegs = i;
1766 return rc;
1769 static int
1770 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1771 struct rpcrdma_ia *ia)
1773 struct rpcrdma_mr_seg *seg1 = seg;
1774 int rc;
1776 rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1777 seg1->mr_chunk.rl_mr = NULL;
1778 while (seg1->mr_nsegs--)
1779 rpcrdma_unmap_one(ia, seg++);
1780 if (rc)
1781 dprintk("RPC: %s: failed ib_dereg_mr,"
1782 " status %i\n", __func__, rc);
1783 return rc;
1787 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1788 int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1790 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1791 int rc = 0;
1793 switch (ia->ri_memreg_strategy) {
1795 #if RPCRDMA_PERSISTENT_REGISTRATION
1796 case RPCRDMA_ALLPHYSICAL:
1797 rpcrdma_map_one(ia, seg, writing);
1798 seg->mr_rkey = ia->ri_bind_mem->rkey;
1799 seg->mr_base = seg->mr_dma;
1800 seg->mr_nsegs = 1;
1801 nsegs = 1;
1802 break;
1803 #endif
1805 /* Registration using frmr registration */
1806 case RPCRDMA_FRMR:
1807 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1808 break;
1810 /* Registration using fmr memory registration */
1811 case RPCRDMA_MTHCAFMR:
1812 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1813 break;
1815 /* Registration using memory windows */
1816 case RPCRDMA_MEMWINDOWS_ASYNC:
1817 case RPCRDMA_MEMWINDOWS:
1818 rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1819 break;
1821 /* Default registration each time */
1822 default:
1823 rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1824 break;
1826 if (rc)
1827 return -1;
1829 return nsegs;
1833 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1834 struct rpcrdma_xprt *r_xprt, void *r)
1836 struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1837 int nsegs = seg->mr_nsegs, rc;
1839 switch (ia->ri_memreg_strategy) {
1841 #if RPCRDMA_PERSISTENT_REGISTRATION
1842 case RPCRDMA_ALLPHYSICAL:
1843 BUG_ON(nsegs != 1);
1844 rpcrdma_unmap_one(ia, seg);
1845 rc = 0;
1846 break;
1847 #endif
1849 case RPCRDMA_FRMR:
1850 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1851 break;
1853 case RPCRDMA_MTHCAFMR:
1854 rc = rpcrdma_deregister_fmr_external(seg, ia);
1855 break;
1857 case RPCRDMA_MEMWINDOWS_ASYNC:
1858 case RPCRDMA_MEMWINDOWS:
1859 rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1860 break;
1862 default:
1863 rc = rpcrdma_deregister_default_external(seg, ia);
1864 break;
1866 if (r) {
1867 struct rpcrdma_rep *rep = r;
1868 void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1869 rep->rr_func = NULL;
1870 func(rep); /* dereg done, callback now */
1872 return nsegs;
1876 * Prepost any receive buffer, then post send.
1878 * Receive buffer is donated to hardware, reclaimed upon recv completion.
1881 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1882 struct rpcrdma_ep *ep,
1883 struct rpcrdma_req *req)
1885 struct ib_send_wr send_wr, *send_wr_fail;
1886 struct rpcrdma_rep *rep = req->rl_reply;
1887 int rc;
1889 if (rep) {
1890 rc = rpcrdma_ep_post_recv(ia, ep, rep);
1891 if (rc)
1892 goto out;
1893 req->rl_reply = NULL;
1896 send_wr.next = NULL;
1897 send_wr.wr_id = 0ULL; /* no send cookie */
1898 send_wr.sg_list = req->rl_send_iov;
1899 send_wr.num_sge = req->rl_niovs;
1900 send_wr.opcode = IB_WR_SEND;
1901 if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
1902 ib_dma_sync_single_for_device(ia->ri_id->device,
1903 req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1904 DMA_TO_DEVICE);
1905 ib_dma_sync_single_for_device(ia->ri_id->device,
1906 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1907 DMA_TO_DEVICE);
1908 ib_dma_sync_single_for_device(ia->ri_id->device,
1909 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1910 DMA_TO_DEVICE);
1912 if (DECR_CQCOUNT(ep) > 0)
1913 send_wr.send_flags = 0;
1914 else { /* Provider must take a send completion every now and then */
1915 INIT_CQCOUNT(ep);
1916 send_wr.send_flags = IB_SEND_SIGNALED;
1919 rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1920 if (rc)
1921 dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
1922 rc);
1923 out:
1924 return rc;
1928 * (Re)post a receive buffer.
1931 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1932 struct rpcrdma_ep *ep,
1933 struct rpcrdma_rep *rep)
1935 struct ib_recv_wr recv_wr, *recv_wr_fail;
1936 int rc;
1938 recv_wr.next = NULL;
1939 recv_wr.wr_id = (u64) (unsigned long) rep;
1940 recv_wr.sg_list = &rep->rr_iov;
1941 recv_wr.num_sge = 1;
1943 ib_dma_sync_single_for_cpu(ia->ri_id->device,
1944 rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1946 DECR_CQCOUNT(ep);
1947 rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1949 if (rc)
1950 dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
1951 rc);
1952 return rc;