/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/pci.h>  /* for Tavor hack below */
#include <linux/slab.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

/*
 * internal functions
 */

/*
 * handle replies in tasklet context, using a single, global list
 * rdma tasklet function -- just turn around and call the func
 * for all replies on the list
 */

static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
static LIST_HEAD(rpcrdma_tasklets_g);

static void
rpcrdma_run_tasklet(unsigned long data)
{
        struct rpcrdma_rep *rep;
        void (*func)(struct rpcrdma_rep *);
        unsigned long flags;

        data = data;
        spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        while (!list_empty(&rpcrdma_tasklets_g)) {
                rep = list_entry(rpcrdma_tasklets_g.next,
                                 struct rpcrdma_rep, rr_list);
                list_del(&rep->rr_list);
                func = rep->rr_func;
                rep->rr_func = NULL;
                spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

                if (func)
                        func(rep);
                else
                        rpcrdma_recv_buffer_put(rep);

                spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        }
        spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
}

static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);

static inline void
rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
{
        unsigned long flags;

        spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
        list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
        spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
        tasklet_schedule(&rpcrdma_tasklet_g);
}

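/*
 * Asynchronous error upcalls, registered with the provider for the QP
 * and for the completion queue. On a fatal error while connected, mark
 * the endpoint failed (-EIO), notify the transport via rep_func, and
 * wake anyone waiting on rep_connect_wait.
 */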
static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
        struct rpcrdma_ep *ep = context;

        dprintk("RPC: %s: QP error %X on device %s ep %p\n",
                __func__, event->event, event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
                ep->rep_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
}

static void
rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
{
        struct rpcrdma_ep *ep = context;

        dprintk("RPC: %s: CQ error %X on device %s ep %p\n",
                __func__, event->event, event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
                ep->rep_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
}

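/*
 * Process a single work completion. Failed completions mark the reply
 * length invalid (~0U) and schedule the reply tasklet so the upper layer
 * can tear down. Successful receives sync the reply buffer for the CPU,
 * capture the server's credit count, and (like MW bind completions)
 * hand the reply to the tasklet for processing.
 */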
static inline
void rpcrdma_event_process(struct ib_wc *wc)
{
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long) wc->wr_id;

        dprintk("RPC: %s: event rep %p status %X opcode %X length %u\n",
                __func__, rep, wc->status, wc->opcode, wc->byte_len);

        if (!rep) /* send or bind completion that we don't care about */
                return;

        if (IB_WC_SUCCESS != wc->status) {
                dprintk("RPC: %s: %s WC status %X, connection lost\n",
                        __func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
                        wc->status);
                rep->rr_len = ~0U;
                rpcrdma_schedule_tasklet(rep);
                return;
        }

        switch (wc->opcode) {
        case IB_WC_RECV:
                rep->rr_len = wc->byte_len;
                ib_dma_sync_single_for_cpu(
                        rdmab_to_ia(rep->rr_buffer)->ri_id->device,
                        rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
                /* Keep (only) the most recent credits, after checking validity */
                if (rep->rr_len >= 16) {
                        struct rpcrdma_msg *p =
                                        (struct rpcrdma_msg *) rep->rr_base;
                        unsigned int credits = ntohl(p->rm_credit);
                        if (credits == 0) {
                                dprintk("RPC: %s: server"
                                        " dropped credits to 0!\n", __func__);
                                /* don't deadlock */
                                credits = 1;
                        } else if (credits > rep->rr_buffer->rb_max_requests) {
                                dprintk("RPC: %s: server"
                                        " over-crediting: %d (%d)\n",
                                        __func__, credits,
                                        rep->rr_buffer->rb_max_requests);
                                credits = rep->rr_buffer->rb_max_requests;
                        }
                        atomic_set(&rep->rr_buffer->rb_credits, credits);
                }
                /* fall through */
        case IB_WC_BIND_MW:
                rpcrdma_schedule_tasklet(rep);
                break;
        default:
                dprintk("RPC: %s: unexpected WC event %X\n",
                        __func__, wc->opcode);
                break;
        }
}

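/*
 * Poll one completion at a time off the CQ until it is empty,
 * dispatching each to rpcrdma_event_process(). Polling singly
 * preserves the arrival order of receives.
 */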
static inline int
rpcrdma_cq_poll(struct ib_cq *cq)
{
        struct ib_wc wc;
        int rc;

        for (;;) {
                rc = ib_poll_cq(cq, 1, &wc);
                if (rc < 0) {
                        dprintk("RPC: %s: ib_poll_cq failed %i\n",
                                __func__, rc);
                        return rc;
                }
                if (rc == 0)
                        break;

                rpcrdma_event_process(&wc);
        }

        return 0;
}

/*
 * rpcrdma_cq_event_upcall
 *
 * This upcall handles recv, send, bind and unbind events.
 * It is reentrant but processes single events in order to maintain
 * ordering of receives to keep server credits.
 *
 * It is the responsibility of the scheduled tasklet to return
 * recv buffers to the pool. NOTE: this affects synchronization of
 * connection shutdown. That is, the structures required for
 * the completion of the reply handler must remain intact until
 * all memory has been reclaimed.
 *
 * Note that send events are suppressed and do not result in an upcall.
 */
static void
rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
{
        int rc;

        rc = rpcrdma_cq_poll(cq);
        if (rc)
                return;

        rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        if (rc) {
                dprintk("RPC: %s: ib_req_notify_cq failed %i\n",
                        __func__, rc);
                return;
        }

        rpcrdma_cq_poll(cq);
}

#ifdef RPC_DEBUG
static const char * const conn[] = {
        "address resolved",
        "address error",
        "route resolved",
        "route error",
        "connect request",
        "connect response",
        "connect error",
        "unreachable",
        "rejected",
        "established",
        "disconnected",
        "device removal"
};
#endif

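/*
 * Connection manager event handler, registered with the RDMA CM.
 * Address and route resolution results complete ia->ri_done for the
 * waiter in rpcrdma_create_id(); connection state changes update
 * ep->rep_connected, reset the credit count, notify the transport
 * via rep_func, and wake rep_connect_wait.
 */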
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
        struct rpcrdma_xprt *xprt = id->context;
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        struct rpcrdma_ep *ep = &xprt->rx_ep;
#ifdef RPC_DEBUG
        struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
#endif
        struct ib_qp_attr attr;
        struct ib_qp_init_attr iattr;
        int connstate = 0;

        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
                ia->ri_async_rc = 0;
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ADDR_ERROR:
                ia->ri_async_rc = -EHOSTUNREACH;
                dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ROUTE_ERROR:
                ia->ri_async_rc = -ENETUNREACH;
                dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ESTABLISHED:
                connstate = 1;
                ib_query_qp(ia->ri_id->qp, &attr,
                        IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
                        &iattr);
                dprintk("RPC: %s: %d responder resources"
                        " (%d initiator)\n",
                        __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
                goto connected;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                connstate = -ENOTCONN;
                goto connected;
        case RDMA_CM_EVENT_UNREACHABLE:
                connstate = -ENETDOWN;
                goto connected;
        case RDMA_CM_EVENT_REJECTED:
                connstate = -ECONNREFUSED;
                goto connected;
        case RDMA_CM_EVENT_DISCONNECTED:
                connstate = -ECONNABORTED;
                goto connected;
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                connstate = -ENODEV;
connected:
                dprintk("RPC: %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
                        __func__,
                        (event->event <= 11) ? conn[event->event] :
                                                "unknown connection error",
                        &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        ep, event->event);
                atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
                dprintk("RPC: %s: %sconnected\n",
                        __func__, connstate > 0 ? "" : "dis");
                ep->rep_connected = connstate;
                ep->rep_func(ep);
                wake_up_all(&ep->rep_connect_wait);
                break;
        default:
                dprintk("RPC: %s: unexpected CM event %d\n",
                        __func__, event->event);
                break;
        }

#ifdef RPC_DEBUG
        if (connstate == 1) {
                int ird = attr.max_dest_rd_atomic;
                int tird = ep->rep_remote_cma.responder_resources;
                printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
                        "on %s, memreg %d slots %d ird %d%s\n",
                        &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        ia->ri_id->device->name,
                        ia->ri_memreg_strategy,
                        xprt->rx_buf.rb_max_requests,
                        ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
        } else if (connstate < 0) {
                printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
                        &addr->sin_addr.s_addr,
                        ntohs(addr->sin_port),
                        connstate);
        }
#endif

        return 0;
}

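/*
 * Create an RDMA CM id for the transport and synchronously resolve the
 * server's address and route, waiting (with a timeout) for the CM
 * callbacks above to post their results in ia->ri_async_rc.
 */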
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
                        struct rpcrdma_ia *ia, struct sockaddr *addr)
{
        struct rdma_cm_id *id;
        int rc;

        init_completion(&ia->ri_done);

        id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
        if (IS_ERR(id)) {
                rc = PTR_ERR(id);
                dprintk("RPC: %s: rdma_create_id() failed %i\n",
                        __func__, rc);
                return id;
        }

        ia->ri_async_rc = -ETIMEDOUT;
        rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
        if (rc) {
                dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
                        __func__, rc);
                goto out;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        rc = ia->ri_async_rc;
        if (rc)
                goto out;

        ia->ri_async_rc = -ETIMEDOUT;
        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
        if (rc) {
                dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
                        __func__, rc);
                goto out;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        rc = ia->ri_async_rc;
        if (rc)
                goto out;

        return id;

out:
        rdma_destroy_id(id);
        return ERR_PTR(rc);
}

/*
 * Drain any cq, prior to teardown.
 */
static void
rpcrdma_clean_cq(struct ib_cq *cq)
{
        struct ib_wc wc;
        int count = 0;

        while (1 == ib_poll_cq(cq, 1, &wc))
                ++count;

        if (count)
                dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
                        __func__, count, wc.opcode);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
        int rc, mem_priv;
        struct ib_device_attr devattr;
        struct rpcrdma_ia *ia = &xprt->rx_ia;

        ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
        if (IS_ERR(ia->ri_id)) {
                rc = PTR_ERR(ia->ri_id);
                goto out1;
        }

        ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
        if (IS_ERR(ia->ri_pd)) {
                rc = PTR_ERR(ia->ri_pd);
                dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
                        __func__, rc);
                goto out2;
        }

        /*
         * Query the device to determine if the requested memory
         * registration strategy is supported. If it isn't, set the
         * strategy to a globally supported model.
         */
        rc = ib_query_device(ia->ri_id->device, &devattr);
        if (rc) {
                dprintk("RPC: %s: ib_query_device failed %d\n",
                        __func__, rc);
                goto out2;
        }

        if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
                ia->ri_have_dma_lkey = 1;
                ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
        }

        switch (memreg) {
        case RPCRDMA_MEMWINDOWS:
        case RPCRDMA_MEMWINDOWS_ASYNC:
                if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
                        dprintk("RPC: %s: MEMWINDOWS registration "
                                "specified but not supported by adapter, "
                                "using slower RPCRDMA_REGISTER\n",
                                __func__);
                        memreg = RPCRDMA_REGISTER;
                }
                break;
        case RPCRDMA_MTHCAFMR:
                if (!ia->ri_id->device->alloc_fmr) {
#if RPCRDMA_PERSISTENT_REGISTRATION
                        dprintk("RPC: %s: MTHCAFMR registration "
                                "specified but not supported by adapter, "
                                "using riskier RPCRDMA_ALLPHYSICAL\n",
                                __func__);
                        memreg = RPCRDMA_ALLPHYSICAL;
#else
                        dprintk("RPC: %s: MTHCAFMR registration "
                                "specified but not supported by adapter, "
                                "using slower RPCRDMA_REGISTER\n",
                                __func__);
                        memreg = RPCRDMA_REGISTER;
#endif
                }
                break;
        case RPCRDMA_FRMR:
                /* Requires both frmr reg and local dma lkey */
                if ((devattr.device_cap_flags &
                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
                    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
#if RPCRDMA_PERSISTENT_REGISTRATION
                        dprintk("RPC: %s: FRMR registration "
                                "specified but not supported by adapter, "
                                "using riskier RPCRDMA_ALLPHYSICAL\n",
                                __func__);
                        memreg = RPCRDMA_ALLPHYSICAL;
#else
                        dprintk("RPC: %s: FRMR registration "
                                "specified but not supported by adapter, "
                                "using slower RPCRDMA_REGISTER\n",
                                __func__);
                        memreg = RPCRDMA_REGISTER;
#endif
                }
                break;
        }

        /*
         * Optionally obtain an underlying physical identity mapping in
         * order to do a memory window-based bind. This base registration
         * is protected from remote access - that is enabled only by binding
         * for the specific bytes targeted during each RPC operation, and
         * revoked after the corresponding completion similar to a storage
         * adapter.
         */
        switch (memreg) {
        case RPCRDMA_BOUNCEBUFFERS:
        case RPCRDMA_REGISTER:
        case RPCRDMA_FRMR:
                break;
#if RPCRDMA_PERSISTENT_REGISTRATION
        case RPCRDMA_ALLPHYSICAL:
                mem_priv = IB_ACCESS_LOCAL_WRITE |
                                IB_ACCESS_REMOTE_WRITE |
                                IB_ACCESS_REMOTE_READ;
                goto register_setup;
#endif
        case RPCRDMA_MEMWINDOWS_ASYNC:
        case RPCRDMA_MEMWINDOWS:
                mem_priv = IB_ACCESS_LOCAL_WRITE |
                                IB_ACCESS_MW_BIND;
                goto register_setup;
        case RPCRDMA_MTHCAFMR:
                if (ia->ri_have_dma_lkey)
                        break;
                mem_priv = IB_ACCESS_LOCAL_WRITE;
        register_setup:
                ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
                if (IS_ERR(ia->ri_bind_mem)) {
                        printk(KERN_ALERT "%s: ib_get_dma_mr for "
                                "phys register failed with %lX\n\t"
                                "Will continue with degraded performance\n",
                                __func__, PTR_ERR(ia->ri_bind_mem));
                        memreg = RPCRDMA_REGISTER;
                        ia->ri_bind_mem = NULL;
                }
                break;
        default:
                printk(KERN_ERR "%s: invalid memory registration mode %d\n",
                                __func__, memreg);
                rc = -EINVAL;
                goto out2;
        }
        dprintk("RPC: %s: memory registration strategy is %d\n",
                __func__, memreg);

        /* Else will do memory reg/dereg for each chunk */
        ia->ri_memreg_strategy = memreg;

        return 0;
out2:
        rdma_destroy_id(ia->ri_id);
        ia->ri_id = NULL;
out1:
        return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
        int rc;

        dprintk("RPC: %s: entering\n", __func__);
        if (ia->ri_bind_mem != NULL) {
                rc = ib_dereg_mr(ia->ri_bind_mem);
                dprintk("RPC: %s: ib_dereg_mr returned %i\n",
                        __func__, rc);
        }
        if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
                if (ia->ri_id->qp)
                        rdma_destroy_qp(ia->ri_id);
                rdma_destroy_id(ia->ri_id);
                ia->ri_id = NULL;
        }
        if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
                rc = ib_dealloc_pd(ia->ri_pd);
                dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
                        __func__, rc);
        }
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                                struct rpcrdma_create_data_internal *cdata)
{
        struct ib_device_attr devattr;
        int rc, err;

        rc = ib_query_device(ia->ri_id->device, &devattr);
        if (rc) {
                dprintk("RPC: %s: ib_query_device failed %d\n",
                        __func__, rc);
                return rc;
        }

        /* check provider's send/recv wr limits */
        if (cdata->max_requests > devattr.max_qp_wr)
                cdata->max_requests = devattr.max_qp_wr;

        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
        /* send_cq and recv_cq initialized below */
        ep->rep_attr.srq = NULL;
        ep->rep_attr.cap.max_send_wr = cdata->max_requests;
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
                /* Add room for frmr register and invalidate WRs */
                ep->rep_attr.cap.max_send_wr *= 3;
                if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
                        return -EINVAL;
                break;
        case RPCRDMA_MEMWINDOWS_ASYNC:
        case RPCRDMA_MEMWINDOWS:
                /* Add room for mw_binds+unbinds - overkill! */
                ep->rep_attr.cap.max_send_wr++;
                ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
                if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
                        return -EINVAL;
                break;
        default:
                break;
        }
        ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
        ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
        ep->rep_attr.cap.max_recv_sge = 1;
        ep->rep_attr.cap.max_inline_data = 0;
        ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        ep->rep_attr.qp_type = IB_QPT_RC;
        ep->rep_attr.port_num = ~0;

        dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
                "iovs: send %d recv %d\n",
                __func__,
                ep->rep_attr.cap.max_send_wr,
                ep->rep_attr.cap.max_recv_wr,
                ep->rep_attr.cap.max_send_sge,
                ep->rep_attr.cap.max_recv_sge);

        /* set trigger for requesting send completion */
        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_MEMWINDOWS_ASYNC:
        case RPCRDMA_MEMWINDOWS:
                ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
                break;
        default:
                break;
        }
        if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;
        INIT_CQCOUNT(ep);
        ep->rep_ia = ia;
        init_waitqueue_head(&ep->rep_connect_wait);

        /*
         * Create a single cq for receive dto and mw_bind (only ever
         * care about unbind, really). Send completions are suppressed.
         * Use single threaded tasklet upcalls to maintain ordering.
         */
        ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
                                  rpcrdma_cq_async_error_upcall, NULL,
                                  ep->rep_attr.cap.max_recv_wr +
                                  ep->rep_attr.cap.max_send_wr + 1, 0);
        if (IS_ERR(ep->rep_cq)) {
                rc = PTR_ERR(ep->rep_cq);
                dprintk("RPC: %s: ib_create_cq failed: %i\n",
                        __func__, rc);
                goto out1;
        }

        rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
        if (rc) {
                dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                goto out2;
        }

        ep->rep_attr.send_cq = ep->rep_cq;
        ep->rep_attr.recv_cq = ep->rep_cq;

        /* Initialize cma parameters */

        /* RPC/RDMA does not use private data */
        ep->rep_remote_cma.private_data = NULL;
        ep->rep_remote_cma.private_data_len = 0;

        /* Client offers RDMA Read but does not initiate */
        ep->rep_remote_cma.initiator_depth = 0;
        if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
                ep->rep_remote_cma.responder_resources = 0;
        else if (devattr.max_qp_rd_atom > 32)   /* arbitrary but <= 255 */
                ep->rep_remote_cma.responder_resources = 32;
        else
                ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;

        ep->rep_remote_cma.retry_count = 7;
        ep->rep_remote_cma.flow_control = 0;
        ep->rep_remote_cma.rnr_retry_count = 0;

        return 0;

out2:
        err = ib_destroy_cq(ep->rep_cq);
        if (err)
                dprintk("RPC: %s: ib_destroy_cq returned %i\n",
                        __func__, err);
out1:
        return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 *
 * The caller's error handling must be sure to not leak the endpoint
 * if this function fails.
 */
int
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        int rc;

        dprintk("RPC: %s: entering, connected is %d\n",
                __func__, ep->rep_connected);

        if (ia->ri_id->qp) {
                rc = rpcrdma_ep_disconnect(ep, ia);
                if (rc)
                        dprintk("RPC: %s: rpcrdma_ep_disconnect"
                                " returned %i\n", __func__, rc);
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }

        /* padding - could be done in rpcrdma_buffer_destroy... */
        if (ep->rep_pad_mr) {
                rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
                ep->rep_pad_mr = NULL;
        }

        rpcrdma_clean_cq(ep->rep_cq);
        rc = ib_destroy_cq(ep->rep_cq);
        if (rc)
                dprintk("RPC: %s: ib_destroy_cq returned %i\n",
                        __func__, rc);

        return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        struct rdma_cm_id *id;
        int rc = 0;
        int retry_count = 0;

        if (ep->rep_connected != 0) {
                struct rpcrdma_xprt *xprt;
retry:
                rc = rpcrdma_ep_disconnect(ep, ia);
                if (rc && rc != -ENOTCONN)
                        dprintk("RPC: %s: rpcrdma_ep_disconnect"
                                " status %i\n", __func__, rc);
                rpcrdma_clean_cq(ep->rep_cq);

                xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
                id = rpcrdma_create_id(xprt, ia,
                                (struct sockaddr *)&xprt->rx_data.addr);
                if (IS_ERR(id)) {
                        rc = PTR_ERR(id);
                        goto out;
                }
                /* TEMP TEMP TEMP - fail if new device:
                 * Deregister/remarshal *all* requests!
                 * Close and recreate adapter, pd, etc!
                 * Re-determine all attributes still sane!
                 * More stuff I haven't thought of!
                 * Rrrgh!
                 */
                if (ia->ri_id->device != id->device) {
                        printk("RPC: %s: can't reconnect on "
                                "different device!\n", __func__);
                        rdma_destroy_id(id);
                        rc = -ENETDOWN;
                        goto out;
                }
                /* END TEMP */
                rdma_destroy_qp(ia->ri_id);
                rdma_destroy_id(ia->ri_id);
                ia->ri_id = id;
        }

        rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
        if (rc) {
                dprintk("RPC: %s: rdma_create_qp failed %i\n",
                        __func__, rc);
                goto out;
        }

        /* XXX Tavor device performs badly with 2K MTU! */
        if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
                struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
                if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
                    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
                     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
                        struct ib_qp_attr attr = {
                                .path_mtu = IB_MTU_1024
                        };
                        rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
                }
        }

        ep->rep_connected = 0;

        rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
        if (rc) {
                dprintk("RPC: %s: rdma_connect() failed with %i\n",
                                __func__, rc);
                goto out;
        }

        wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

        /*
         * Check state. A non-peer reject indicates no listener
         * (ECONNREFUSED), which may be a transient state. All
         * others indicate a transport condition which has already
         * undergone a best-effort.
         */
        if (ep->rep_connected == -ECONNREFUSED &&
            ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
                dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
                goto retry;
        }
        if (ep->rep_connected <= 0) {
                /* Sometimes, the only way to reliably connect to remote
                 * CMs is to use same nonzero values for ORD and IRD. */
                if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
                    (ep->rep_remote_cma.responder_resources == 0 ||
                     ep->rep_remote_cma.initiator_depth !=
                     ep->rep_remote_cma.responder_resources)) {
                        if (ep->rep_remote_cma.responder_resources == 0)
                                ep->rep_remote_cma.responder_resources = 1;
                        ep->rep_remote_cma.initiator_depth =
                                ep->rep_remote_cma.responder_resources;
                        goto retry;
                }
                rc = ep->rep_connected;
        } else {
                dprintk("RPC: %s: connected\n", __func__);
        }

out:
        if (rc)
                ep->rep_connected = rc;
        return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
int
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        int rc;

        rpcrdma_clean_cq(ep->rep_cq);
        rc = rdma_disconnect(ia->ri_id);
        if (!rc) {
                /* returns without wait if not connected */
                wait_event_interruptible(ep->rep_connect_wait,
                                                        ep->rep_connected != 1);
                dprintk("RPC: %s: after wait, %sconnected\n", __func__,
                        (ep->rep_connected == 1) ? "still " : "dis");
        } else {
                dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
                ep->rep_connected = rc;
        }
        return rc;
}

/*
 * Initialize buffer memory
 */
int
rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
        struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
{
        char *p;
        size_t len;
        int i, rc;
        struct rpcrdma_mw *r;

        buf->rb_max_requests = cdata->max_requests;
        spin_lock_init(&buf->rb_lock);
        atomic_set(&buf->rb_credits, 1);

        /* Need to allocate:
         *   1.  arrays for send and recv pointers
         *   2.  arrays of struct rpcrdma_req to fill in pointers
         *   3.  array of struct rpcrdma_rep for replies
         *   4.  padding, if any
         *   5.  mw's, fmr's or frmr's, if any
         * Send/recv buffers in req/rep need to be registered
         */

        len = buf->rb_max_requests *
                (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
        len += cdata->padding;
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
                len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
                                sizeof(struct rpcrdma_mw);
                break;
        case RPCRDMA_MTHCAFMR:
                /* TBD we are perhaps overallocating here */
                len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
                                sizeof(struct rpcrdma_mw);
                break;
        case RPCRDMA_MEMWINDOWS_ASYNC:
        case RPCRDMA_MEMWINDOWS:
                len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
                                sizeof(struct rpcrdma_mw);
                break;
        default:
                break;
        }

        /* allocate 1, 4 and 5 in one shot */
        p = kzalloc(len, GFP_KERNEL);
        if (p == NULL) {
                dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
                        __func__, len);
                rc = -ENOMEM;
                goto out;
        }
        buf->rb_pool = p;       /* for freeing it later */

        buf->rb_send_bufs = (struct rpcrdma_req **) p;
        p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
        buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
        p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];

        /*
         * Register the zeroed pad buffer, if any.
         */
        if (cdata->padding) {
                rc = rpcrdma_register_internal(ia, p, cdata->padding,
                                            &ep->rep_pad_mr, &ep->rep_pad);
                if (rc)
                        goto out;
        }
        p += cdata->padding;

        /*
         * Allocate the fmr's, or mw's for mw_bind chunk registration.
         * We "cycle" the mw's in order to minimize rkey reuse,
         * and also reduce unbind-to-bind collision.
         */
        INIT_LIST_HEAD(&buf->rb_mws);
        r = (struct rpcrdma_mw *)p;
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
                for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
                        r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
                                                         RPCRDMA_MAX_SEGS);
                        if (IS_ERR(r->r.frmr.fr_mr)) {
                                rc = PTR_ERR(r->r.frmr.fr_mr);
                                dprintk("RPC: %s: ib_alloc_fast_reg_mr"
                                        " failed %i\n", __func__, rc);
                                goto out;
                        }
                        r->r.frmr.fr_pgl =
                                ib_alloc_fast_reg_page_list(ia->ri_id->device,
                                                            RPCRDMA_MAX_SEGS);
                        if (IS_ERR(r->r.frmr.fr_pgl)) {
                                rc = PTR_ERR(r->r.frmr.fr_pgl);
                                dprintk("RPC: %s: "
                                        "ib_alloc_fast_reg_page_list "
                                        "failed %i\n", __func__, rc);
                                goto out;
                        }
                        list_add(&r->mw_list, &buf->rb_mws);
                        ++r;
                }
                break;
        case RPCRDMA_MTHCAFMR:
                /* TBD we are perhaps overallocating here */
                for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
                        static struct ib_fmr_attr fa =
                                { RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
                        r->r.fmr = ib_alloc_fmr(ia->ri_pd,
                                IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
                                &fa);
                        if (IS_ERR(r->r.fmr)) {
                                rc = PTR_ERR(r->r.fmr);
                                dprintk("RPC: %s: ib_alloc_fmr"
                                        " failed %i\n", __func__, rc);
                                goto out;
                        }
                        list_add(&r->mw_list, &buf->rb_mws);
                        ++r;
                }
                break;
        case RPCRDMA_MEMWINDOWS_ASYNC:
        case RPCRDMA_MEMWINDOWS:
                /* Allocate one extra request's worth, for full cycling */
                for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
                        r->r.mw = ib_alloc_mw(ia->ri_pd);
                        if (IS_ERR(r->r.mw)) {
                                rc = PTR_ERR(r->r.mw);
                                dprintk("RPC: %s: ib_alloc_mw"
                                        " failed %i\n", __func__, rc);
                                goto out;
                        }
                        list_add(&r->mw_list, &buf->rb_mws);
                        ++r;
                }
                break;
        default:
                break;
        }

        /*
         * Allocate/init the request/reply buffers. Doing this
         * using kmalloc for now -- one for each buf.
         */
        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
                struct rpcrdma_rep *rep;

                len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
                /* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
                /* Typical ~2400b, so rounding up saves work later */
                if (len < 4096)
                        len = 4096;
                req = kmalloc(len, GFP_KERNEL);
                if (req == NULL) {
                        dprintk("RPC: %s: request buffer %d alloc"
                                " failed\n", __func__, i);
                        rc = -ENOMEM;
                        goto out;
                }
                memset(req, 0, sizeof(struct rpcrdma_req));
                buf->rb_send_bufs[i] = req;
                buf->rb_send_bufs[i]->rl_buffer = buf;

                rc = rpcrdma_register_internal(ia, req->rl_base,
                                len - offsetof(struct rpcrdma_req, rl_base),
                                &buf->rb_send_bufs[i]->rl_handle,
                                &buf->rb_send_bufs[i]->rl_iov);
                if (rc)
                        goto out;

                buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);

                len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
                rep = kmalloc(len, GFP_KERNEL);
                if (rep == NULL) {
                        dprintk("RPC: %s: reply buffer %d alloc failed\n",
                                __func__, i);
                        rc = -ENOMEM;
                        goto out;
                }
                memset(rep, 0, sizeof(struct rpcrdma_rep));
                buf->rb_recv_bufs[i] = rep;
                buf->rb_recv_bufs[i]->rr_buffer = buf;
                init_waitqueue_head(&rep->rr_unbind);

                rc = rpcrdma_register_internal(ia, rep->rr_base,
                                len - offsetof(struct rpcrdma_rep, rr_base),
                                &buf->rb_recv_bufs[i]->rr_handle,
                                &buf->rb_recv_bufs[i]->rr_iov);
                if (rc)
                        goto out;

        }
        dprintk("RPC: %s: max_requests %d\n",
                __func__, buf->rb_max_requests);
        /* done */
        return 0;
out:
        rpcrdma_buffer_destroy(buf);
        return rc;
}

/*
 * Unregister and destroy buffer memory. Need to deal with
 * partial initialization, so it's callable from failed create.
 * Must be called before destroying endpoint, as registrations
 * reference it.
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
        int rc, i;
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
        struct rpcrdma_mw *r;

        /* clean up in reverse order from create
         *   1.  recv mr memory (mr free, then kfree)
         *   1a. bind mw memory
         *   2.  send mr memory (mr free, then kfree)
         *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
         *   4.  arrays
         */
        dprintk("RPC: %s: entering\n", __func__);

        for (i = 0; i < buf->rb_max_requests; i++) {
                if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
                        rpcrdma_deregister_internal(ia,
                                        buf->rb_recv_bufs[i]->rr_handle,
                                        &buf->rb_recv_bufs[i]->rr_iov);
                        kfree(buf->rb_recv_bufs[i]);
                }
                if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
                        while (!list_empty(&buf->rb_mws)) {
                                r = list_entry(buf->rb_mws.next,
                                        struct rpcrdma_mw, mw_list);
                                list_del(&r->mw_list);
                                switch (ia->ri_memreg_strategy) {
                                case RPCRDMA_FRMR:
                                        rc = ib_dereg_mr(r->r.frmr.fr_mr);
                                        if (rc)
                                                dprintk("RPC: %s:"
                                                        " ib_dereg_mr"
                                                        " failed %i\n",
                                                        __func__, rc);
                                        ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
                                        break;
                                case RPCRDMA_MTHCAFMR:
                                        rc = ib_dealloc_fmr(r->r.fmr);
                                        if (rc)
                                                dprintk("RPC: %s:"
                                                        " ib_dealloc_fmr"
                                                        " failed %i\n",
                                                        __func__, rc);
                                        break;
                                case RPCRDMA_MEMWINDOWS_ASYNC:
                                case RPCRDMA_MEMWINDOWS:
                                        rc = ib_dealloc_mw(r->r.mw);
                                        if (rc)
                                                dprintk("RPC: %s:"
                                                        " ib_dealloc_mw"
                                                        " failed %i\n",
                                                        __func__, rc);
                                        break;
                                default:
                                        break;
                                }
                        }
                        rpcrdma_deregister_internal(ia,
                                        buf->rb_send_bufs[i]->rl_handle,
                                        &buf->rb_send_bufs[i]->rl_iov);
                        kfree(buf->rb_send_bufs[i]);
                }
        }

        kfree(buf->rb_pool);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if needed) is attached to send buffer upon return.
 * Rule:
 *    rb_send_index and rb_recv_index MUST always be pointing to the
 *    *next* available buffer (non-NULL). They are incremented after
 *    removing buffers, and decremented *before* returning them.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
        struct rpcrdma_req *req;
        unsigned long flags;
        int i;
        struct rpcrdma_mw *r;

        spin_lock_irqsave(&buffers->rb_lock, flags);
        if (buffers->rb_send_index == buffers->rb_max_requests) {
                spin_unlock_irqrestore(&buffers->rb_lock, flags);
                dprintk("RPC: %s: out of request buffers\n", __func__);
                return ((struct rpcrdma_req *)NULL);
        }

        req = buffers->rb_send_bufs[buffers->rb_send_index];
        if (buffers->rb_send_index < buffers->rb_recv_index) {
                dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
                        __func__,
                        buffers->rb_recv_index - buffers->rb_send_index);
                req->rl_reply = NULL;
        } else {
                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
        }
        buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
        if (!list_empty(&buffers->rb_mws)) {
                i = RPCRDMA_MAX_SEGS - 1;
                do {
                        r = list_entry(buffers->rb_mws.next,
                                        struct rpcrdma_mw, mw_list);
                        list_del(&r->mw_list);
                        req->rl_segments[i].mr_chunk.rl_mw = r;
                } while (--i >= 0);
        }
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
        return req;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
        int i;
        unsigned long flags;

        BUG_ON(req->rl_nchunks != 0);
        spin_lock_irqsave(&buffers->rb_lock, flags);
        buffers->rb_send_bufs[--buffers->rb_send_index] = req;
        req->rl_niovs = 0;
        if (req->rl_reply) {
                buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
                init_waitqueue_head(&req->rl_reply->rr_unbind);
                req->rl_reply->rr_func = NULL;
                req->rl_reply = NULL;
        }
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
        case RPCRDMA_MTHCAFMR:
        case RPCRDMA_MEMWINDOWS_ASYNC:
        case RPCRDMA_MEMWINDOWS:
                /*
                 * Cycle mw's back in reverse order, and "spin" them.
                 * This delays and scrambles reuse as much as possible.
                 */
                i = 1;
                do {
                        struct rpcrdma_mw **mw;
                        mw = &req->rl_segments[i].mr_chunk.rl_mw;
                        list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
                        *mw = NULL;
                } while (++i < RPCRDMA_MAX_SEGS);
                list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
                                        &buffers->rb_mws);
                req->rl_segments[0].mr_chunk.rl_mw = NULL;
                break;
        default:
                break;
        }
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from error conditions.
 * Post-increment counter/array index.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        unsigned long flags;

        if (req->rl_iov.length == 0)    /* special case xprt_rdma_allocate() */
                buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
        spin_lock_irqsave(&buffers->rb_lock, flags);
        if (buffers->rb_recv_index < buffers->rb_max_requests) {
                req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
                buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
        }
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions, and when
 * aborting unbinds. Pre-decrement counter/array index.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
        struct rpcrdma_buffer *buffers = rep->rr_buffer;
        unsigned long flags;

        rep->rr_func = NULL;
        spin_lock_irqsave(&buffers->rb_lock, flags);
        buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
        spin_unlock_irqrestore(&buffers->rb_lock, flags);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

int
rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
                                struct ib_mr **mrp, struct ib_sge *iov)
{
        struct ib_phys_buf ipb;
        struct ib_mr *mr;
        int rc;

        /*
         * All memory passed here was kmalloc'ed, therefore phys-contiguous.
         */
        iov->addr = ib_dma_map_single(ia->ri_id->device,
                        va, len, DMA_BIDIRECTIONAL);
        iov->length = len;

        if (ia->ri_have_dma_lkey) {
                *mrp = NULL;
                iov->lkey = ia->ri_dma_lkey;
                return 0;
        } else if (ia->ri_bind_mem != NULL) {
                *mrp = NULL;
                iov->lkey = ia->ri_bind_mem->lkey;
                return 0;
        }

        ipb.addr = iov->addr;
        ipb.size = iov->length;
        mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
                        IB_ACCESS_LOCAL_WRITE, &iov->addr);

        dprintk("RPC: %s: phys convert: 0x%llx "
                        "registered 0x%llx length %d\n",
                        __func__, (unsigned long long)ipb.addr,
                        (unsigned long long)iov->addr, len);

        if (IS_ERR(mr)) {
                *mrp = NULL;
                rc = PTR_ERR(mr);
                dprintk("RPC: %s: failed with %i\n", __func__, rc);
        } else {
                *mrp = mr;
                iov->lkey = mr->lkey;
                rc = 0;
        }

        return rc;
}

int
rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
                                struct ib_mr *mr, struct ib_sge *iov)
{
        int rc;

        ib_dma_unmap_single(ia->ri_id->device,
                        iov->addr, iov->length, DMA_BIDIRECTIONAL);

        if (NULL == mr)
                return 0;

        rc = ib_dereg_mr(mr);
        if (rc)
                dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
        return rc;
}

/*
 * Wrappers for chunk registration, shared by read/write chunk code.
 */

static void
rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
{
        seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
        seg->mr_dmalen = seg->mr_len;
        if (seg->mr_page)
                seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
                                seg->mr_page, offset_in_page(seg->mr_offset),
                                seg->mr_dmalen, seg->mr_dir);
        else
                seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
                                seg->mr_offset,
                                seg->mr_dmalen, seg->mr_dir);
}

static void
rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
{
        if (seg->mr_page)
                ib_dma_unmap_page(ia->ri_id->device,
                                seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
        else
                ib_dma_unmap_single(ia->ri_id->device,
                                seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
}

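/*
 * Register a chunk of memory using a fast-register MR (FRMR):
 * DMA-map up to *nsegs page-aligned segments, bump the rkey, and
 * post an unsignaled IB_WR_FAST_REG_MR work request on the QP.
 */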
static int
rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
                        int *nsegs, int writing, struct rpcrdma_ia *ia,
                        struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_mr_seg *seg1 = seg;
        struct ib_send_wr frmr_wr, *bad_wr;
        u8 key;
        int len, pageoff;
        int i, rc;

        pageoff = offset_in_page(seg1->mr_offset);
        seg1->mr_offset -= pageoff;     /* start of page */
        seg1->mr_len += pageoff;
        len = -pageoff;
        if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
                *nsegs = RPCRDMA_MAX_DATA_SEGS;
        for (i = 0; i < *nsegs;) {
                rpcrdma_map_one(ia, seg, writing);
                seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
                len += seg->mr_len;
                ++seg;
                ++i;
                /* Check for holes */
                if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
                        break;
        }
        dprintk("RPC: %s: Using frmr %p to map %d segments\n",
                __func__, seg1->mr_chunk.rl_mw, i);

        /* Bump the key */
        key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
        ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);

        /* Prepare FRMR WR */
        memset(&frmr_wr, 0, sizeof frmr_wr);
        frmr_wr.opcode = IB_WR_FAST_REG_MR;
        frmr_wr.send_flags = 0;                 /* unsignaled */
        frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
        frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
        frmr_wr.wr.fast_reg.page_list_len = i;
        frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
        frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
        frmr_wr.wr.fast_reg.access_flags = (writing ?
                                IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
                                IB_ACCESS_REMOTE_READ);
        frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);

        rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);

        if (rc) {
                dprintk("RPC: %s: failed ib_post_send for register,"
                        " status %i\n", __func__, rc);
                while (i--)
                        rpcrdma_unmap_one(ia, --seg);
        } else {
                seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
                seg1->mr_base = seg1->mr_dma + pageoff;
                seg1->mr_nsegs = i;
                seg1->mr_len = len;
        }
        *nsegs = i;
        return rc;
}

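/*
 * Invalidate an FRMR registration: unmap the segments and post an
 * unsignaled IB_WR_LOCAL_INV work request for the rkey.
 */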
static int
rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
                        struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_mr_seg *seg1 = seg;
        struct ib_send_wr invalidate_wr, *bad_wr;
        int rc;

        while (seg1->mr_nsegs--)
                rpcrdma_unmap_one(ia, seg++);

        memset(&invalidate_wr, 0, sizeof invalidate_wr);
        invalidate_wr.opcode = IB_WR_LOCAL_INV;
        invalidate_wr.send_flags = 0;                   /* unsignaled */
        invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);

        rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
        if (rc)
                dprintk("RPC: %s: failed ib_post_send for invalidate,"
                        " status %i\n", __func__, rc);
        return rc;
}

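/*
 * Register a chunk using an FMR: DMA-map contiguous page-aligned
 * segments and map their physical addresses with ib_map_phys_fmr().
 */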
static int
rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
                        int *nsegs, int writing, struct rpcrdma_ia *ia)
{
        struct rpcrdma_mr_seg *seg1 = seg;
        u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
        int len, pageoff, i, rc;

        pageoff = offset_in_page(seg1->mr_offset);
        seg1->mr_offset -= pageoff;     /* start of page */
        seg1->mr_len += pageoff;
        len = -pageoff;
        if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
                *nsegs = RPCRDMA_MAX_DATA_SEGS;
        for (i = 0; i < *nsegs;) {
                rpcrdma_map_one(ia, seg, writing);
                physaddrs[i] = seg->mr_dma;
                len += seg->mr_len;
                ++seg;
                ++i;
                /* Check for holes */
                if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
                        break;
        }
        rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
                                physaddrs, i, seg1->mr_dma);
        if (rc) {
                dprintk("RPC: %s: failed ib_map_phys_fmr "
                        "%u@0x%llx+%i (%d)... status %i\n", __func__,
                        len, (unsigned long long)seg1->mr_dma,
                        pageoff, i, rc);
                while (i--)
                        rpcrdma_unmap_one(ia, --seg);
        } else {
                seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
                seg1->mr_base = seg1->mr_dma + pageoff;
                seg1->mr_nsegs = i;
                seg1->mr_len = len;
        }
        *nsegs = i;
        return rc;
}

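/*
 * Unmap an FMR registration via ib_unmap_fmr(), then unmap the
 * underlying DMA segments.
 */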
static int
rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
                        struct rpcrdma_ia *ia)
{
        struct rpcrdma_mr_seg *seg1 = seg;
        LIST_HEAD(l);
        int rc;

        list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
        rc = ib_unmap_fmr(&l);
        while (seg1->mr_nsegs--)
                rpcrdma_unmap_one(ia, seg++);
        if (rc)
                dprintk("RPC: %s: failed ib_unmap_fmr,"
                        " status %i\n", __func__, rc);
        return rc;
}

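/*
 * Register a single segment by binding it to a memory window on the
 * QP (ib_bind_mw). Only one segment is bound per call.
 */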
static int
rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
                        int *nsegs, int writing, struct rpcrdma_ia *ia,
                        struct rpcrdma_xprt *r_xprt)
{
        int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
                                  IB_ACCESS_REMOTE_READ);
        struct ib_mw_bind param;
        int rc;

        *nsegs = 1;
        rpcrdma_map_one(ia, seg, writing);
        param.mr = ia->ri_bind_mem;
        param.wr_id = 0ULL;     /* no send cookie */
        param.addr = seg->mr_dma;
        param.length = seg->mr_len;
        param.send_flags = 0;
        param.mw_access_flags = mem_priv;

        DECR_CQCOUNT(&r_xprt->rx_ep);
        rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
        if (rc) {
                dprintk("RPC: %s: failed ib_bind_mw "
                        "%u@0x%llx status %i\n",
                        __func__, seg->mr_len,
                        (unsigned long long)seg->mr_dma, rc);
                rpcrdma_unmap_one(ia, seg);
        } else {
                seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
                seg->mr_base = param.addr;
                seg->mr_nsegs = 1;
        }
        return rc;
}

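/*
 * Unbind a memory window (zero-length bind). If a reply context is
 * supplied in *r, the unbind is posted signaled and *r is cleared so
 * the reply handler runs from the completion upcall instead of here.
 */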
static int
rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
                        struct rpcrdma_ia *ia,
                        struct rpcrdma_xprt *r_xprt, void **r)
{
        struct ib_mw_bind param;
        LIST_HEAD(l);
        int rc;

        BUG_ON(seg->mr_nsegs != 1);
        param.mr = ia->ri_bind_mem;
        param.addr = 0ULL;      /* unbind */
        param.length = 0;
        param.mw_access_flags = 0;
        if (*r) {
                param.wr_id = (u64) (unsigned long) *r;
                param.send_flags = IB_SEND_SIGNALED;
                INIT_CQCOUNT(&r_xprt->rx_ep);
        } else {
                param.wr_id = 0ULL;
                param.send_flags = 0;
                DECR_CQCOUNT(&r_xprt->rx_ep);
        }
        rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
        rpcrdma_unmap_one(ia, seg);
        if (rc)
                dprintk("RPC: %s: failed ib_(un)bind_mw,"
                        " status %i\n", __func__, rc);
        else
                *r = NULL;      /* will upcall on completion */
        return rc;
}

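/*
 * Default (per-RPC) registration: DMA-map the segments and register
 * them with ib_reg_phys_mr() for each chunk.
 */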
static int
rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
                        int *nsegs, int writing, struct rpcrdma_ia *ia)
{
        int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
                                  IB_ACCESS_REMOTE_READ);
        struct rpcrdma_mr_seg *seg1 = seg;
        struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
        int len, i, rc = 0;

        if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
                *nsegs = RPCRDMA_MAX_DATA_SEGS;
        for (len = 0, i = 0; i < *nsegs;) {
                rpcrdma_map_one(ia, seg, writing);
                ipb[i].addr = seg->mr_dma;
                ipb[i].size = seg->mr_len;
                len += seg->mr_len;
                ++seg;
                ++i;
                /* Check for holes */
                if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
                    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
                        break;
        }
        seg1->mr_base = seg1->mr_dma;
        seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
                                ipb, i, mem_priv, &seg1->mr_base);
        if (IS_ERR(seg1->mr_chunk.rl_mr)) {
                rc = PTR_ERR(seg1->mr_chunk.rl_mr);
                dprintk("RPC: %s: failed ib_reg_phys_mr "
                        "%u@0x%llx (%d)... status %i\n",
                        __func__, len,
                        (unsigned long long)seg1->mr_dma, i, rc);
                while (i--)
                        rpcrdma_unmap_one(ia, --seg);
        } else {
                seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
                seg1->mr_nsegs = i;
                seg1->mr_len = len;
        }
        *nsegs = i;
        return rc;
}

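/*
 * Tear down a default registration: deregister the MR and unmap the
 * DMA segments.
 */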
static int
rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
                        struct rpcrdma_ia *ia)
{
        struct rpcrdma_mr_seg *seg1 = seg;
        int rc;

        rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
        seg1->mr_chunk.rl_mr = NULL;
        while (seg1->mr_nsegs--)
                rpcrdma_unmap_one(ia, seg++);
        if (rc)
                dprintk("RPC: %s: failed ib_dereg_mr,"
                        " status %i\n", __func__, rc);
        return rc;
}

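/*
 * Public chunk registration entry point: dispatch to the handler for
 * the interface's memory registration strategy. Returns the number of
 * segments registered, or -1 on failure.
 */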
int
rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
                        int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        int rc = 0;

        switch (ia->ri_memreg_strategy) {

#if RPCRDMA_PERSISTENT_REGISTRATION
        case RPCRDMA_ALLPHYSICAL:
                rpcrdma_map_one(ia, seg, writing);
                seg->mr_rkey = ia->ri_bind_mem->rkey;
                seg->mr_base = seg->mr_dma;
                seg->mr_nsegs = 1;
                nsegs = 1;
                break;
#endif

        /* Registration using frmr registration */
        case RPCRDMA_FRMR:
                rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
                break;

        /* Registration using fmr memory registration */
        case RPCRDMA_MTHCAFMR:
                rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
                break;

        /* Registration using memory windows */
        case RPCRDMA_MEMWINDOWS_ASYNC:
        case RPCRDMA_MEMWINDOWS:
                rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
                break;

        /* Default registration each time */
        default:
                rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
                break;
        }
        if (rc)
                return -1;

        return nsegs;
}

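/*
 * Public chunk deregistration entry point, dispatching by strategy.
 * If a reply context 'r' survives (i.e. no deferred unbind consumed
 * it), invoke the reply handler directly once deregistration is done.
 */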
int
rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
                struct rpcrdma_xprt *r_xprt, void *r)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        int nsegs = seg->mr_nsegs, rc;

        switch (ia->ri_memreg_strategy) {

#if RPCRDMA_PERSISTENT_REGISTRATION
        case RPCRDMA_ALLPHYSICAL:
                BUG_ON(nsegs != 1);
                rpcrdma_unmap_one(ia, seg);
                rc = 0;
                break;
#endif

        case RPCRDMA_FRMR:
                rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
                break;

        case RPCRDMA_MTHCAFMR:
                rc = rpcrdma_deregister_fmr_external(seg, ia);
                break;

        case RPCRDMA_MEMWINDOWS_ASYNC:
        case RPCRDMA_MEMWINDOWS:
                rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
                break;

        default:
                rc = rpcrdma_deregister_default_external(seg, ia);
                break;
        }
        if (r) {
                struct rpcrdma_rep *rep = r;
                void (*func)(struct rpcrdma_rep *) = rep->rr_func;
                rep->rr_func = NULL;
                func(rep);      /* dereg done, callback now */
        }
        return nsegs;
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
                struct rpcrdma_ep *ep,
                struct rpcrdma_req *req)
{
        struct ib_send_wr send_wr, *send_wr_fail;
        struct rpcrdma_rep *rep = req->rl_reply;
        int rc;

        if (rep) {
                rc = rpcrdma_ep_post_recv(ia, ep, rep);
                if (rc)
                        goto out;
                req->rl_reply = NULL;
        }

        send_wr.next = NULL;
        send_wr.wr_id = 0ULL;   /* no send cookie */
        send_wr.sg_list = req->rl_send_iov;
        send_wr.num_sge = req->rl_niovs;
        send_wr.opcode = IB_WR_SEND;
        if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
                ib_dma_sync_single_for_device(ia->ri_id->device,
                        req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
                        DMA_TO_DEVICE);
        ib_dma_sync_single_for_device(ia->ri_id->device,
                req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
                DMA_TO_DEVICE);
        ib_dma_sync_single_for_device(ia->ri_id->device,
                req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
                DMA_TO_DEVICE);

        if (DECR_CQCOUNT(ep) > 0)
                send_wr.send_flags = 0;
        else { /* Provider must take a send completion every now and then */
                INIT_CQCOUNT(ep);
                send_wr.send_flags = IB_SEND_SIGNALED;
        }

        rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
        if (rc)
                dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
                        rc);
out:
        return rc;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
                     struct rpcrdma_ep *ep,
                     struct rpcrdma_rep *rep)
{
        struct ib_recv_wr recv_wr, *recv_wr_fail;
        int rc;

        recv_wr.next = NULL;
        recv_wr.wr_id = (u64) (unsigned long) rep;
        recv_wr.sg_list = &rep->rr_iov;
        recv_wr.num_sge = 1;

        ib_dma_sync_single_for_cpu(ia->ri_id->device,
                rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);

        DECR_CQCOUNT(ep);
        rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);

        if (rc)
                dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
                        rc);
        return rc;
}