/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright (c) 2012 by Delphix. All rights reserved.
 * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
 * Copyright 2012 Marcel Telka <marcel@telka.sk>
 * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
 */

/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */

/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */

/*
 * Server side of RPC over RDMA in the kernel.
 */
#include <sys/param.h>
#include <sys/types.h>
#include <sys/sysmacros.h>
#include <sys/errno.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/cmn_err.h>
#include <sys/kstat.h>
#include <sys/vtrace.h>

#include <rpc/types.h>
#include <rpc/rpc_msg.h>
#include <rpc/rpc_rdma.h>

#include <sys/sunddi.h>
#include <inet/common.h>
#define	SVC_RDMA_SUCCESS 0
#define	SVC_RDMA_FAIL -1

#define	SVC_CREDIT_FACTOR (0.5)

#define	MSG_IS_RPCSEC_GSS(msg)	\
	((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)
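/*
 * RPCSEC_GSS replies carry a verifier that can grow the encoded reply;
 * svc_process_long_reply() below uses this macro to pad its long-reply
 * allocation by MAX_AUTH_BYTES before rounding, e.g.
 *
 *	alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
 */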
uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;
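/*
 * Number of receive buffers granted to the peer: svc_rdma_ksend() XDRs
 * this value into the rdma_credit field of every reply header it builds,
 * which is how the client learns its send-credit limit.
 */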
/*
 * RDMA transport specific data associated with SVCMASTERXPRT
 */
struct rdma_data {
	SVCMASTERXPRT	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
	struct rdma_svc_data rd_data;	/* rdma data */
	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
};

/*
 * Plugin connection specific data stashed away in clone SVCXPRT
 */
struct clone_rdma_data {
	bool_t		cloned;		/* xprt cloned for thread processing */
	CONN		*conn;		/* RDMA connection */
	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
	struct clist	*cl_reply;	/* reply chunk buffer info */
	struct clist	*cl_wlist;	/* write list clist */
};
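/*
 * A clone_rdma_data lives in the fixed-size xp_p2buf area of each clone
 * SVCXPRT; svc_rdma_kcreate() asserts that it fits within SVC_P2LEN, so
 * no per-request allocation is needed for it.
 */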
#define	MAXADDRLEN	128	/* max length for address mask */
/*
 * Routines exported through ops vector.
 */
static bool_t		svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
static bool_t		svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
static bool_t		svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
static bool_t		svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
void			svc_rdma_kdestroy(SVCMASTERXPRT *);
static int		svc_rdma_kdup(struct svc_req *, caddr_t, int,
				struct dupreq **, bool_t *);
static void		svc_rdma_kdupdone(struct dupreq *, caddr_t,
				void (*)(), int, int);
static int32_t		*svc_rdma_kgetres(SVCXPRT *, int);
static void		svc_rdma_kfreeres(SVCXPRT *);
static void		svc_rdma_kclone_destroy(SVCXPRT *);
static void		svc_rdma_kstart(SVCMASTERXPRT *);
void			svc_rdma_kstop(SVCMASTERXPRT *);
static void		svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
static void		svc_rdma_ktattrs(SVCXPRT *, int, void **);

static int	svc_process_long_reply(SVCXPRT *, xdrproc_t,
			caddr_t, struct rpc_msg *, bool_t, int *,
			int *, int *, unsigned int *);

static int	svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
			caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
			bool_t, uint_t *);
static bool_t	rpcmsg_length(xdrproc_t,
			caddr_t,
			struct rpc_msg *, bool_t, int);
/*
 * Server transport operations vector.
 */
struct svc_ops rdma_svc_ops = {
	svc_rdma_krecv,		/* Get requests */
	svc_rdma_kgetargs,	/* Deserialize arguments */
	svc_rdma_ksend,		/* Send reply */
	svc_rdma_kfreeargs,	/* Free argument data space */
	svc_rdma_kdestroy,	/* Destroy transport handle */
	svc_rdma_kdup,		/* Check entry in dup req cache */
	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
	svc_rdma_kgetres,	/* Get pointer to response buffer */
	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
	svc_rdma_kclone_destroy, /* Destroy a clone xprt */
	svc_rdma_kstart,	/* Tell `ready-to-receive' to rpcmod */
	svc_rdma_kclone_xprt,	/* Transport specific clone xprt */
	svc_rdma_ktattrs,	/* Get Transport Attributes */
	NULL,			/* Increment transport reference count */
	NULL			/* Decrement transport reference count */
};
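/*
 * The kRPC framework dispatches through this vector via the SVC_*
 * macros; e.g. the SVC_START() call in svc_rdma_kcreate() below reaches
 * svc_rdma_kstart() through the xp_ops pointer installed on each master
 * xprt.
 */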
/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 */
struct {
	kstat_named_t	rscalls;
	kstat_named_t	rsbadcalls;
	kstat_named_t	rsnullrecv;
	kstat_named_t	rsbadlen;
	kstat_named_t	rsxdrcall;
	kstat_named_t	rsdupchecks;
	kstat_named_t	rsdupreqs;
	kstat_named_t	rslongrpcs;
	kstat_named_t	rstotalreplies;
	kstat_named_t	rstotallongreplies;
	kstat_named_t	rstotalinlinereplies;
} rdmarsstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "nullrecv",	KSTAT_DATA_UINT64 },
	{ "badlen",	KSTAT_DATA_UINT64 },
	{ "xdrcall",	KSTAT_DATA_UINT64 },
	{ "dupchecks",	KSTAT_DATA_UINT64 },
	{ "dupreqs",	KSTAT_DATA_UINT64 },
	{ "longrpcs",	KSTAT_DATA_UINT64 },
	{ "totalreplies",	KSTAT_DATA_UINT64 },
	{ "totallongreplies",	KSTAT_DATA_UINT64 },
	{ "totalinlinereplies",	KSTAT_DATA_UINT64 },
};

kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);

#define	RSSTAT_INCR(x)	atomic_inc_64(&rdmarsstat.x.value.ui64)
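/*
 * RSSTAT_INCR() bumps a counter with a single atomic add, so the hot
 * paths below (e.g. RSSTAT_INCR(rscalls) in svc_rdma_krecv()) never take
 * a lock just to maintain statistics.
 */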
/*
 * Create a transport record.
 * The transport record, output buffer, and private data structure
 * are allocated.  The output buffer is serialized into using xdrmem.
 * There is one transport record per user process which implements a
 * set of services.
 */
/* ARGSUSED */
int
svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
    rdma_xprt_group_t *started_xprts)
{
	int error;
	SVCMASTERXPRT *xprt;
	struct rdma_data *rd;
	rdma_registry_t *rmod;
	rdma_xprt_record_t *xprt_rec;
	queue_t	*q;

	/*
	 * modload the RDMA plugins if not already done.
	 */
	if (!rdma_modloaded) {
		/*CONSTANTCONDITION*/
		ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);

		mutex_enter(&rdma_modload_lock);
		if (!rdma_modloaded) {
			error = rdma_modload();
			if (error) {
				mutex_exit(&rdma_modload_lock);
				return (error);
			}
		}
		mutex_exit(&rdma_modload_lock);
	}

	/*
	 * master_xprt_count is the count of master transport handles
	 * that were successfully created and are ready to receive for
	 * RDMA based access.
	 */
	error = 0;
	xprt_rec = NULL;
	rw_enter(&rdma_lock, RW_READER);
	if (rdma_mod_head == NULL) {
		started_xprts->rtg_count = 0;
		rw_exit(&rdma_lock);
		if (rdma_dev_available)
			return (EPROTONOSUPPORT);
		else
			return (ENODEV);
	}

	/*
	 * If we have reached here, then at least one RDMA plugin has loaded.
	 * Create a master_xprt, make it start listening on the device;
	 * if an error is generated, record it, we might need to shut
	 * the master_xprt down.
	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
	 * routines.
	 */
	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {

		/*
		 * One SVCMASTERXPRT per RDMA plugin.
		 */
		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
		xprt->xp_ops = &rdma_svc_ops;
		xprt->xp_sct = sct;
		xprt->xp_type = T_RDMA;
		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
		xprt->xp_req_head = (mblk_t *)0;
		xprt->xp_req_tail = (mblk_t *)0;
		xprt->xp_full = FALSE;
		xprt->xp_enable = FALSE;
		xprt->xp_reqs = 0;
		xprt->xp_size = 0;
		xprt->xp_threads = 0;
		xprt->xp_detached_threads = 0;

		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
		xprt->xp_p2 = (caddr_t)rd;
		rd->rd_xprt = xprt;
		rd->r_mod = rmod->r_mod;

		q = &rd->rd_data.q;
		xprt->xp_wq = q;
		q->q_ptr = &rd->rd_xprt;
		xprt->xp_netid = NULL;

		/*
		 * Each of the plugins will have their own Service ID
		 * to listener specific mapping, like port number for VI
		 * and service name for IB.
		 */
		rd->rd_data.svcid = id;
		error = svc_xprt_register(xprt, id);
		if (error) {
			DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
			goto cleanup;
		}

		SVC_START(xprt);
		if (!rd->rd_data.active) {
			svc_xprt_unregister(xprt);
			error = rd->rd_data.err_code;
			goto cleanup;
		}

		/*
		 * This is set only when there is at least one or more
		 * transports successfully created. We insert the pointer
		 * to the created RDMA master xprt into a separately maintained
		 * list. This way we can easily reference it later to cleanup,
		 * when NFS kRPC service pool is going away/unregistered.
		 */
		started_xprts->rtg_count++;
		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
		xprt_rec->rtr_xprt_ptr = xprt;
		xprt_rec->rtr_next = started_xprts->rtg_listhead;
		started_xprts->rtg_listhead = xprt_rec;
		continue;
cleanup:
		SVC_DESTROY(xprt);
		if (error == RDMA_FAILED)
			error = EPROTONOSUPPORT;
	}

	rw_exit(&rdma_lock);

	/*
	 * Don't return any error even if a single plugin was started
	 * successfully.
	 */
	if (started_xprts->rtg_count == 0)
		return (error);
	return (0);
}
/*
 * Cleanup routine for freeing up memory allocated by
 * svc_rdma_kcreate()
 */
void
svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
{
	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;

	mutex_destroy(&xprt->xp_req_lock);
	mutex_destroy(&xprt->xp_thread_lock);
	kmem_free(rd, sizeof (*rd));
	kmem_free(xprt, sizeof (*xprt));
}
static void
svc_rdma_kstart(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Create a listener for module at this port
	 */
	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
	else
		svcdata->err_code = RDMA_FAILED;
}
void
svc_rdma_kstop(SVCMASTERXPRT *xprt)
{
	struct rdma_svc_data *svcdata;
	rdma_mod_t *rmod;

	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;

	/*
	 * Call the stop listener routine for each plugin. If rdma_count is
	 * already zero set active to zero.
	 */
	if (rmod->rdma_count != 0)
		(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
	else
		svcdata->active = 0;
	if (svcdata->active)
		DTRACE_PROBE(krpc__e__svcrdma__kstop);
}
/* ARGSUSED */
static void
svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
{
	struct clone_rdma_data *cdrp;

	cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	/*
	 * Only free buffers and release connection when cloned is set.
	 */
	if (cdrp->cloned != TRUE)
		return;

	rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
	if (cdrp->cl_reply) {
		clist_free(cdrp->cl_reply);
		cdrp->cl_reply = NULL;
	}
	RDMA_REL_CONN(cdrp->conn);

	cdrp->cloned = 0;
}
/*
 * Clone the xprt specific information.  It will be freed by
 * SVC_CLONE_DESTROY.
 */
static void
svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
{
	struct clone_rdma_data *srcp2;
	struct clone_rdma_data *dstp2;

	srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
	dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;

	if (srcp2->conn != NULL) {
		srcp2->cloned = TRUE;
		*dstp2 = *srcp2;
	}
}
static void
svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
{
	CONN	*conn;
	*tattr = NULL;

	switch (attrflag) {
	case SVC_TATTR_ADDRMASK:
		conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
		ASSERT(conn != NULL);
		if (conn)
			*tattr = (void *)&conn->c_addrmask;
	}
}
static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR	*xdrs;
	CONN	*conn;
	rdma_recv_data_t	*rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist	*cl = NULL;
	struct clist	*wcl = NULL;
	struct clist	*cllong = NULL;

	rdma_stat	status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t	wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Checking if the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}

	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;

	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at 0 offset indicates that the RPC call message
	 * is in a chunk. Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;

		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == (uintptr_t)NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	if (crdp->cl_wlist) {
		int32_t flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;

	/*
	 * In case of RDMA, connection management is
	 * entirely done in rpcib module and netid in the
	 * SVCMASTERXPRT is NULL. Initialize the clone netid
	 * from the connection.
	 */
	clone_xprt->xp_netid = conn->c_netid;

	clone_xprt->xp_xid = xid;
	crdp->conn = conn;

	freeb(mp);

	return (TRUE);

callmsg_err:
	rdma_buf_free(conn, &crdp->rpcbuf);

cll_malloc_err:
	if (cl)
		clist_free(cl);
xdr_err:
	XDR_DESTROY(xdrs);

badrpc_call:
	RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	RDMA_REL_CONN(conn);
	freeb(mp);
	RSSTAT_INCR(rsbadcalls);
	return (FALSE);
}
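/*
 * Two receive shapes fall out of the decode above: RDMA_MSG, where the
 * RPC call is inline in the posted receive buffer, and RDMA_NOMSG, where
 * a chunk at offset 0 names a client buffer that must first be pulled
 * over with RDMA_READ into a locally registered long buffer before it
 * can be XDR-decoded.
 */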
static int
svc_process_long_reply(SVCXPRT * clone_xprt,
    xdrproc_t xdr_results, caddr_t xdr_location,
    struct rpc_msg *msg, bool_t has_args, int *msglen,
    int *freelen, int *numchunks, unsigned int *final_len)
{
	int status;
	XDR xdrslong;
	struct clist *wcl = NULL;
	int count = 0;
	int alloc_len;
	char  *memp;
	rdma_buf_t long_rpc = {0};
	struct clone_rdma_data *crdp;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;

	bzero(&xdrslong, sizeof (xdrslong));

	/* Choose a size for the long rpc response */
	if (MSG_IS_RPCSEC_GSS(msg)) {
		alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
	} else {
		alloc_len = RNDUP(*msglen);
	}

	if (alloc_len <= 64 * 1024) {
		if (alloc_len > 32 * 1024) {
			alloc_len = 64 * 1024;
		} else {
			if (alloc_len > 16 * 1024) {
				alloc_len = 32 * 1024;
			} else {
				alloc_len = 16 * 1024;
			}
		}
	}

	long_rpc.type = RDMA_LONG_BUFFER;
	long_rpc.len = alloc_len;
	if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
		return (SVC_RDMA_FAIL);
	}

	memp = long_rpc.addr;
	xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);

	msg->rm_xid = clone_xprt->xp_xid;

	if (!(xdr_replymsg(&xdrslong, msg) &&
	    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
	    xdr_results, xdr_location)))) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
		return (SVC_RDMA_FAIL);
	}

	*final_len = XDR_GETPOS(&xdrslong);

	DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
	*numchunks = 0;
	*freelen = 0;

	wcl = crdp->cl_reply;
	wcl->rb_longbuf = long_rpc;

	count = *final_len;
	while ((wcl != NULL) && (count > 0)) {

		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
		    uint32_t, wcl->c_len);

		if (wcl->c_len > count) {
			wcl->c_len = count;
		}
		wcl->w.c_saddr3 = (caddr_t)memp;

		count -= wcl->c_len;
		*numchunks += 1;
		memp += wcl->c_len;
		wcl = wcl->c_next;
	}

	/*
	 * Make rest of the chunks 0-len
	 */
	while (wcl != NULL) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;
		wcl->c_len = 0;
		wcl = wcl->c_next;
	}

	wcl = crdp->cl_reply;

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
		return (SVC_RDMA_FAIL);
	}

	if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
		return (SVC_RDMA_FAIL);
	}

	status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);

	if (status) {
		(void) clist_deregister(crdp->conn, wcl);
		rdma_buf_free(crdp->conn, &long_rpc);
		DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
		return (SVC_RDMA_FAIL);
	}

	status = RDMA_WRITE(crdp->conn, wcl, WAIT);

	(void) clist_deregister(crdp->conn, wcl);
	rdma_buf_free(crdp->conn, &wcl->rb_longbuf);

	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
		return (SVC_RDMA_FAIL);
	}

	return (SVC_RDMA_SUCCESS);
}
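/*
 * svc_process_long_reply() slices the encoded reply across the client's
 * write chunks in order: each chunk is trimmed to the bytes it will
 * carry, leftover chunks are made zero-length, and one RDMA_WRITE over
 * the registered chunk list pushes the long_rpc buffer before the
 * RDMA_NOMSG header is sent.
 */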
static int
svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results,
    caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs,
    struct rpc_msg *msg, bool_t has_args, uint_t *len)
{
	/*
	 * Get a pre-allocated buffer for rpc reply
	 */
	rpcreply->type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, rpcreply)) {
		DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
		return (SVC_RDMA_FAIL);
	}

	xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
	    0, NULL, XDR_ENCODE, conn);

	msg->rm_xid = clone_xprt->xp_xid;

	if (has_args) {
		if (!(xdr_replymsg(*xdrs, msg) &&
		    (!has_args ||
		    SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
		    xdr_results, xdr_location)))) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap1);
			return (SVC_RDMA_FAIL);
		}
	} else {
		if (!xdr_replymsg(*xdrs, msg)) {
			rdma_buf_free(conn, rpcreply);
			DTRACE_PROBE(
			    krpc__e__svcrdma__rpcmsg__reply__authwrap2);
			return (SVC_RDMA_FAIL);
		}
	}

	*len = XDR_GETPOS(*xdrs);

	return (SVC_RDMA_SUCCESS);
}
/*
 * Send rpc reply.
 */
static bool_t
svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg)
{
	XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
	XDR xdrs_rhdr;
	CONN *conn = NULL;
	rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};

	struct clone_rdma_data *crdp;
	struct clist *cl_read = NULL;
	struct clist *cl_send = NULL;
	struct clist *cl_write = NULL;
	xdrproc_t xdr_results;		/* results XDR encoding function */
	caddr_t xdr_location;		/* response results pointer */

	int retval = FALSE;
	int status, msglen, num_wreply_segments = 0;
	uint32_t rdma_credit = 0;
	int freelen = 0;
	bool_t has_args;
	uint_t  final_resp_len, rdma_response_op, vers;

	bzero(&xdrs_rhdr, sizeof (XDR));
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	conn = crdp->conn;

	/*
	 * If there is a result procedure specified in the reply message,
	 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
	 * We need to make sure it won't be processed twice, so we null
	 * it for xdr_replymsg here.
	 */
	has_args = FALSE;
	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
			has_args = TRUE;
			xdr_location = msg->acpted_rply.ar_results.where;
			msg->acpted_rply.ar_results.proc = xdr_void;
			msg->acpted_rply.ar_results.where = NULL;
		}
	}

	/*
	 * Given the limit on the inline response size (RPC_MSG_SZ),
	 * there is a need to make a guess as to the overall size of
	 * the response.  If the resultant size is beyond the inline
	 * size, then the server needs to use the "reply chunk list"
	 * provided by the client (if the client provided one).  An
	 * example of this type of response would be a READDIR
	 * response (e.g. a small directory read would fit in RPC_MSG_SZ
	 * and that is the preference but it may not fit).
	 *
	 * Combine the encoded size and the size of the true results
	 * and then make the decision about where to encode and send results.
	 *
	 * One important note, this calculation is ignoring the size
	 * of the encoding of the authentication overhead.  The reason
	 * for this is rooted in the complexities of access to the
	 * encoded size of RPCSEC_GSS related authentication,
	 * integrity, and privacy.
	 *
	 * If it turns out that the encoded authentication bumps the
	 * response over the RPC_MSG_SZ limit, then it may need to
	 * attempt to encode for the reply chunk list.
	 */

	/*
	 * Calculating the "sizeof" the RPC response header and the
	 * encoded results.
	 */
	msglen = xdr_sizeof(xdr_replymsg, msg);

	if (msglen > 0) {
		RSSTAT_INCR(rstotalreplies);
	}
	if (has_args)
		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
		    rdma_minchunk, NULL, NULL);

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);

	status = SVC_RDMA_SUCCESS;

	if (msglen < RPC_MSG_SZ) {
		/*
		 * Looks like the response will fit in the inline
		 * response; let's try
		 */
		RSSTAT_INCR(rstotalinlinereplies);

		rdma_response_op = RDMA_MSG;

		status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
		    xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
		    has_args, &final_resp_len);

		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
		    int, status);
		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
		    int, final_resp_len);

		if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
			clist_free(crdp->cl_reply);
			crdp->cl_reply = NULL;
		}
	}

	/*
	 * If the encode failed (size?) or the message really is
	 * larger than what is allowed, try the response chunk list.
	 */
	if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
		/*
		 * attempting to use a reply chunk list when there
		 * isn't one won't get very far...
		 */
		if (crdp->cl_reply == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
			goto out;
		}

		RSSTAT_INCR(rstotallongreplies);

		msglen = xdr_sizeof(xdr_replymsg, msg);
		msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
		    NULL, NULL);

		status = svc_process_long_reply(clone_xprt, xdr_results,
		    xdr_location, msg, has_args, &msglen, &freelen,
		    &num_wreply_segments, &final_resp_len);

		DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
		    int, final_resp_len);

		if (status != SVC_RDMA_SUCCESS) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
			goto out;
		}

		rdma_response_op = RDMA_NOMSG;
	}

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
	    int, final_resp_len);

	rbuf_resp.type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, &rbuf_resp)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
		goto out;
	}

	rdma_credit = rdma_bufs_granted;

	vers = RPCRDMA_VERS;
	xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
	(*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
	/* Skip xid and set the xdr position accordingly. */
	XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
	if (!xdr_u_int(&xdrs_rhdr, &vers) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
		goto out;
	}

	/*
	 * Now XDR the read chunk list, actually always NULL
	 */
	(void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);

	/*
	 * encode write list -- we already drove RDMA_WRITEs
	 */
	cl_write = crdp->cl_wlist;
	if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
		DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
	    num_wreply_segments)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
	    rbuf_resp.addr, NULL, NULL);

	if (rdma_response_op == RDMA_MSG) {
		clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
		    rbuf_rpc_resp.addr, NULL, NULL);
	}

	status = RDMA_SEND(conn, cl_send, msg->rm_xid);

	if (status == RDMA_SUCCESS) {
		retval = TRUE;
	}

out:
	/*
	 * Free up sendlist chunks
	 */
	if (cl_send != NULL)
		clist_free(cl_send);

	/*
	 * Destroy private data for xdr rdma
	 */
	if (clone_xprt->xp_xdrout.x_ops != NULL) {
		XDR_DESTROY(&(clone_xprt->xp_xdrout));
	}

	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}

	/*
	 * This is completely disgusting.  If public is set it is
	 * a pointer to a structure whose first field is the address
	 * of the function to free that structure and any related
	 * stuff.  (see rrokfree in nfs_xdr.c).
	 */
	if (xdrs_rpc->x_public) {
		/* LINTED pointer alignment */
		(**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
	}

	if (xdrs_rhdr.x_ops != NULL) {
		XDR_DESTROY(&xdrs_rhdr);
	}

	return (retval);
}
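/*
 * The reply path therefore has two tiers: if the xdr_sizeof() estimate
 * stays under RPC_MSG_SZ the reply is encoded inline into a SEND buffer
 * (RDMA_MSG); otherwise, or if the inline encode fails, it is written
 * into the client-provided reply chunk by svc_process_long_reply() and
 * only the header goes in the send (RDMA_NOMSG).
 */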
/*
 * Deserialize arguments.
 */
static bool_t
svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
{
	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
	    xdr_args, args_ptr)) != TRUE)
		return (FALSE);
	return (TRUE);
}
static bool_t
svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
    caddr_t args_ptr)
{
	struct clone_rdma_data *crdp;
	bool_t retval = TRUE;

	/*
	 * If the cloned bit is true, then this transport specific
	 * rdma data has been duplicated into another cloned xprt. Do
	 * not free, or release the connection, it is still in use.  The
	 * buffers will be freed and the connection released later by
	 * SVC_CLONE_DESTROY().
	 */
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	if (crdp->cloned == TRUE) {
		crdp->cloned = 0;
		return (TRUE);
	}

	/*
	 * Free the args if needed then XDR_DESTROY
	 */
	if (args_ptr) {
		XDR	*xdrs = &clone_xprt->xp_xdrin;

		xdrs->x_op = XDR_FREE;
		retval = (*xdr_args)(xdrs, args_ptr);
	}

	XDR_DESTROY(&(clone_xprt->xp_xdrin));
	rdma_buf_free(crdp->conn, &crdp->rpcbuf);
	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}
	RDMA_REL_CONN(crdp->conn);

	return (retval);
}
/* ARGSUSED */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	return (NULL);
}

/* ARGSUSED */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
}
/*
 * the dup caching routines below provide a cache of non-failure
 * transaction id's.  rpc service routines can use this to detect
 * retransmissions and re-send a non-failure response.
 */

/*
 * MAXDUPREQS is the number of cached items.  It should be adjusted
 * to the service load so that there is likely to be a response entry
 * when the first retransmission comes in.
 */
#define	MAXDUPREQS	8192

/*
 * This should be appropriately scaled to MAXDUPREQS.  To produce as few
 * collisions as possible it is suggested to set this to a prime.
 */
#define	DRHASHSZ	2053

#define	XIDHASH(xid)	((xid) % DRHASHSZ)
#define	DRHASH(dr)	XIDHASH((dr)->dr_xid)
#define	REQTOXID(req)	((req)->rq_xprt->xp_xid)

static int	rdmandupreqs = 0;
int	rdmamaxdupreqs = MAXDUPREQS;
static kmutex_t rdmadupreq_lock;
static struct dupreq *rdmadrhashtbl[DRHASHSZ];
static int	rdmadrhashstat[DRHASHSZ];

static void unhash(struct dupreq *);

/*
 * rdmadrmru points to the head of a circular linked list in lru order.
 * rdmadrmru->dr_next == drlru
 */
struct dupreq *rdmadrmru;
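/*
 * The cache is a DRHASHSZ-bucket chained hash keyed on the 32-bit RPC
 * transaction id.  With the prime modulus, xid 0x12345678 (305419896)
 * falls in bucket 305419896 % 2053 = 1245, and consecutive xids from one
 * client land in consecutive buckets.
 */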
/*
 * svc_rdma_kdup searches the request cache and returns 0 if the
 * request is not found in the cache.  If it is found, then it
 * returns the state of the request (in progress or done) and
 * the status or attributes that were part of the original reply.
 */
static int
svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
	bool_t *dupcachedp)
{
	struct dupreq *dr;
	uint32_t xid;
	uint32_t drhash;
	int status;

	xid = REQTOXID(req);
	mutex_enter(&rdmadupreq_lock);
	RSSTAT_INCR(rsdupchecks);
	/*
	 * Check to see whether an entry already exists in the cache.
	 */
	dr = rdmadrhashtbl[XIDHASH(xid)];
	while (dr != NULL) {
		if (dr->dr_xid == xid &&
		    dr->dr_proc == req->rq_proc &&
		    dr->dr_prog == req->rq_prog &&
		    dr->dr_vers == req->rq_vers &&
		    dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
		    bcmp((caddr_t)dr->dr_addr.buf,
		    (caddr_t)req->rq_xprt->xp_rtaddr.buf,
		    dr->dr_addr.len) == 0) {
			status = dr->dr_status;
			if (status == DUP_DONE) {
				bcopy(dr->dr_resp.buf, res, size);
				if (dupcachedp != NULL)
					*dupcachedp = (dr->dr_resfree != NULL);
			} else {
				dr->dr_status = DUP_INPROGRESS;
				*drpp = dr;
			}
			RSSTAT_INCR(rsdupreqs);
			mutex_exit(&rdmadupreq_lock);
			return (status);
		}
		dr = dr->dr_chain;
	}

	/*
	 * There wasn't an entry, either allocate a new one or recycle
	 * an old one.
	 */
	if (rdmandupreqs < rdmamaxdupreqs) {
		dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
		if (dr == NULL) {
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
		dr->dr_resp.buf = NULL;
		dr->dr_resp.maxlen = 0;
		dr->dr_addr.buf = NULL;
		dr->dr_addr.maxlen = 0;
		if (rdmadrmru) {
			dr->dr_next = rdmadrmru->dr_next;
			rdmadrmru->dr_next = dr;
		} else {
			dr->dr_next = dr;
		}
		rdmandupreqs++;
	} else {
		dr = rdmadrmru->dr_next;
		while (dr->dr_status == DUP_INPROGRESS) {
			dr = dr->dr_next;
			if (dr == rdmadrmru->dr_next) {
				mutex_exit(&rdmadupreq_lock);
				return (DUP_ERROR);
			}
		}
		unhash(dr);
		if (dr->dr_resfree) {
			(*dr->dr_resfree)(dr->dr_resp.buf);
		}
	}
	dr->dr_resfree = NULL;
	rdmadrmru = dr;

	dr->dr_xid = REQTOXID(req);
	dr->dr_prog = req->rq_prog;
	dr->dr_vers = req->rq_vers;
	dr->dr_proc = req->rq_proc;
	if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
		if (dr->dr_addr.buf != NULL)
			kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
		dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
		dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
		if (dr->dr_addr.buf == NULL) {
			dr->dr_addr.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
	bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
	if (dr->dr_resp.maxlen < size) {
		if (dr->dr_resp.buf != NULL)
			kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
		dr->dr_resp.maxlen = (unsigned int)size;
		dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
		if (dr->dr_resp.buf == NULL) {
			dr->dr_resp.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_status = DUP_INPROGRESS;

	drhash = (uint32_t)DRHASH(dr);
	dr->dr_chain = rdmadrhashtbl[drhash];
	rdmadrhashtbl[drhash] = dr;
	rdmadrhashstat[drhash]++;
	mutex_exit(&rdmadupreq_lock);
	*drpp = dr;
	return (DUP_NEW);
}
/*
 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
 * and stores the response.
 */
static void
svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
	int size, int status)
{
	ASSERT(dr->dr_resfree == NULL);
	if (status == DUP_DONE) {
		bcopy(res, dr->dr_resp.buf, size);
		dr->dr_resfree = dis_resfree;
	}
	dr->dr_status = status;
}
/*
 * This routine expects that the mutex, rdmadupreq_lock, is already held.
 */
static void
unhash(struct dupreq *dr)
{
	struct dupreq *drt;
	struct dupreq *drtprev = NULL;
	uint32_t drhash;

	ASSERT(MUTEX_HELD(&rdmadupreq_lock));

	drhash = (uint32_t)DRHASH(dr);
	drt = rdmadrhashtbl[drhash];
	while (drt != NULL) {
		if (drt == dr) {
			rdmadrhashstat[drhash]--;
			if (drtprev == NULL) {
				rdmadrhashtbl[drhash] = drt->dr_chain;
			} else {
				drtprev->dr_chain = drt->dr_chain;
			}
			return;
		}
		drtprev = drt;
		drt = drt->dr_chain;
	}
}
bool_t
rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
{
	struct clist	*clist;
	uint32_t	tlen;

	if (req->rq_xprt->xp_type != T_RDMA) {
		return (FALSE);
	}

	tlen = 0;
	clist = wlist;
	while (clist) {
		tlen += clist->c_len;
		clist = clist->c_next;
	}

	/*
	 * set iov to addr+len of first segment of first wchunk of
	 * wlist sent by client.  krecv() already malloc'd a buffer
	 * large enough, but registration is deferred until we write
	 * the buffer back to (NFS) client using RDMA_WRITE.
	 */
	iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
	iov->iov_len = tlen;

	return (TRUE);
}
/*
 * routine to setup the read chunk lists
 */
int
rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
{
	int		data_len, avail_len;
	uint_t		round_len;

	data_len = avail_len = 0;

	while (wcl != NULL && count > 0) {
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		if (wcl->c_len < count) {
			data_len += wcl->c_len;
			avail_len = 0;
		} else {
			data_len += count;
			avail_len = wcl->c_len - count;
			wcl->c_len = count;
		}
		count -= wcl->c_len;

		if (count == 0)
			break;

		wcl = wcl->c_next;
	}

	/*
	 * MUST fail if there is still more data
	 */
	if (count > 0) {
		DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
		    int, data_len, int, count);
		return (FALSE);
	}

	/*
	 * Round up the last chunk to 4-byte boundary
	 */
	*wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
	round_len = *wcl_len - data_len;

	if (round_len) {

		/*
		 * If there is space in the current chunk,
		 * add the roundup to the chunk.
		 */
		if (avail_len >= round_len) {
			wcl->c_len += round_len;
		} else {
			/*
			 * try the next one.
			 */
			wcl = wcl->c_next;
			if ((wcl == NULL) || (wcl->c_len < round_len)) {
				DTRACE_PROBE1(
				    krpc__e__rdma_setup_read_chunks_rndup,
				    int, round_len);
				return (FALSE);
			}
			wcl->c_len = round_len;
		}
	}

	wcl = wcl->c_next;

	/*
	 * Make rest of the chunks 0-len
	 */
	clist_zero_len(wcl);

	return (TRUE);
}
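/*
 * Worked example of the roundup: a 10-byte read target becomes
 * *wcl_len = roundup(10, BYTES_PER_XDR_UNIT) = 12, so round_len = 2; the
 * two pad bytes are absorbed by the current chunk if it has room,
 * otherwise by the next chunk, and every remaining chunk is zeroed.
 */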