2 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
3 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
5 * This software is available to you under a choice of one of two
6 * licenses. You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the BSD-type
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions
15 * Redistributions of source code must retain the above copyright
16 * notice, this list of conditions and the following disclaimer.
18 * Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials provided
21 * with the distribution.
23 * Neither the name of the Network Appliance, Inc. nor the names of
24 * its contributors may be used to endorse or promote products
25 * derived from this software without specific prior written
28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40 * Author: Tom Tucker <tom@opengridcomputing.com>
43 #include <linux/sunrpc/debug.h>
44 #include <linux/sunrpc/rpc_rdma.h>
45 #include <linux/spinlock.h>
46 #include <asm/unaligned.h>
47 #include <rdma/ib_verbs.h>
48 #include <rdma/rdma_cm.h>
49 #include <linux/sunrpc/svc_rdma.h>
51 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
54 * Replace the pages in the rq_argpages array with the pages from the SGE in
55 * the RDMA_RECV completion. The SGL should contain full pages up until the
58 static void rdma_build_arg_xdr(struct svc_rqst
*rqstp
,
59 struct svc_rdma_op_ctxt
*ctxt
,
62 struct rpcrdma_msg
*rmsgp
;
67 /* Swap the page in the SGE with the page in argpages */
68 page
= ctxt
->pages
[0];
69 put_page(rqstp
->rq_pages
[0]);
70 rqstp
->rq_pages
[0] = page
;
72 /* Set up the XDR head */
73 rqstp
->rq_arg
.head
[0].iov_base
= page_address(page
);
74 rqstp
->rq_arg
.head
[0].iov_len
=
75 min_t(size_t, byte_count
, ctxt
->sge
[0].length
);
76 rqstp
->rq_arg
.len
= byte_count
;
77 rqstp
->rq_arg
.buflen
= byte_count
;
79 /* Compute bytes past head in the SGL */
80 bc
= byte_count
- rqstp
->rq_arg
.head
[0].iov_len
;
82 /* If data remains, store it in the pagelist */
83 rqstp
->rq_arg
.page_len
= bc
;
84 rqstp
->rq_arg
.page_base
= 0;
86 /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
87 rmsgp
= (struct rpcrdma_msg
*)rqstp
->rq_arg
.head
[0].iov_base
;
88 if (rmsgp
->rm_type
== rdma_nomsg
)
89 rqstp
->rq_arg
.pages
= &rqstp
->rq_pages
[0];
91 rqstp
->rq_arg
.pages
= &rqstp
->rq_pages
[1];
94 while (bc
&& sge_no
< ctxt
->count
) {
95 page
= ctxt
->pages
[sge_no
];
96 put_page(rqstp
->rq_pages
[sge_no
]);
97 rqstp
->rq_pages
[sge_no
] = page
;
98 bc
-= min_t(u32
, bc
, ctxt
->sge
[sge_no
].length
);
99 rqstp
->rq_arg
.buflen
+= ctxt
->sge
[sge_no
].length
;
102 rqstp
->rq_respages
= &rqstp
->rq_pages
[sge_no
];
103 rqstp
->rq_next_page
= rqstp
->rq_respages
+ 1;
105 /* If not all pages were used from the SGL, free the remaining ones */
107 while (sge_no
< ctxt
->count
) {
108 page
= ctxt
->pages
[sge_no
++];
114 rqstp
->rq_arg
.tail
[0].iov_base
= NULL
;
115 rqstp
->rq_arg
.tail
[0].iov_len
= 0;
118 /* Issue an RDMA_READ using the local lkey to map the data sink */
119 int rdma_read_chunk_lcl(struct svcxprt_rdma
*xprt
,
120 struct svc_rqst
*rqstp
,
121 struct svc_rdma_op_ctxt
*head
,
129 struct ib_rdma_wr read_wr
;
130 int pages_needed
= PAGE_ALIGN(*page_offset
+ rs_length
) >> PAGE_SHIFT
;
131 struct svc_rdma_op_ctxt
*ctxt
= svc_rdma_get_context(xprt
);
133 u32 pg_off
= *page_offset
;
134 u32 pg_no
= *page_no
;
136 ctxt
->direction
= DMA_FROM_DEVICE
;
137 ctxt
->read_hdr
= head
;
138 pages_needed
= min_t(int, pages_needed
, xprt
->sc_max_sge_rd
);
139 read
= min_t(int, (pages_needed
<< PAGE_SHIFT
) - *page_offset
,
142 for (pno
= 0; pno
< pages_needed
; pno
++) {
143 int len
= min_t(int, rs_length
, PAGE_SIZE
- pg_off
);
145 head
->arg
.pages
[pg_no
] = rqstp
->rq_arg
.pages
[pg_no
];
146 head
->arg
.page_len
+= len
;
148 head
->arg
.len
+= len
;
151 rqstp
->rq_respages
= &rqstp
->rq_arg
.pages
[pg_no
+1];
152 rqstp
->rq_next_page
= rqstp
->rq_respages
+ 1;
153 ctxt
->sge
[pno
].addr
=
154 ib_dma_map_page(xprt
->sc_cm_id
->device
,
155 head
->arg
.pages
[pg_no
], pg_off
,
158 ret
= ib_dma_mapping_error(xprt
->sc_cm_id
->device
,
159 ctxt
->sge
[pno
].addr
);
162 svc_rdma_count_mappings(xprt
, ctxt
);
164 ctxt
->sge
[pno
].lkey
= xprt
->sc_pd
->local_dma_lkey
;
165 ctxt
->sge
[pno
].length
= len
;
168 /* adjust offset and wrap to next page if needed */
170 if (pg_off
== PAGE_SIZE
) {
177 if (last
&& rs_length
== 0)
178 set_bit(RDMACTXT_F_LAST_CTXT
, &ctxt
->flags
);
180 clear_bit(RDMACTXT_F_LAST_CTXT
, &ctxt
->flags
);
182 memset(&read_wr
, 0, sizeof(read_wr
));
183 ctxt
->cqe
.done
= svc_rdma_wc_read
;
184 read_wr
.wr
.wr_cqe
= &ctxt
->cqe
;
185 read_wr
.wr
.opcode
= IB_WR_RDMA_READ
;
186 read_wr
.wr
.send_flags
= IB_SEND_SIGNALED
;
187 read_wr
.rkey
= rs_handle
;
188 read_wr
.remote_addr
= rs_offset
;
189 read_wr
.wr
.sg_list
= ctxt
->sge
;
190 read_wr
.wr
.num_sge
= pages_needed
;
192 ret
= svc_rdma_send(xprt
, &read_wr
.wr
);
194 pr_err("svcrdma: Error %d posting RDMA_READ\n", ret
);
195 set_bit(XPT_CLOSE
, &xprt
->sc_xprt
.xpt_flags
);
199 /* return current location in page array */
201 *page_offset
= pg_off
;
203 atomic_inc(&rdma_stat_read
);
206 svc_rdma_unmap_dma(ctxt
);
207 svc_rdma_put_context(ctxt
, 0);
211 /* Issue an RDMA_READ using an FRMR to map the data sink */
212 int rdma_read_chunk_frmr(struct svcxprt_rdma
*xprt
,
213 struct svc_rqst
*rqstp
,
214 struct svc_rdma_op_ctxt
*head
,
222 struct ib_rdma_wr read_wr
;
223 struct ib_send_wr inv_wr
;
224 struct ib_reg_wr reg_wr
;
226 int nents
= PAGE_ALIGN(*page_offset
+ rs_length
) >> PAGE_SHIFT
;
227 struct svc_rdma_op_ctxt
*ctxt
= svc_rdma_get_context(xprt
);
228 struct svc_rdma_fastreg_mr
*frmr
= svc_rdma_get_frmr(xprt
);
229 int ret
, read
, pno
, dma_nents
, n
;
230 u32 pg_off
= *page_offset
;
231 u32 pg_no
= *page_no
;
236 ctxt
->direction
= DMA_FROM_DEVICE
;
238 nents
= min_t(unsigned int, nents
, xprt
->sc_frmr_pg_list_len
);
239 read
= min_t(int, (nents
<< PAGE_SHIFT
) - *page_offset
, rs_length
);
241 frmr
->direction
= DMA_FROM_DEVICE
;
242 frmr
->access_flags
= (IB_ACCESS_LOCAL_WRITE
|IB_ACCESS_REMOTE_WRITE
);
243 frmr
->sg_nents
= nents
;
245 for (pno
= 0; pno
< nents
; pno
++) {
246 int len
= min_t(int, rs_length
, PAGE_SIZE
- pg_off
);
248 head
->arg
.pages
[pg_no
] = rqstp
->rq_arg
.pages
[pg_no
];
249 head
->arg
.page_len
+= len
;
250 head
->arg
.len
+= len
;
254 sg_set_page(&frmr
->sg
[pno
], rqstp
->rq_arg
.pages
[pg_no
],
257 rqstp
->rq_respages
= &rqstp
->rq_arg
.pages
[pg_no
+1];
258 rqstp
->rq_next_page
= rqstp
->rq_respages
+ 1;
260 /* adjust offset and wrap to next page if needed */
262 if (pg_off
== PAGE_SIZE
) {
269 if (last
&& rs_length
== 0)
270 set_bit(RDMACTXT_F_LAST_CTXT
, &ctxt
->flags
);
272 clear_bit(RDMACTXT_F_LAST_CTXT
, &ctxt
->flags
);
274 dma_nents
= ib_dma_map_sg(xprt
->sc_cm_id
->device
,
275 frmr
->sg
, frmr
->sg_nents
,
278 pr_err("svcrdma: failed to dma map sg %p\n",
283 n
= ib_map_mr_sg(frmr
->mr
, frmr
->sg
, frmr
->sg_nents
, NULL
, PAGE_SIZE
);
284 if (unlikely(n
!= frmr
->sg_nents
)) {
285 pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n",
286 frmr
->mr
, n
, frmr
->sg_nents
);
287 return n
< 0 ? n
: -EINVAL
;
291 key
= (u8
)(frmr
->mr
->lkey
& 0x000000FF);
292 ib_update_fast_reg_key(frmr
->mr
, ++key
);
294 ctxt
->sge
[0].addr
= frmr
->mr
->iova
;
295 ctxt
->sge
[0].lkey
= frmr
->mr
->lkey
;
296 ctxt
->sge
[0].length
= frmr
->mr
->length
;
298 ctxt
->read_hdr
= head
;
301 ctxt
->reg_cqe
.done
= svc_rdma_wc_reg
;
302 reg_wr
.wr
.wr_cqe
= &ctxt
->reg_cqe
;
303 reg_wr
.wr
.opcode
= IB_WR_REG_MR
;
304 reg_wr
.wr
.send_flags
= IB_SEND_SIGNALED
;
305 reg_wr
.wr
.num_sge
= 0;
306 reg_wr
.mr
= frmr
->mr
;
307 reg_wr
.key
= frmr
->mr
->lkey
;
308 reg_wr
.access
= frmr
->access_flags
;
309 reg_wr
.wr
.next
= &read_wr
.wr
;
311 /* Prepare RDMA_READ */
312 memset(&read_wr
, 0, sizeof(read_wr
));
313 ctxt
->cqe
.done
= svc_rdma_wc_read
;
314 read_wr
.wr
.wr_cqe
= &ctxt
->cqe
;
315 read_wr
.wr
.send_flags
= IB_SEND_SIGNALED
;
316 read_wr
.rkey
= rs_handle
;
317 read_wr
.remote_addr
= rs_offset
;
318 read_wr
.wr
.sg_list
= ctxt
->sge
;
319 read_wr
.wr
.num_sge
= 1;
320 if (xprt
->sc_dev_caps
& SVCRDMA_DEVCAP_READ_W_INV
) {
321 read_wr
.wr
.opcode
= IB_WR_RDMA_READ_WITH_INV
;
322 read_wr
.wr
.ex
.invalidate_rkey
= ctxt
->frmr
->mr
->lkey
;
324 read_wr
.wr
.opcode
= IB_WR_RDMA_READ
;
325 read_wr
.wr
.next
= &inv_wr
;
326 /* Prepare invalidate */
327 memset(&inv_wr
, 0, sizeof(inv_wr
));
328 ctxt
->inv_cqe
.done
= svc_rdma_wc_inv
;
329 inv_wr
.wr_cqe
= &ctxt
->inv_cqe
;
330 inv_wr
.opcode
= IB_WR_LOCAL_INV
;
331 inv_wr
.send_flags
= IB_SEND_SIGNALED
| IB_SEND_FENCE
;
332 inv_wr
.ex
.invalidate_rkey
= frmr
->mr
->lkey
;
336 ret
= svc_rdma_send(xprt
, ®_wr
.wr
);
338 pr_err("svcrdma: Error %d posting RDMA_READ\n", ret
);
339 set_bit(XPT_CLOSE
, &xprt
->sc_xprt
.xpt_flags
);
343 /* return current location in page array */
345 *page_offset
= pg_off
;
347 atomic_inc(&rdma_stat_read
);
350 svc_rdma_put_context(ctxt
, 0);
351 svc_rdma_put_frmr(xprt
, frmr
);
356 rdma_rcl_chunk_count(struct rpcrdma_read_chunk
*ch
)
360 for (count
= 0; ch
->rc_discrim
!= xdr_zero
; ch
++)
365 /* If there was additional inline content, append it to the end of arg.pages.
366 * Tail copy has to be done after the reader function has determined how many
367 * pages are needed for RDMA READ.
370 rdma_copy_tail(struct svc_rqst
*rqstp
, struct svc_rdma_op_ctxt
*head
,
371 u32 position
, u32 byte_count
, u32 page_offset
, int page_no
)
375 srcp
= head
->arg
.head
[0].iov_base
+ position
;
376 byte_count
= head
->arg
.head
[0].iov_len
- position
;
377 if (byte_count
> PAGE_SIZE
) {
378 dprintk("svcrdma: large tail unsupported\n");
382 /* Fit as much of the tail on the current page as possible */
383 if (page_offset
!= PAGE_SIZE
) {
384 destp
= page_address(rqstp
->rq_arg
.pages
[page_no
]);
385 destp
+= page_offset
;
386 while (byte_count
--) {
389 if (page_offset
== PAGE_SIZE
&& byte_count
)
396 /* Fit the rest on the next page */
398 destp
= page_address(rqstp
->rq_arg
.pages
[page_no
]);
402 rqstp
->rq_respages
= &rqstp
->rq_arg
.pages
[page_no
+1];
403 rqstp
->rq_next_page
= rqstp
->rq_respages
+ 1;
406 byte_count
= head
->arg
.head
[0].iov_len
- position
;
407 head
->arg
.page_len
+= byte_count
;
408 head
->arg
.len
+= byte_count
;
409 head
->arg
.buflen
+= byte_count
;
413 /* Returns the address of the first read chunk or <nul> if no read chunk
416 static struct rpcrdma_read_chunk
*
417 svc_rdma_get_read_chunk(struct rpcrdma_msg
*rmsgp
)
419 struct rpcrdma_read_chunk
*ch
=
420 (struct rpcrdma_read_chunk
*)&rmsgp
->rm_body
.rm_chunks
[0];
422 if (ch
->rc_discrim
== xdr_zero
)
427 static int rdma_read_chunks(struct svcxprt_rdma
*xprt
,
428 struct rpcrdma_msg
*rmsgp
,
429 struct svc_rqst
*rqstp
,
430 struct svc_rdma_op_ctxt
*head
)
433 struct rpcrdma_read_chunk
*ch
;
434 u32 handle
, page_offset
, byte_count
;
439 /* If no read list is present, return 0 */
440 ch
= svc_rdma_get_read_chunk(rmsgp
);
444 if (rdma_rcl_chunk_count(ch
) > RPCSVC_MAXPAGES
)
447 /* The request is completed when the RDMA_READs complete. The
448 * head context keeps all the pages that comprise the
451 head
->arg
.head
[0] = rqstp
->rq_arg
.head
[0];
452 head
->arg
.tail
[0] = rqstp
->rq_arg
.tail
[0];
453 head
->hdr_count
= head
->count
;
454 head
->arg
.page_base
= 0;
455 head
->arg
.page_len
= 0;
456 head
->arg
.len
= rqstp
->rq_arg
.len
;
457 head
->arg
.buflen
= rqstp
->rq_arg
.buflen
;
459 /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
460 position
= be32_to_cpu(ch
->rc_position
);
462 head
->arg
.pages
= &head
->pages
[0];
463 page_offset
= head
->byte_len
;
465 head
->arg
.pages
= &head
->pages
[head
->count
];
471 for (; ch
->rc_discrim
!= xdr_zero
; ch
++) {
472 if (be32_to_cpu(ch
->rc_position
) != position
)
475 handle
= be32_to_cpu(ch
->rc_target
.rs_handle
),
476 byte_count
= be32_to_cpu(ch
->rc_target
.rs_length
);
477 xdr_decode_hyper((__be32
*)&ch
->rc_target
.rs_offset
,
480 while (byte_count
> 0) {
481 last
= (ch
+ 1)->rc_discrim
== xdr_zero
;
482 ret
= xprt
->sc_reader(xprt
, rqstp
, head
,
483 &page_no
, &page_offset
,
490 head
->arg
.buflen
+= ret
;
494 /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
495 if (page_offset
& 3) {
496 u32 pad
= 4 - (page_offset
& 3);
498 head
->arg
.tail
[0].iov_len
+= pad
;
499 head
->arg
.len
+= pad
;
500 head
->arg
.buflen
+= pad
;
505 if (position
&& position
< head
->arg
.head
[0].iov_len
)
506 ret
= rdma_copy_tail(rqstp
, head
, position
,
507 byte_count
, page_offset
, page_no
);
508 head
->arg
.head
[0].iov_len
= position
;
509 head
->position
= position
;
512 /* Detach arg pages. svc_recv will replenish them */
514 &rqstp
->rq_pages
[page_no
] < rqstp
->rq_respages
; page_no
++)
515 rqstp
->rq_pages
[page_no
] = NULL
;
520 static void rdma_read_complete(struct svc_rqst
*rqstp
,
521 struct svc_rdma_op_ctxt
*head
)
526 for (page_no
= 0; page_no
< head
->count
; page_no
++) {
527 put_page(rqstp
->rq_pages
[page_no
]);
528 rqstp
->rq_pages
[page_no
] = head
->pages
[page_no
];
531 /* Adjustments made for RDMA_NOMSG type requests */
532 if (head
->position
== 0) {
533 if (head
->arg
.len
<= head
->sge
[0].length
) {
534 head
->arg
.head
[0].iov_len
= head
->arg
.len
-
536 head
->arg
.page_len
= 0;
538 head
->arg
.head
[0].iov_len
= head
->sge
[0].length
-
540 head
->arg
.page_len
= head
->arg
.len
-
545 /* Point rq_arg.pages past header */
546 rqstp
->rq_arg
.pages
= &rqstp
->rq_pages
[head
->hdr_count
];
547 rqstp
->rq_arg
.page_len
= head
->arg
.page_len
;
548 rqstp
->rq_arg
.page_base
= head
->arg
.page_base
;
550 /* rq_respages starts after the last arg page */
551 rqstp
->rq_respages
= &rqstp
->rq_pages
[page_no
];
552 rqstp
->rq_next_page
= rqstp
->rq_respages
+ 1;
554 /* Rebuild rq_arg head and tail. */
555 rqstp
->rq_arg
.head
[0] = head
->arg
.head
[0];
556 rqstp
->rq_arg
.tail
[0] = head
->arg
.tail
[0];
557 rqstp
->rq_arg
.len
= head
->arg
.len
;
558 rqstp
->rq_arg
.buflen
= head
->arg
.buflen
;
561 static void svc_rdma_send_error(struct svcxprt_rdma
*xprt
,
562 __be32
*rdma_argp
, int status
)
564 struct svc_rdma_op_ctxt
*ctxt
;
565 __be32
*p
, *err_msgp
;
570 ret
= svc_rdma_repost_recv(xprt
, GFP_KERNEL
);
574 page
= alloc_page(GFP_KERNEL
);
577 err_msgp
= page_address(page
);
581 *p
++ = *(rdma_argp
+ 1);
582 *p
++ = xprt
->sc_fc_credits
;
584 if (status
== -EPROTONOSUPPORT
) {
586 *p
++ = rpcrdma_version
;
587 *p
++ = rpcrdma_version
;
591 length
= (unsigned long)p
- (unsigned long)err_msgp
;
593 /* Map transport header; no RPC message payload */
594 ctxt
= svc_rdma_get_context(xprt
);
595 ret
= svc_rdma_map_reply_hdr(xprt
, ctxt
, err_msgp
, length
);
597 dprintk("svcrdma: Error %d mapping send for protocol error\n",
602 ret
= svc_rdma_post_send_wr(xprt
, ctxt
, 1, 0);
604 dprintk("svcrdma: Error %d posting send for protocol error\n",
606 svc_rdma_unmap_dma(ctxt
);
607 svc_rdma_put_context(ctxt
, 1);
611 /* By convention, backchannel calls arrive via rdma_msg type
612 * messages, and never populate the chunk lists. This makes
613 * the RPC/RDMA header small and fixed in size, so it is
614 * straightforward to check the RPC header's direction field.
616 static bool svc_rdma_is_backchannel_reply(struct svc_xprt
*xprt
,
621 if (!xprt
->xpt_bc_xprt
)
625 if (*p
++ != rdma_msg
)
628 if (*p
++ != xdr_zero
)
630 if (*p
++ != xdr_zero
)
632 if (*p
++ != xdr_zero
)
636 if (*p
++ != *rdma_resp
)
639 if (*p
== cpu_to_be32(RPC_CALL
))
646 * Set up the rqstp thread context to point to the RQ buffer. If
647 * necessary, pull additional data from the client with an RDMA_READ
650 int svc_rdma_recvfrom(struct svc_rqst
*rqstp
)
652 struct svc_xprt
*xprt
= rqstp
->rq_xprt
;
653 struct svcxprt_rdma
*rdma_xprt
=
654 container_of(xprt
, struct svcxprt_rdma
, sc_xprt
);
655 struct svc_rdma_op_ctxt
*ctxt
= NULL
;
656 struct rpcrdma_msg
*rmsgp
;
659 dprintk("svcrdma: rqstp=%p\n", rqstp
);
661 spin_lock(&rdma_xprt
->sc_rq_dto_lock
);
662 if (!list_empty(&rdma_xprt
->sc_read_complete_q
)) {
663 ctxt
= list_first_entry(&rdma_xprt
->sc_read_complete_q
,
664 struct svc_rdma_op_ctxt
, list
);
665 list_del(&ctxt
->list
);
666 spin_unlock(&rdma_xprt
->sc_rq_dto_lock
);
667 rdma_read_complete(rqstp
, ctxt
);
669 } else if (!list_empty(&rdma_xprt
->sc_rq_dto_q
)) {
670 ctxt
= list_first_entry(&rdma_xprt
->sc_rq_dto_q
,
671 struct svc_rdma_op_ctxt
, list
);
672 list_del(&ctxt
->list
);
674 atomic_inc(&rdma_stat_rq_starve
);
675 clear_bit(XPT_DATA
, &xprt
->xpt_flags
);
678 spin_unlock(&rdma_xprt
->sc_rq_dto_lock
);
680 /* This is the EAGAIN path. The svc_recv routine will
681 * return -EAGAIN, the nfsd thread will go to call into
682 * svc_recv again and we shouldn't be on the active
685 if (test_bit(XPT_CLOSE
, &xprt
->xpt_flags
))
689 dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p\n",
690 ctxt
, rdma_xprt
, rqstp
);
691 atomic_inc(&rdma_stat_recv
);
693 /* Build up the XDR from the receive buffers. */
694 rdma_build_arg_xdr(rqstp
, ctxt
, ctxt
->byte_len
);
696 /* Decode the RDMA header. */
697 rmsgp
= (struct rpcrdma_msg
*)rqstp
->rq_arg
.head
[0].iov_base
;
698 ret
= svc_rdma_xdr_decode_req(&rqstp
->rq_arg
);
703 rqstp
->rq_xprt_hlen
= ret
;
705 if (svc_rdma_is_backchannel_reply(xprt
, &rmsgp
->rm_xid
)) {
706 ret
= svc_rdma_handle_bc_reply(xprt
->xpt_bc_xprt
,
709 svc_rdma_put_context(ctxt
, 0);
715 /* Read read-list data. */
716 ret
= rdma_read_chunks(rdma_xprt
, rmsgp
, rqstp
, ctxt
);
718 /* read-list posted, defer until data received from client. */
720 } else if (ret
< 0) {
721 /* Post of read-list failed, free context. */
722 svc_rdma_put_context(ctxt
, 1);
727 ret
= rqstp
->rq_arg
.head
[0].iov_len
728 + rqstp
->rq_arg
.page_len
729 + rqstp
->rq_arg
.tail
[0].iov_len
;
730 svc_rdma_put_context(ctxt
, 0);
732 dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
733 "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
734 ret
, rqstp
->rq_arg
.len
,
735 rqstp
->rq_arg
.head
[0].iov_base
,
736 rqstp
->rq_arg
.head
[0].iov_len
);
737 rqstp
->rq_prot
= IPPROTO_MAX
;
738 svc_xprt_copy_addrs(rqstp
, xprt
);
742 svc_rdma_send_error(rdma_xprt
, &rmsgp
->rm_xid
, ret
);
743 svc_rdma_put_context(ctxt
, 0);
750 svc_rdma_put_context(ctxt
, 1);
752 return svc_rdma_repost_recv(rdma_xprt
, GFP_KERNEL
);