/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 */
/*
 * Copyright (c) 2007, The Ohio State University. All rights reserved.
 *
 * Portions of this source code were developed by the team members of
 * The Ohio State University's Network-Based Computing Laboratory (NBCL),
 * headed by Professor Dhabaleswar K. (DK) Panda.
 *
 * Acknowledgements to contributions from developers:
 *   Ranjit Noronha: noronha@cse.ohio-state.edu
 *   Lei Chai      : chail@cse.ohio-state.edu
 *   Weikuan Yu    : yuw@cse.ohio-state.edu
 */
/*
 * xdr_rdma.c, XDR implementation using RDMA to move large chunks
 */
#include <sys/param.h>
#include <sys/types.h>
#include <sys/systm.h>
#include <sys/debug.h>
#include <rpc/types.h>
#include <sys/cmn_err.h>
#include <rpc/rpc_sztypes.h>
#include <rpc/rpc_rdma.h>
#include <sys/sysmacros.h>
/*
 * RPC header and xdr encoding overhead. The number was determined by
 * tracing the msglen in svc_rdma_ksend for sec=sys,krb5,krb5i and krb5p.
 * If XDR_RDMA_BUF_OVERHEAD is not large enough, the result is the firing
 * of the dtrace probe "krpc-e-svcrdma-ksend-noreplycl" on the server,
 * from svc_rdma_ksend.
 */
#define	XDR_RDMA_BUF_OVERHEAD	300
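/*
 * Illustrative arithmetic (not from the original source; the min-chunk
 * value is assumed): with an xp_min_chunk of 8192 bytes, a 4000-byte
 * RCI_WRITE_UIO_CHUNK request stays inline because
 * 4000 + XDR_RDMA_BUF_OVERHEAD = 4300 < 8192; see the corresponding
 * check under XDR_RDMA_ADD_CHUNK in xdrrdma_control() below.
 */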
static bool_t	xdrrdma_getint32(XDR *, int32_t *);
static bool_t	xdrrdma_putint32(XDR *, int32_t *);
static bool_t	xdrrdma_getbytes(XDR *, caddr_t, int);
static bool_t	xdrrdma_putbytes(XDR *, caddr_t, int);
uint_t		xdrrdma_getpos(XDR *);
bool_t		xdrrdma_setpos(XDR *, uint_t);
static rpc_inline_t *xdrrdma_inline(XDR *, int);
void		xdrrdma_destroy(XDR *);
static bool_t	xdrrdma_control(XDR *, int, void *);
static bool_t	xdrrdma_read_a_chunk(XDR *, CONN **);
static void	xdrrdma_free_xdr_chunks(CONN *, struct clist *);
struct xdr_ops	xdrrdmablk_ops = {

struct xdr_ops	xdrrdma_ops = {
/*
 * A chunk list entry identifies a chunk of opaque data to be moved
 * separately from the rest of the RPC message. xp_min_chunk = 0, is a
 * special case for ENCODING, which means do not chunk the incoming stream
 * of data.
 *
 * A read chunk can contain part of the RPC message in addition to the
 * inline message. In such a case, (xp_offp - x_base) will not provide
 * the correct xdr offset of the entire message. xp_off is used in such
 * a case to denote the offset or current position in the overall message
 * covering both the inline and the chunk. This is used only in the case
 * of decoding and is useful to compare read chunk 'c_xdroff' offsets.
 *
 * An example of a read chunk containing an XDR message:
 * An NFSv4 compound as follows:
 *
 * Solaris Encoding is:
 * -------------------
 *
 * <Inline message>: [PUTFH WRITE4args GETATTR]
 *
 * [RDMA_READ chunks]: [write data]
 *
 * Linux Encoding is:
 * -----------------
 *
 * <Inline message>: [PUTFH WRITE4args]
 *
 * [RDMA_READ chunks]: [Write data] [Write data2] [Getattr chunk]
 *                        chunk1       chunk2        chunk3
 *
 * where the READ chunks are as:
 *
 * write data - chunk1 - 4k
 *            - chunk2 - 13 bytes (4109 - 4k)
 * getattr op - chunk3 - 19 bytes
 * (getattr op starts at byte 4 after 3 bytes of roundup)
 */
typedef struct xrdma_private {
	caddr_t		xp_offp;
	int		xp_min_chunk;
	uint_t		xp_flags;	/* Controls setting for rdma xdr */
	int		xp_buf_size;	/* size of xdr buffer */
	int		xp_off;		/* overall offset */
	struct clist	*xp_rcl;	/* head of chunk list */
	struct clist	**xp_rcl_next;	/* location to place/find next chunk */
	struct clist	*xp_rcl_xdr;	/* copy of rcl containing RPC message */
	struct clist	*xp_wcl;	/* head of write chunk list */
	CONN		*xp_conn;	/* connection for chunk data xfer */
	uint_t		xp_reply_chunk_len;
	/* used to track length for security modes: integrity/privacy */
	uint_t		xp_reply_chunk_len_alt;
} xrdma_private_t;
extern kmem_cache_t *clist_cache;
bool_t
xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep,
    CONN **conn, const uint_t maxsize)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*rdclist = NULL, *prev = NULL;
	bool_t		retval = TRUE;
	uint32_t	total_len = 0;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	alen = 0;

	ASSERT(xdrs->x_op != XDR_FREE);

	/*
	 * first deal with the length since xdr bytes are counted
	 */
	if (!xdr_u_int(xdrs, sizep)) {
		DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail);
		return (FALSE);
	}
	total_len = *sizep;

	if (total_len > maxsize) {
		DTRACE_PROBE2(xdr__e__getrdmablk_bad_size,
		    int, total_len, int, maxsize);
		return (FALSE);
	}
	(*conn) = xdrp->xp_conn;

	/*
	 * if no data we are done
	 */
	if (total_len == 0)
		return (TRUE);

	cle = *(xdrp->xp_rcl_next);

	/*
	 * If there was a chunk at the current offset, then setup a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */
	if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base))
		return (FALSE);

	/*
	 * Setup the chunk list with appropriate
	 * address (offset) and length
	 */
	for (actual_segments = 0;
	    actual_segments < total_segments; actual_segments++) {

		DTRACE_PROBE3(krpc__i__xdrrdma_getrdmablk, uint32_t, cle->c_len,
		    uint32_t, total_len, uint32_t, cle->c_xdroff);

		/*
		 * not the first time in the loop
		 */
		if (actual_segments > 0)
			prev = rdclist;

		cle->u.c_daddr = (uint64) cur_offset;

		if (cle->c_len > total_len) {
			alen = cle->c_len;
			cle->c_len = total_len;
		}

		xdrp->xp_rcl_next = &cle->c_next;

		cur_offset += cle->c_len;
		total_len -= cle->c_len;

		if ((total_segments - actual_segments - 1) == 0 &&
		    total_len != 0) {
			DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort);
		}

		if ((total_segments - actual_segments - 1) > 0 &&
		    total_len == 0) {
			DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig,
			    int, total_segments, int, actual_segments);
		}

		rdclist = clist_alloc();

		if ((*rlist) == NULL)
			(*rlist) = rdclist;
		if (prev != NULL)
			prev->c_next = rdclist;
	}

	/*
	 * Adjust the chunk length, if we read only a part of
	 * a chunk.
	 */
	if (alen) {
		cle->w.c_saddr =
		    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
		cle->c_len = alen - cle->c_len;
	}

	return (retval);
}
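/*
 * Worked example for the segment loop above (sizes assumed for
 * illustration): a 10000-byte block advertised as three read chunks of
 * 4096, 4096 and 1808 bytes makes three passes through the loop;
 * cle->u.c_daddr records the running destination offsets 0, 4096 and
 * 8192 while total_len shrinks 10000 -> 5904 -> 1808 -> 0.
 */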
/*
 * The procedure xdrrdma_create initializes a stream descriptor for a
 * memory buffer.
 */
void
xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
    int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
{
	xrdma_private_t	*xdrp;
	struct clist	*cle;

	xdrs->x_op = op;
	xdrs->x_ops = &xdrrdma_ops;
	xdrs->x_base = addr;
	xdrs->x_handy = size;
	xdrs->x_public = NULL;

	xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t),
	    KM_SLEEP);
	xdrs->x_private = (caddr_t)xdrp;
	xdrp->xp_offp = addr;
	xdrp->xp_min_chunk = min_chunk;
	xdrp->xp_buf_size = size;
	xdrp->xp_reply_chunk_len = 0;
	xdrp->xp_reply_chunk_len_alt = 0;

	if (op == XDR_ENCODE && cl != NULL) {
		/* Find last element in chunk list and set xp_rcl_next */
		for (cle = cl; cle->c_next != NULL; cle = cle->c_next)
			;
		xdrp->xp_rcl_next = &(cle->c_next);
	} else {
		xdrp->xp_rcl_next = &(xdrp->xp_rcl);
	}

	xdrp->xp_conn = conn;
	if (xdrp->xp_min_chunk != 0)
		xdrp->xp_flags |= XDR_RDMA_CHUNK;
}
void
xdrrdma_destroy(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
		(void) clist_deregister(xdrp->xp_conn, xdrp->xp_wcl);
		rdma_buf_free(xdrp->xp_conn,
		    &xdrp->xp_wcl->rb_longbuf);
		clist_free(xdrp->xp_wcl);
	}

	if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
		(void) clist_deregister(xdrp->xp_conn, xdrp->xp_rcl);
		rdma_buf_free(xdrp->xp_conn,
		    &xdrp->xp_rcl->rb_longbuf);
		clist_free(xdrp->xp_rcl);
	}

	if (xdrp->xp_rcl_xdr)
		xdrrdma_free_xdr_chunks(xdrp->xp_conn, xdrp->xp_rcl_xdr);

	(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
	xdrs->x_private = NULL;
}
static bool_t
xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0) {
		/*
		 * check if rest of the rpc message is in a chunk
		 */
		if (!xdrrdma_read_a_chunk(xdrs, &xdrp->xp_conn)) {
			return (FALSE);
		}
	}

	/* LINTED pointer alignment */
	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));

	DTRACE_PROBE1(krpc__i__xdrrdma_getint32, int32_t, *int32p);

	xdrp->xp_offp += sizeof (int32_t);

	xdrs->x_handy -= (int)sizeof (int32_t);

	if (xdrp->xp_off != 0) {
		xdrp->xp_off += sizeof (int32_t);
	}

	return (TRUE);
}
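/*
 * Example of the fallback above (stream state assumed): with
 * x_handy == 2, a 4-byte get drives x_handy negative, so the decoder
 * first asks xdrrdma_read_a_chunk() whether the remainder of the RPC
 * message lives in a read chunk before failing the stream.
 */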
static bool_t
xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
		return (FALSE);

	/* LINTED pointer alignment */
	*(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
	xdrp->xp_offp += sizeof (int32_t);

	return (TRUE);
}
/*
 * DECODE bytes from XDR stream for rdma.
 * If the XDR stream contains a read chunk list,
 * it will go through xdrrdma_getrdmablk instead.
 */
static bool_t
xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*cls = *(xdrp->xp_rcl_next);
	struct clist	cl;
	bool_t		retval = TRUE;
	uint32_t	total_len = len;
	uint32_t	cur_offset = 0;
	uint32_t	total_segments = 0;
	uint32_t	actual_segments = 0;
	uint32_t	status = RDMA_SUCCESS;
	uint32_t	alen = 0;
	uint32_t	xpoff;

	cle = *(xdrp->xp_rcl_next);

	if (xdrp->xp_off != 0)
		xpoff = xdrp->xp_off;
	else
		xpoff = (xdrp->xp_offp - xdrs->x_base);

	/*
	 * If there was a chunk at the current offset, then setup a read
	 * chunk list which records the destination address and length
	 * and will RDMA READ the data in later.
	 */
	if (cle != NULL && cle->c_xdroff == xpoff) {
		for (actual_segments = 0;
		    actual_segments < total_segments; actual_segments++) {

			if (status != RDMA_SUCCESS)
				break;

			cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset;

			if (cle->c_len > total_len) {
				alen = cle->c_len;
				cle->c_len = total_len;
			}

			xdrp->xp_rcl_next = &cle->c_next;

			cur_offset += cle->c_len;
			total_len -= cle->c_len;

			if ((total_segments - actual_segments - 1) == 0 &&
			    total_len != 0) {
				DTRACE_PROBE(
				    krpc__e__xdrrdma_getbytes_chunktooshort);
			}

			if ((total_segments - actual_segments - 1) > 0 &&
			    total_len == 0) {
				DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig,
				    int, total_segments, int, actual_segments);
			}

			/*
			 * RDMA READ the chunk data from the remote end.
			 * First prep the destination buffer by registering
			 * it, then RDMA READ the chunk data. Since we are
			 * doing streaming memory, sync the destination
			 * buffer to CPU and deregister the buffer.
			 */
			if (xdrp->xp_conn == NULL) {
				return (FALSE);
			}
			cl = *cle;
			cl.c_next = NULL;
			status = clist_register(xdrp->xp_conn, &cl,
			    CLIST_REG_DST);
			if (status != RDMA_SUCCESS) {
				/*
				 * Deregister the previous chunks
				 */
				return (FALSE);
			}

			cle->c_dmemhandle = cl.c_dmemhandle;
			cle->c_dsynchandle = cl.c_dsynchandle;

			/*
			 * Now read the chunk in
			 */
			if ((total_segments - actual_segments - 1) == 0 ||
			    total_len == 0) {
				status = RDMA_READ(xdrp->xp_conn, &cl, WAIT);
			} else {
				status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT);
			}
			if (status != RDMA_SUCCESS) {
				DTRACE_PROBE1(
				    krpc__i__xdrrdma_getblk_readfailed,
				    int, status);
				retval = FALSE;
			}

			cle = cle->c_next;
		}

		/*
		 * sync the memory for cpu
		 */
		cl = *cls;
		cl.c_next = NULL;
		cl.c_len = cur_offset;
		if (clist_syncmem(
		    xdrp->xp_conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
			retval = FALSE;
		}

		/*
		 * Deregister the chunks
		 */
		while (actual_segments != 0) {
			cl.c_regtype = CLIST_REG_DST;
			(void) clist_deregister(xdrp->xp_conn, &cl);
			actual_segments--;
		}

		if (alen) {
			cle = *(xdrp->xp_rcl_next);
			cle->w.c_saddr =
			    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
			cle->c_len = alen - cle->c_len;
		}

		return (retval);
	}

	if ((xdrs->x_handy -= len) < 0)
		return (FALSE);

	bcopy(xdrp->xp_offp, addr, len);

	xdrp->xp_offp += len;

	if (xdrp->xp_off != 0)
		xdrp->xp_off += len;

	return (TRUE);
}
/*
 * ENCODE some bytes into an XDR stream. xp_min_chunk = 0 means the stream
 * of bytes contains no chunks to separate out, and if the bytes do not fit
 * in the supplied buffer, grow the buffer and free the old buffer.
 */
static bool_t
xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle;

	/*
	 * Is this stream accepting chunks?
	 * If so, does either of the two following conditions exist?
	 * - length of bytes to encode is greater than the min chunk size?
	 * - remaining space in this stream is shorter than length of
	 *   bytes to encode?
	 *
	 * If the above exists, then create a chunk for this encoding
	 * and save the addresses, etc.
	 */
	if (xdrp->xp_flags & XDR_RDMA_CHUNK &&
	    ((xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk) ||
	    (xdrs->x_handy - len < 0))) {

		int offset = xdrp->xp_offp - xdrs->x_base;

		cle = clist_alloc();
		cle->c_xdroff = offset;
		cle->c_len = len;
		cle->w.c_saddr = (uint64)(uintptr_t)addr;
		cle->c_next = NULL;

		*(xdrp->xp_rcl_next) = cle;
		xdrp->xp_rcl_next = &(cle->c_next);

		return (TRUE);
	}

	/* Is there enough space to encode what is left? */
	if ((xdrs->x_handy -= len) < 0) {
		return (FALSE);
	}
	bcopy(addr, xdrp->xp_offp, len);
	xdrp->xp_offp += len;

	return (TRUE);
}
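/*
 * Example of the chunking test above (values assumed): with
 * xp_min_chunk = 1024 and only 500 bytes of stream space left, a
 * 2000-byte request satisfies both conditions (2000 >= 1024 and
 * 500 - 2000 < 0), so the bytes are recorded as a chunk list entry
 * instead of being bcopy'd inline.
 */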
uint_t
xdrrdma_getpos(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
}
bool_t
xdrrdma_setpos(XDR *xdrs, uint_t pos)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	caddr_t		newaddr = xdrs->x_base + pos;
	caddr_t		lastaddr = xdrp->xp_offp + xdrs->x_handy;
	ptrdiff_t	diff;

	if (newaddr > lastaddr)
		return (FALSE);

	xdrp->xp_offp = newaddr;
	diff = lastaddr - newaddr;
	xdrs->x_handy = (int)diff;

	return (TRUE);
}
static rpc_inline_t *
xdrrdma_inline(XDR *xdrs, int len)
{
	rpc_inline_t	*buf = NULL;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);

	if (xdrs->x_op == XDR_DECODE) {
		/*
		 * Since chunks aren't in-line, check to see whether there is
		 * a chunk in the inline range.
		 */
		if (cle != NULL &&
		    cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
			return (NULL);
	}

	/* LINTED pointer alignment */
	buf = (rpc_inline_t *)xdrp->xp_offp;
	if (!IS_P2ALIGNED(buf, sizeof (int32_t)))
		return (NULL);

	if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
	    len >= xdrp->xp_min_chunk)) {
		return (NULL);
	}

	xdrs->x_handy -= len;
	xdrp->xp_offp += len;

	return (buf);
}
static bool_t
xdrrdma_control(XDR *xdrs, int request, void *info)
{
	int32_t		*int32p;
	int		len, i;
	uint_t		in_flags;
	struct uio	*uiop;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	rdma_chunkinfo_t *rcip = NULL;
	rdma_wlist_conn_info_t *rwcip = NULL;
	rdma_chunkinfo_lengths_t *rcilp = NULL;
	struct clist	*rwl = NULL, *first = NULL;
	struct clist	*prev = NULL;

	switch (request) {
	case XDR_PEEK:
		/*
		 * Return the next 4 byte unit in the XDR stream.
		 */
		if (xdrs->x_handy < sizeof (int32_t))
			return (FALSE);

		int32p = (int32_t *)info;
		*int32p = (int32_t)ntohl((uint32_t)
		    (*((int32_t *)(xdrp->xp_offp))));

		return (TRUE);

	case XDR_SKIPBYTES:
		/*
		 * Skip the next N bytes in the XDR stream.
		 */
		int32p = (int32_t *)info;
		len = RNDUP((int)(*int32p));
		if ((xdrs->x_handy -= len) < 0)
			return (FALSE);
		xdrp->xp_offp += len;

		return (TRUE);

	case XDR_RDMA_SET_FLAGS:
		/*
		 * Set the flags provided in the *info in xp_flags for rdma
		 * xdr stream control.
		 */
		int32p = (int32_t *)info;
		in_flags = (uint_t)(*int32p);

		xdrp->xp_flags |= in_flags;
		return (TRUE);

	case XDR_RDMA_GET_FLAGS:
		/*
		 * Get the flags provided in xp_flags return through *info
		 */
		int32p = (int32_t *)info;

		*int32p = (int32_t)xdrp->xp_flags;
		return (TRUE);

	case XDR_RDMA_GET_CHUNK_LEN:
		rcilp = (rdma_chunkinfo_lengths_t *)info;
		rcilp->rcil_len = xdrp->xp_reply_chunk_len;
		rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt;

		return (TRUE);

	case XDR_RDMA_ADD_CHUNK:
		/*
		 * Store wlist information
		 */
		rcip = (rdma_chunkinfo_t *)info;

		DTRACE_PROBE2(krpc__i__xdrrdma__control__add__chunk,
		    rci_type_t, rcip->rci_type, uint32, rcip->rci_len);
		switch (rcip->rci_type) {
		case RCI_WRITE_UIO_CHUNK:
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			if ((rcip->rci_len + XDR_RDMA_BUF_OVERHEAD) <
			    xdrp->xp_min_chunk) {
				*(rcip->rci_clpp) = NULL;
				return (TRUE);
			}
			uiop = rcip->rci_a.rci_uiop;

			for (i = 0; i < uiop->uio_iovcnt; i++) {
				rwl = clist_alloc();
				rwl->c_len = uiop->uio_iov[i].iov_len;
				rwl->u.c_daddr3 =
				    (uiop->uio_iov[i].iov_base);
				/*
				 * if userspace address, put adspace ptr in
				 * clist. If not, then do nothing since it's
				 * already set to NULL (from kmem_zalloc)
				 */
				if (uiop->uio_segflg == UIO_USERSPACE) {
					rwl->c_adspc = ttoproc(curthread)->p_as;
				}

				if (prev == NULL) {
					first = rwl;
				} else {
					prev->c_next = rwl;
				}
				prev = rwl;
			}

			xdrp->xp_wcl = first;
			*(rcip->rci_clpp) = first;

			break;

		case RCI_WRITE_ADDR_CHUNK:
			rwl = clist_alloc();

			rwl->c_len = rcip->rci_len;
			rwl->u.c_daddr3 = rcip->rci_a.rci_addr;
			rwl->c_next = NULL;
			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;

			xdrp->xp_wcl = rwl;
			*(rcip->rci_clpp) = rwl;

			break;

		case RCI_REPLY_CHUNK:
			xdrp->xp_reply_chunk_len += rcip->rci_len;
			break;
		}
		return (TRUE);

	case XDR_RDMA_GET_WLIST:
		*((struct clist **)info) = xdrp->xp_wcl;
		return (TRUE);

	case XDR_RDMA_SET_WLIST:
		xdrp->xp_wcl = (struct clist *)info;
		return (TRUE);

	case XDR_RDMA_GET_RLIST:
		*((struct clist **)info) = xdrp->xp_rcl;
		return (TRUE);

	case XDR_RDMA_GET_WCINFO:
		rwcip = (rdma_wlist_conn_info_t *)info;

		rwcip->rwci_wlist = xdrp->xp_wcl;
		rwcip->rwci_conn = xdrp->xp_conn;

		return (TRUE);

	default:
		return (FALSE);
	}
}
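/*
 * A minimal usage sketch (not part of this file; the helper name is
 * hypothetical): how a kRPC caller might query the reply chunk lengths
 * accumulated above through the standard XDR_CONTROL() dispatch macro.
 */
#if 0
static uint_t
get_reply_chunk_len(XDR *xdrs)	/* hypothetical helper */
{
	rdma_chunkinfo_lengths_t rcil;

	rcil.rcil_len = 0;
	rcil.rcil_len_alt = 0;
	(void) XDR_CONTROL(xdrs, XDR_RDMA_GET_CHUNK_LEN, &rcil);

	return (rcil.rcil_len);
}
#endif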
bool_t xdr_do_clist(XDR *, clist **);
/*
 * Not all fields in struct clist are interesting to the RPC over RDMA
 * protocol. Only XDR the interesting fields.
 */
bool_t
xdr_clist(XDR *xdrs, clist *objp)
{
	if (!xdr_uint32(xdrs, &objp->c_xdroff))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
		return (FALSE);
	if (!xdr_uint32(xdrs, &objp->c_len))
		return (FALSE);
	if (!xdr_uint64(xdrs, &objp->w.c_saddr))
		return (FALSE);
	if (!xdr_do_clist(xdrs, &objp->c_next))
		return (FALSE);

	return (TRUE);
}
/*
 * The following two functions are forms of xdr_pointer()
 * and xdr_reference(). Since the generic versions just
 * kmem_alloc() a new clist, we actually want to use the
 * rdma_clist kmem_cache.
 */

/*
 * Generate or free a clist structure from the
 * kmem_cache "rdma_clist"
 */
bool_t
xdr_ref_clist(XDR *xdrs, caddr_t *pp)
{
	caddr_t loc = *pp;
	bool_t stat;

	switch (xdrs->x_op) {
	case XDR_DECODE:
		*pp = loc = (caddr_t)clist_alloc();
		break;
	}

	stat = xdr_clist(xdrs, (struct clist *)loc);

	if (xdrs->x_op == XDR_FREE) {
		kmem_cache_free(clist_cache, loc);
		*pp = NULL;
	}
	return (stat);
}
/*
 * XDR a pointer to a possibly recursive clist. This differs
 * from xdr_reference in that it can serialize/deserialize
 * trees correctly.
 *
 * What is sent is actually a union:
 *
 * union object_pointer switch (boolean b) {
 * case TRUE: object_data data;
 * case FALSE: void nothing;
 * }
 *
 * > objpp: Pointer to the pointer to the object.
 */
bool_t
xdr_do_clist(XDR *xdrs, clist **objpp)
{
	bool_t more_data;

	more_data = (*objpp != NULL);
	if (!xdr_bool(xdrs, &more_data))
		return (FALSE);
	if (more_data == FALSE) {
		*objpp = NULL;
		return (TRUE);
	}
	return (xdr_ref_clist(xdrs, (caddr_t *)objpp));
}
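/*
 * Illustrative wire form: a two-entry clist serializes as
 *   TRUE, <clist fields>, TRUE, <clist fields>, FALSE
 * because xdr_clist() XDRs c_next through xdr_do_clist() recursively
 * and the terminating NULL pointer encodes as the FALSE discriminant.
 */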
uint_t
xdr_getbufsize(XDR *xdrs)
{
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);

	return ((uint_t)xdrp->xp_buf_size);
}
bool_t
xdr_encode_rlist_svc(XDR *xdrs, clist *rlist)
{
	bool_t	vfalse = FALSE;

	ASSERT(rlist == NULL);
	return (xdr_bool(xdrs, &vfalse));
}
bool_t
xdr_encode_wlist(XDR *xdrs, clist *w)
{
	bool_t		vfalse = FALSE, vtrue = TRUE;
	int		i;
	uint_t		num_segment = 0;
	struct clist	*cl;

	/* does a wlist exist? */
	if (w == NULL) {
		return (xdr_bool(xdrs, &vfalse));
	}
	/* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */
	if (!xdr_bool(xdrs, &vtrue))
		return (FALSE);

	for (cl = w; cl != NULL; cl = cl->c_next) {
		num_segment++;
	}

	if (!xdr_uint32(xdrs, &num_segment))
		return (FALSE);

	for (i = 0; i < num_segment; i++) {

		DTRACE_PROBE1(krpc__i__xdr_encode_wlist_len, uint_t, w->c_len);

		if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
			return (FALSE);

		if (!xdr_uint32(xdrs, &w->c_len))
			return (FALSE);

		if (!xdr_uint64(xdrs, &w->u.c_daddr))
			return (FALSE);

		w = w->c_next;
	}

	if (!xdr_bool(xdrs, &vfalse))
		return (FALSE);

	return (TRUE);
}
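/*
 * Illustrative wire layout for the encoding above (segment values
 * assumed): a write list with two segments goes out as
 *
 *	TRUE			write list present
 *	2			segment count N
 *	{rmr1, len1, daddr1}	HLOO for segment 1
 *	{rmr2, len2, daddr2}	HLOO for segment 2
 *	FALSE			no further write list
 *
 * where each HLOO triple is produced by the xdr_uint32 (handle),
 * xdr_uint32 (length) and xdr_uint64 (offset) calls in the loop.
 */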
/*
 * Conditionally decode an RDMA WRITE chunk list from the XDR stream.
 *
 * If the next boolean in the XDR stream is false there is no
 * RDMA WRITE chunk list present. Otherwise iterate over the
 * array and for each entry: allocate a struct clist and decode.
 * Pass back an indication via wlist_exists if we have seen an
 * RDMA WRITE chunk list.
 */
bool_t
xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists)
{
	struct clist	*tmp;
	bool_t		more = FALSE;
	uint32_t	seg_array_len;
	uint32_t	i;

	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	/* is there a wlist? */
	if (more == FALSE) {
		*wlist_exists = FALSE;
		return (TRUE);
	}
	*wlist_exists = TRUE;

	if (!xdr_uint32(xdrs, &seg_array_len))
		return (FALSE);

	tmp = *w = clist_alloc();
	for (i = 0; i < seg_array_len; i++) {

		if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
			return (FALSE);
		if (!xdr_uint32(xdrs, &tmp->c_len))
			return (FALSE);

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_len,
		    uint_t, tmp->c_len);

		if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
			return (FALSE);

		if (i < seg_array_len - 1) {
			tmp->c_next = clist_alloc();
			tmp = tmp->c_next;
		} else {
			tmp->c_next = NULL;
		}
	}

	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	return (TRUE);
}
/*
 * Server side RDMA WRITE list decode.
 * XDR context is memory ops
 */
bool_t
xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl,
    uint32_t *total_length, CONN *conn)
{
	struct clist	*first, *ncl;
	char		*memp;
	uint32_t	num_wclist;
	uint32_t	wcl_length = 0;
	uint32_t	i;
	bool_t		more = FALSE;

	if (!xdr_bool(xdrs, &more)) {
		return (FALSE);
	}

	if (more == FALSE) {
		return (TRUE);
	}

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength);
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			return (FALSE);
		if (!xdr_uint32(xdrs, &ncl->c_len))
			return (FALSE);
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			return (FALSE);

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__wlistsvc__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}

		DTRACE_PROBE1(krpc__i__xdr_decode_wlist_svc_len,
		    uint_t, ncl->c_len);

		wcl_length += ncl->c_len;

		if (i < num_wclist - 1) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}
	}

	if (!xdr_bool(xdrs, &more))
		return (FALSE);

	first->rb_longbuf.type = RDMA_LONG_BUFFER;
	first->rb_longbuf.len =
	    wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN;

	if (rdma_buf_alloc(conn, &first->rb_longbuf)) {
		clist_free(first);
		return (FALSE);
	}

	memp = first->rb_longbuf.addr;

	ncl = first;
	for (i = 0; i < num_wclist; i++) {
		ncl->w.c_saddr3 = (caddr_t)memp;
		memp += ncl->c_len;
		ncl = ncl->c_next;
	}

	*wclp = first;
	*total_length = wcl_length;

	return (TRUE);
}
/*
 * XDR decode the long reply write chunk.
 */
bool_t
xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist)
{
	bool_t		have_rchunk = FALSE;
	struct clist	*first = NULL, *ncl = NULL;
	uint32_t	num_wclist;
	uint32_t	i;

	if (!xdr_bool(xdrs, &have_rchunk))
		return (FALSE);

	if (have_rchunk == FALSE)
		return (TRUE);

	if (!xdr_uint32(xdrs, &num_wclist)) {
		DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength);
		return (FALSE);
	}

	if (num_wclist == 0) {
		return (FALSE);
	}

	first = ncl = clist_alloc();

	for (i = 0; i < num_wclist; i++) {

		if (i > 0) {
			ncl->c_next = clist_alloc();
			ncl = ncl->c_next;
		}

		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
			return (FALSE);
		if (!xdr_uint32(xdrs, &ncl->c_len))
			return (FALSE);
		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
			return (FALSE);

		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__chunklist_toobig);
			ncl->c_len = MAX_SVC_XFER_SIZE;
		}
		if (!(ncl->c_dmemhandle.mrc_rmr &&
		    (ncl->c_len > 0) && ncl->u.c_daddr))
			DTRACE_PROBE(
			    krpc__e__xdrrdma__replywchunk__invalid_segaddr);

		DTRACE_PROBE1(krpc__i__xdr_decode_reply_wchunk_c_len,
		    uint32_t, ncl->c_len);
	}
	*clist = first;
	return (TRUE);
}
bool_t
xdr_encode_reply_wchunk(XDR *xdrs,
    struct clist *cl_longreply, uint32_t seg_array_len)
{
	int		i;
	bool_t		long_reply_exists = TRUE;
	uint32_t	length;
	uint64		offset;

	if (seg_array_len > 0) {
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
		if (!xdr_uint32(xdrs, &seg_array_len))
			return (FALSE);

		for (i = 0; i < seg_array_len; i++) {
			length = cl_longreply->c_len;
			offset = (uint64) cl_longreply->u.c_daddr;

			DTRACE_PROBE1(
			    krpc__i__xdr_encode_reply_wchunk_c_len,
			    uint32_t, length);

			if (!xdr_uint32(xdrs,
			    &cl_longreply->c_dmemhandle.mrc_rmr))
				return (FALSE);
			if (!xdr_uint32(xdrs, &length))
				return (FALSE);
			if (!xdr_uint64(xdrs, &offset))
				return (FALSE);
			cl_longreply = cl_longreply->c_next;
		}
	} else {
		long_reply_exists = FALSE;
		if (!xdr_bool(xdrs, &long_reply_exists))
			return (FALSE);
	}
	return (TRUE);
}
bool_t
xdrrdma_read_from_client(struct clist *rlist, CONN **conn, uint_t count)
{
	struct clist	*rdclist;
	struct clist	cl;
	uint_t		total_len = 0;
	uint32_t	status;
	bool_t		retval = TRUE;

	rlist->rb_longbuf.type = RDMA_LONG_BUFFER;
	rlist->rb_longbuf.len =
	    count > RCL_BUF_LEN ? count : RCL_BUF_LEN;

	if (rdma_buf_alloc(*conn, &rlist->rb_longbuf)) {
		return (FALSE);
	}

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */
	cl = *rlist;
	cl.c_next = NULL;
	if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		rdma_buf_free(*conn, &rlist->rb_longbuf);
		DTRACE_PROBE(
		    krpc__e__xdrrdma__readfromclient__clist__reg);
		return (FALSE);
	}

	rlist->c_regtype = CLIST_REG_DST;
	rlist->c_dmemhandle = cl.c_dmemhandle;
	rlist->c_dsynchandle = cl.c_dsynchandle;

	for (rdclist = rlist;
	    rdclist != NULL; rdclist = rdclist->c_next) {
		total_len += rdclist->c_len;
#if (defined(OBJ32)||defined(DEBUG32))
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint32) rdclist->u.c_daddr3);
#else
		rdclist->u.c_daddr3 =
		    (caddr_t)((char *)rlist->rb_longbuf.addr +
		    (uint64) rdclist->u.c_daddr);
#endif

		cl = (*rdclist);
		cl.c_next = NULL;

		/*
		 * Use the same memory handle for all the chunks
		 */
		cl.c_dmemhandle = rlist->c_dmemhandle;
		cl.c_dsynchandle = rlist->c_dsynchandle;

		DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
		    int, rdclist->c_len);

		/*
		 * Now read the chunk in
		 */
		if (rdclist->c_next == NULL) {
			status = RDMA_READ(*conn, &cl, WAIT);
		} else {
			status = RDMA_READ(*conn, &cl, NOWAIT);
		}
		if (status != RDMA_SUCCESS) {
			DTRACE_PROBE(
			    krpc__e__xdrrdma__readfromclient__readfailed);
			rdma_buf_free(*conn, &rlist->rb_longbuf);
			return (FALSE);
		}
	}

	cl = (*rlist);
	cl.c_next = NULL;
	cl.c_len = total_len;
	if (clist_syncmem(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
		retval = FALSE;
	}
	return (retval);
}
bool_t
xdrrdma_free_clist(CONN *conn, struct clist *clp)
{
	rdma_buf_free(conn, &clp->rb_longbuf);
	clist_free(clp);
	return (TRUE);
}
bool_t
xdrrdma_send_read_data(XDR *xdrs, uint_t data_len, struct clist *wcl)
{
	int		status;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct xdr_ops	*xops = xdrrdma_xops();
	struct clist	*tcl, *wrcl, *cl;
	struct clist	fcl;
	int		rndup_present, rnduplen;

	rndup_present = 0;
	wrcl = NULL;

	/* caller is doing a sizeof */
	if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
		return (TRUE);

	/* copy of the first chunk */
	fcl = *wcl;
	fcl.c_next = NULL;

	/*
	 * The entire buffer is registered with the first chunk.
	 * Later chunks will use the same registered memory handle.
	 */
	status = clist_register(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	wcl->c_regtype = CLIST_REG_SOURCE;
	wcl->c_smemhandle = fcl.c_smemhandle;
	wcl->c_ssynchandle = fcl.c_ssynchandle;

	/*
	 * Only transfer the read data ignoring any trailing
	 * roundup chunks. A bit of work, but it saves an
	 * unnecessary extra RDMA_WRITE containing only
	 * roundup bytes.
	 */
	rnduplen = clist_len(wcl) - data_len;

	if (rnduplen) {
		tcl = wcl->c_next;

		/*
		 * Check if there is a trailing roundup chunk
		 */
		while (tcl) {
			if ((tcl->c_next == NULL) && (tcl->c_len == rnduplen)) {
				rndup_present = 1;
				break;
			}
			tcl = tcl->c_next;
		}

		/*
		 * Make a copy chunk list skipping the last chunk
		 */
		if (rndup_present) {
			cl = wcl;
			tcl = clist_alloc();
			wrcl = tcl;
			*tcl = *cl;
			while (cl->c_next != NULL) {
				if (cl->c_next->c_next == NULL)
					break;
				tcl->c_next = clist_alloc();
				tcl = tcl->c_next;
				cl = cl->c_next;
				*tcl = *cl;
			}
			tcl->c_next = NULL;
		}
	}

	/* No roundup chunks */
	if (wrcl == NULL) {
		wrcl = wcl;
	}

	/*
	 * Set the registered memory handles for the
	 * rest of the chunks same as the first chunk.
	 */
	tcl = wrcl->c_next;
	while (tcl) {
		tcl->c_smemhandle = fcl.c_smemhandle;
		tcl->c_ssynchandle = fcl.c_ssynchandle;
		tcl = tcl->c_next;
	}

	/*
	 * Sync the total len beginning from the first chunk.
	 */
	fcl.c_len = clist_len(wrcl);
	status = clist_syncmem(xdrp->xp_conn, &fcl, CLIST_REG_SOURCE);
	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	status = RDMA_WRITE(xdrp->xp_conn, wrcl, WAIT);

	if (status != RDMA_SUCCESS) {
		return (FALSE);
	}

	return (TRUE);
}
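/*
 * Worked example for the roundup logic above (sizes assumed): for
 * data_len = 4093 carried in chunks totalling 4096 bytes,
 * rnduplen = clist_len(wcl) - data_len = 3, so a trailing 3-byte chunk
 * is recognized as pure XDR roundup and skipped, avoiding an RDMA_WRITE
 * that would move only pad bytes.
 */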
/*
 * Reads one chunk at a time
 */
static bool_t
xdrrdma_read_a_chunk(XDR *xdrs, CONN **conn)
{
	int		status;
	uint32_t	len = 0;
	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
	struct clist	*cle = *(xdrp->xp_rcl_next);
	struct clist	*rclp = xdrp->xp_rcl;
	struct clist	*clp;

	/*
	 * len is used later to decide xdr offset in
	 * the chunk factoring any 4-byte XDR alignment
	 * (See read chunk example top of this file)
	 */
	while (rclp != cle) {
		len += rclp->c_len;
		rclp = rclp->c_next;
	}

	len = RNDUP(len) - len;

	ASSERT(xdrs->x_handy <= 0);

	/*
	 * If this is the first chunk to contain the RPC
	 * message set xp_off to the xdr offset of the
	 * inline message.
	 */
	if (xdrp->xp_off == 0)
		xdrp->xp_off = (xdrp->xp_offp - xdrs->x_base);

	if (cle == NULL || (cle->c_xdroff != xdrp->xp_off))
		return (FALSE);

	/*
	 * Make a copy of the chunk to read from client.
	 * Chunks are read on demand, so read only one
	 * for now.
	 */
	rclp = clist_alloc();
	*rclp = *cle;
	rclp->c_next = NULL;

	xdrp->xp_rcl_next = &cle->c_next;

	/*
	 * If there is a roundup present, then skip those
	 * bytes when reading.
	 */
	if (len) {
		rclp->w.c_saddr =
		    (uint64)(uintptr_t)rclp->w.c_saddr + len;
		rclp->c_len = rclp->c_len - len;
	}

	status = xdrrdma_read_from_client(rclp, conn, rclp->c_len);

	if (status == FALSE) {
		clist_free(rclp);
		return (status);
	}

	xdrp->xp_offp = rclp->rb_longbuf.addr;
	xdrs->x_base = xdrp->xp_offp;
	xdrs->x_handy = rclp->c_len;

	/*
	 * This copy of read chunks containing the XDR
	 * message is freed later in xdrrdma_destroy()
	 */
	if (xdrp->xp_rcl_xdr) {
		/* Add the chunk to end of the list */
		clp = xdrp->xp_rcl_xdr;
		while (clp->c_next != NULL)
			clp = clp->c_next;
		clp->c_next = rclp;
	} else {
		xdrp->xp_rcl_xdr = rclp;
	}
	return (TRUE);
}
static void
xdrrdma_free_xdr_chunks(CONN *conn, struct clist *xdr_rcl)
{
	struct clist	*cl;

	(void) clist_deregister(conn, xdr_rcl);

	/*
	 * Read chunks containing parts of the XDR message are
	 * special: in case of multiple chunks each chunk has
	 * its own buffer.
	 */
	cl = xdr_rcl;
	while (cl != NULL) {
		rdma_buf_free(conn, &cl->rb_longbuf);
		cl = cl->c_next;
	}

	clist_free(xdr_rcl);
}