4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 2015 Nexenta Systems, Inc. All rights reserved.
24 * Copyright (c) 2016 by Delphix. All rights reserved.
28 * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
29 * Use is subject to license terms.
32 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
33 /* All Rights Reserved */
35 * Portions of this source code were derived from Berkeley
36 * 4.3 BSD under license from the Regents of the University of
43 * Implements a connectionful client side RPC.
45 * Connectionful RPC supports 'batched calls'.
46 * A sequence of calls may be batched-up in a send buffer. The rpc call
47 * returns immediately to the client even though the call was not necessarily
48 * sent. The batching occurs if the results' xdr routine is NULL (0) AND
49 * the rpc timeout value is zero (see clnt.h, rpc).
51 * Clients should NOT casually batch calls that in fact return results; that
52 * is, the server side should be aware that a call is batched and not produce
53 * any return message. Batched calls that produce many result messages can
54 * deadlock (netlock) the client and the server....
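/*
 * Illustrative sketch (added for clarity; not part of the original source):
 * a caller batches requests by passing a NULL results routine and a zero
 * timeout, then flushes the pipeline with an ordinary, non-batched call.
 * LOG_PROC, msg and the argument xdr routines are assumptions made only
 * for this example.
 *
 *	struct timeval zero = { 0, 0 };
 *	struct timeval tv = { 25, 0 };
 *
 *	(void) clnt_call(clnt, LOG_PROC, xdr_wrapstring, (caddr_t)&msg,
 *	    (xdrproc_t)0, NULL, zero);		(batched, returns at once)
 *	(void) clnt_call(clnt, NULLPROC, xdr_void, NULL,
 *	    xdr_void, NULL, tv);		(flushes the batched calls)
 */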
63 #include <sys/byteorder.h>
64 #include <sys/mkdev.h>
69 #include <netinet/tcp.h>
72 #define MCALL_MSG_SIZE 24
73 #define SECS_TO_NS(x) ((hrtime_t)(x) * 1000 * 1000 * 1000)
74 #define MSECS_TO_NS(x) ((hrtime_t)(x) * 1000 * 1000)
75 #define USECS_TO_NS(x) ((hrtime_t)(x) * 1000)
76 #define NSECS_TO_MS(x) ((x) / 1000 / 1000)
78 #define MIN(a, b) (((a) < (b)) ? (a) : (b))
81 extern int __rpc_timeval_to_msec(struct timeval *);
82 extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
83 extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
84 extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
86 extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
87 extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
88 rpcvers_t, uint_t, uint_t, const struct timeval *);
90 static struct clnt_ops *clnt_vc_ops(void);
91 static int read_vc(void *, caddr_t, int);
92 static int write_vc(void *, caddr_t, int);
93 static int t_rcvall(int, char *, int);
94 static bool_t time_not_ok(struct timeval *);
97 static bool_t set_up_connection(int, struct netbuf *,
98 struct ct_data *, const struct timeval *);
99 static bool_t set_io_mode(struct ct_data *, int);
102 * Lock table handle used by various MT sync. routines
104 static mutex_t vctbl_lock = DEFAULTMUTEX;
105 static void *vctbl = NULL;
107 static const char clnt_vc_errstr[] = "%s : %s";
108 static const char clnt_vc_str[] = "clnt_vc_create";
109 static const char clnt_read_vc_str[] = "read_vc";
110 static const char __no_mem_str[] = "out of memory";
111 static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
112 static const char no_nonblock_str[] = "could not set transport blocking mode";
115 * Private data structure
118 int ct_fd;			/* connection's fd */
119 bool_t ct_closeit;		/* close it on destroy */
120 int ct_tsdu;			/* size of tsdu */
121 int ct_wait;			/* wait interval in milliseconds */
122 bool_t ct_waitset;		/* wait set by clnt_control? */
123 struct netbuf ct_addr;		/* remote addr */
124 struct rpc_err ct_error;
125 char ct_mcall[MCALL_MSG_SIZE];	/* marshalled callmsg */
126 uint_t ct_mpos;			/* pos after marshal */
127 XDR ct_xdrs;			/* XDR stream */
129 /* NON STANDARD INFO - 00-08-31 */
130 bool_t ct_is_oneway;		/* True if the current call is oneway. */
131 bool_t ct_is_blocking;
133 ushort_t ct_blocking_mode;
134 uint_t ct_bufferSize;		/* Total size of the buffer. */
135 uint_t ct_bufferPendingSize;	/* Size of unsent data. */
136 char *ct_buffer;		/* Pointer to the buffer. */
137 char *ct_bufferWritePtr;	/* Ptr to the first free byte. */
138 char *ct_bufferReadPtr;		/* Ptr to the first byte of data. */
142 struct nb_reg_node *next;
146 static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
147 static struct nb_reg_node *nb_free = (struct nb_reg_node *)&nb_free;
149 static bool_t exit_handler_set = FALSE;
151 static mutex_t nb_list_mutex = DEFAULTMUTEX;
154 /* Define some macros to manage the linked list. */
155 #define LIST_ISEMPTY(l) (l == (struct nb_reg_node *)&l)
156 #define LIST_CLR(l) (l = (struct nb_reg_node *)&l)
157 #define LIST_ADD(l, node) (node->next = l->next, l = node)
158 #define LIST_EXTRACT(l, node) (node = l, l = l->next)
159 #define LIST_FOR_EACH(l, node) \
160 for (node = l; node != (struct nb_reg_node *)&l; node = node->next)
163 /* Default size of the IO buffer used in non blocking mode */
164 #define DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)
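/*
 * Usage sketch for the buffered non-blocking mode described above (added for
 * clarity; not part of the original source). It assumes the CLSET_IO_MODE and
 * CLFLUSH clnt_control() requests and the one-way clnt_send() interface
 * exported by this library; LOG_PROC and msg are the same hypothetical
 * procedure and argument used in the batching example, and error handling is
 * omitted.
 *
 *	int mode = RPC_CL_NONBLOCKING;
 *	int flush = RPC_CL_BLOCKING_FLUSH;
 *
 *	(void) clnt_control(clnt, CLSET_IO_MODE, (char *)&mode);
 *	(void) clnt_send(clnt, LOG_PROC, xdr_wrapstring, (caddr_t)&msg);
 *	(void) clnt_control(clnt, CLFLUSH, (char *)&flush);
 */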
166 static int nb_send(struct ct_data *, void *, unsigned int);
167 static int do_flush(struct ct_data *, uint_t);
168 static bool_t set_flush_mode(struct ct_data *, int);
169 static bool_t set_blocking_connection(struct ct_data *, bool_t);
171 static int register_nb(struct ct_data *);
172 static int unregister_nb(struct ct_data *);
176 * Change the mode of the underlying fd.
179 set_blocking_connection(struct ct_data *ct, bool_t blocking)
184 * If the underlying fd is already in the required mode,
187 if (ct->ct_is_blocking == blocking)
190 if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
191 (void) syslog(LOG_ERR, "set_blocking_connection : %s",
196 flag = blocking ? flag&~O_NONBLOCK : flag|O_NONBLOCK;
197 if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
198 (void) syslog(LOG_ERR, "set_blocking_connection : %s",
202 ct->ct_is_blocking = blocking;
207 * Create a client handle for a connection.
208 * Default options are set, which the user can change using clnt_control()'s.
209 * The rpc/vc package does buffering similar to stdio, so the client
210 * must pick send and receive buffer sizes, 0 => use the default.
211 * NB: fd is copied into a private area.
212 * NB: The rpch->cl_auth is set to null authentication. Caller may wish to
213 * set this to something more useful.
215 * fd should be open and bound.
218 clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
219 const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
221 return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
226 * This has the same definition as clnt_vc_create(), except it
227 * takes an additional parameter - a pointer to a timeval structure.
229 * Not a public interface. This is for clnt_create_timed,
230 * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
231 * value to control a tcp connection attempt.
232 * (for bug 4049792: clnt_create_timed does not time out)
234 * If tp is NULL, use default timeout to set up the connection.
237 _clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
238 rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
240 CLIENT *cl;			/* client handle */
241 struct ct_data *ct;		/* private data */
243 struct rpc_msg call_msg;
247 cl = malloc(sizeof (*cl));
248 if ((ct = malloc(sizeof (*ct))) != NULL)
249 ct->ct_addr.buf = NULL;
251 if ((cl == NULL) || (ct == NULL)) {
252 (void) syslog(LOG_ERR, clnt_vc_errstr,
253 clnt_vc_str, __no_mem_str);
254 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
255 rpc_createerr.cf_error.re_errno = errno;
256 rpc_createerr.cf_error.re_terrno = 0;
261 * The only use of vctbl_lock is for serializing the creation of
262 * vctbl. Once created the lock needs to be released so we don't
263 * hold it across the set_up_connection() call and end up with a
264 * bunch of threads stuck waiting for the mutex.
266 sig_mutex_lock(&vctbl_lock);
268 if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
269 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
270 rpc_createerr.cf_error.re_errno = errno;
271 rpc_createerr.cf_error.re_terrno = 0;
272 sig_mutex_unlock(&vctbl_lock);
276 sig_mutex_unlock(&vctbl_lock);
278 ct->ct_io_mode = RPC_CL_BLOCKING;
279 ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
281 ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
282 ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
283 ct->ct_bufferPendingSize = 0;
284 ct->ct_bufferWritePtr = NULL;
285 ct->ct_bufferReadPtr = NULL;
287 /* Check the current state of the fd. */
288 if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
289 (void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
291 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
292 rpc_createerr.cf_error.re_terrno = errno;
293 rpc_createerr.cf_error.re_errno = 0;
296 ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;
298 if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
303 * Set up other members of private data struct
307 * The actual value will be set by clnt_call or clnt_control
310 ct->ct_waitset = FALSE;
312 * By default, closeit is always FALSE. It is the user's responsibility
313 * to do a t_close on the fd; otherwise the user may use clnt_control
314 * to let clnt_destroy do it for them.
316 ct->ct_closeit = FALSE;
319 * Initialize call message
321 (void) gettimeofday(&now, NULL);
322 call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
323 call_msg.rm_call.cb_prog = prog;
324 call_msg.rm_call.cb_vers = vers;
327 * pre-serialize the static part of the call msg and stash it away
329 xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
330 if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
333 ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
334 XDR_DESTROY(&(ct->ct_xdrs));
336 if (t_getinfo(fd, &tinfo) == -1) {
337 rpc_createerr.cf_stat = RPC_TLIERROR;
338 rpc_createerr.cf_error.re_terrno = t_errno;
339 rpc_createerr.cf_error.re_errno = 0;
343 * Find the receive and the send size
345 sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
346 recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
347 if ((sendsz == 0) || (recvsz == 0)) {
348 rpc_createerr.cf_stat = RPC_TLIERROR;
349 rpc_createerr.cf_error.re_terrno = 0;
350 rpc_createerr.cf_error.re_errno = 0;
353 ct->ct_tsdu = tinfo.tsdu;
355 * Create a client handle which uses xdrrec for serialization
356 * and authnone for authentication.
358 ct->ct_xdrs.x_ops = NULL;
359 xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
361 if (ct->ct_xdrs.x_ops == NULL) {
362 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
363 rpc_createerr.cf_error.re_terrno = 0;
364 rpc_createerr.cf_error.re_errno = ENOMEM;
367 cl->cl_ops = clnt_vc_ops();
368 cl->cl_private = (caddr_t)ct;
369 cl->cl_auth = authnone_create();
376 free(ct->ct_addr.buf);
384 #define TCPOPT_BUFSIZE 128
387 * Set tcp connection timeout value.
388 * Return 0 for success, -1 for failure.
391 _set_tcp_conntime(int fd, int optval)
393 struct t_optmgmt req, res;
396 char buf[TCPOPT_BUFSIZE];
398 /* LINTED pointer cast */
399 opt = (struct opthdr *)buf;
400 opt->level = IPPROTO_TCP;
401 opt->name = TCP_CONN_ABORT_THRESHOLD;
402 opt->len = sizeof (int);
404 req.flags = T_NEGOTIATE;
405 req.opt.len = sizeof (struct opthdr) + opt->len;
406 req.opt.buf = (char *)opt;
407 /* LINTED pointer cast */
408 ip = (int *)((char *)buf + sizeof (struct opthdr));
412 res.opt.buf = (char *)buf;
413 res.opt.maxlen = sizeof (buf);
414 if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
421 * Get current tcp connection timeout value.
422 * Return the timeout in milliseconds, or -1 for failure.
425 _get_tcp_conntime(int fd)
427 struct t_optmgmt req, res;
430 char buf[TCPOPT_BUFSIZE];
432 /* LINTED pointer cast */
433 opt = (struct opthdr *)buf;
434 opt->level = IPPROTO_TCP;
435 opt->name = TCP_CONN_ABORT_THRESHOLD;
436 opt->len = sizeof (int);
438 req.flags = T_CURRENT;
439 req.opt.len = sizeof (struct opthdr) + opt->len;
440 req.opt.buf = (char *)opt;
441 /* LINTED pointer cast */
442 ip = (int *)((char *)buf + sizeof (struct opthdr));
446 res.opt.buf = (char *)buf;
447 res.opt.maxlen = sizeof (buf);
448 if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
452 /* LINTED pointer cast */
453 ip = (int *)((char *)buf + sizeof (struct opthdr));
459 set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
460 const struct timeval *tp)
463 struct t_call sndcallstr, *rcvcall;
465 bool_t connected, do_rcv_connect;
468 hrtime_t tout;	/* timeout in nanoseconds (from tp) */
471 state = t_getstate(fd);
473 rpc_createerr.cf_stat = RPC_TLIERROR;
474 rpc_createerr.cf_error.re_errno = 0;
475 rpc_createerr.cf_error.re_terrno = t_errno;
481 if (svcaddr == NULL) {
482 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
486 * Connect only if state is IDLE and svcaddr known
488 /* LINTED pointer alignment */
489 rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
490 if (rcvcall == NULL) {
491 rpc_createerr.cf_stat = RPC_TLIERROR;
492 rpc_createerr.cf_error.re_terrno = t_errno;
493 rpc_createerr.cf_error.re_errno = errno;
496 rcvcall->udata.maxlen = 0;
497 sndcallstr.addr = *svcaddr;
498 sndcallstr.opt.len = 0;
499 sndcallstr.udata.len = 0;
501 * Even NULL could have sufficed for rcvcall, because
502 * the address returned is the same for all cases except
503 * for the gateway case, and hence required.
506 do_rcv_connect = FALSE;
509 * If there is a timeout value specified, we will try to
510 * reset the tcp connection timeout. If the transport does
511 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
512 * for other reason, default timeout will be used.
518 * Calculate the timeout in nanoseconds
520 tout = SECS_TO_NS(tp->tv_sec) +
521 USECS_TO_NS(tp->tv_usec);
522 curr_time = _get_tcp_conntime(fd);
525 for (nconnect = 0; nconnect < 3; nconnect++) {
528 * Calculate the elapsed time
530 hrtime_t elapsed = gethrtime() - start;
534 if (curr_time != -1) {
538 * TCP_CONN_ABORT_THRESHOLD takes int
539 * value in milliseconds. Make sure we
542 if (NSECS_TO_MS(tout - elapsed) >=
547 NSECS_TO_MS(tout - elapsed);
548 if (MSECS_TO_NS(ms) !=
553 (void) _set_tcp_conntime(fd, ms);
557 if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
561 if (t_errno == TLOOK) {
562 switch (t_look(fd)) {
564 (void) t_rcvdis(fd, (struct
570 } else if (!(t_errno == TSYSERR && errno == EINTR)) {
573 if ((state = t_getstate(fd)) == T_OUTCON) {
574 do_rcv_connect = TRUE;
577 if (state != T_IDLE) {
581 if (do_rcv_connect) {
583 if (t_rcvconnect(fd, rcvcall) != -1) {
587 } while (t_errno == TSYSERR && errno == EINTR);
591 * Set the connection timeout back to its old value.
593 if (curr_time != -1) {
594 (void) _set_tcp_conntime(fd, curr_time);
598 rpc_createerr.cf_stat = RPC_TLIERROR;
599 rpc_createerr.cf_error.re_terrno = t_errno;
600 rpc_createerr.cf_error.re_errno = errno;
601 (void) t_free((char *)rcvcall, T_CALL);
605 /* Free old area if allocated */
606 free(ct->ct_addr.buf);
607 ct->ct_addr = rcvcall->addr;	/* To get the new address */
608 /* So that address buf does not get freed */
609 rcvcall->addr.buf = NULL;
610 (void) t_free((char *)rcvcall, T_CALL);
614 if (svcaddr == NULL) {
616 * svcaddr could also be NULL in cases where the
617 * client is already bound and connected.
621 ct->ct_addr.buf = malloc(svcaddr->len);
622 if (ct->ct_addr.buf == NULL) {
623 (void) syslog(LOG_ERR, clnt_vc_errstr,
624 clnt_vc_str, __no_mem_str);
625 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
626 rpc_createerr.cf_error.re_errno = errno;
627 rpc_createerr.cf_error.re_terrno = 0;
630 (void) memcpy(ct->ct_addr.buf, svcaddr->buf,
631 (size_t)svcaddr->len);
632 ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
636 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
642 static enum clnt_stat
643 clnt_vc_call(CLIENT
*cl
, rpcproc_t proc
, xdrproc_t xdr_args
, caddr_t args_ptr
,
644 xdrproc_t xdr_results
, caddr_t results_ptr
, struct timeval timeout
)
646 /* LINTED pointer alignment */
647 struct ct_data
*ct
= (struct ct_data
*)cl
->cl_private
;
648 XDR
*xdrs
= &(ct
->ct_xdrs
);
649 struct rpc_msg reply_msg
;
651 /* LINTED pointer alignment */
652 uint32_t *msg_x_id
= (uint32_t *)(ct
->ct_mcall
); /* yuk */
656 if (rpc_fd_lock(vctbl
, ct
->ct_fd
)) {
657 rpc_callerr
.re_status
= RPC_FAILED
;
658 rpc_callerr
.re_errno
= errno
;
659 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
663 ct
->ct_is_oneway
= FALSE
;
664 if (ct
->ct_io_mode
== RPC_CL_NONBLOCKING
) {
665 if (do_flush(ct
, RPC_CL_BLOCKING_FLUSH
) != 0) {
666 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
667 return (RPC_FAILED
); /* XXX */
671 if (!ct
->ct_waitset
) {
672 /* If time is not within limits, we ignore it. */
673 if (time_not_ok(&timeout
) == FALSE
)
674 ct
->ct_wait
= __rpc_timeval_to_msec(&timeout
);
676 timeout
.tv_sec
= (ct
->ct_wait
/ 1000);
677 timeout
.tv_usec
= (ct
->ct_wait
% 1000) * 1000;
680 shipnow
= ((xdr_results
== (xdrproc_t
)0) && (timeout
.tv_sec
== 0) &&
681 (timeout
.tv_usec
== 0)) ? FALSE
: TRUE
;
683 xdrs
->x_op
= XDR_ENCODE
;
684 rpc_callerr
.re_status
= RPC_SUCCESS
;
686 * Due to little endian byte order, it is necessary to convert to host
687 * format before decrementing xid.
689 x_id
= ntohl(*msg_x_id
) - 1;
690 *msg_x_id
= htonl(x_id
);
692 if (cl
->cl_auth
->ah_cred
.oa_flavor
!= RPCSEC_GSS
) {
693 if ((!XDR_PUTBYTES(xdrs
, ct
->ct_mcall
, ct
->ct_mpos
)) ||
694 (!XDR_PUTINT32(xdrs
, (int32_t *)&proc
)) ||
695 (!AUTH_MARSHALL(cl
->cl_auth
, xdrs
)) ||
696 (!xdr_args(xdrs
, args_ptr
))) {
697 if (rpc_callerr
.re_status
== RPC_SUCCESS
)
698 rpc_callerr
.re_status
= RPC_CANTENCODEARGS
;
699 (void) xdrrec_endofrecord(xdrs
, TRUE
);
700 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
701 return (rpc_callerr
.re_status
);
704 /* LINTED pointer alignment */
705 uint32_t *u
= (uint32_t *)&ct
->ct_mcall
[ct
->ct_mpos
];
706 IXDR_PUT_U_INT32(u
, proc
);
707 if (!__rpc_gss_wrap(cl
->cl_auth
, ct
->ct_mcall
,
708 ((char *)u
) - ct
->ct_mcall
, xdrs
, xdr_args
, args_ptr
)) {
709 if (rpc_callerr
.re_status
== RPC_SUCCESS
)
710 rpc_callerr
.re_status
= RPC_CANTENCODEARGS
;
711 (void) xdrrec_endofrecord(xdrs
, TRUE
);
712 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
713 return (rpc_callerr
.re_status
);
716 if (!xdrrec_endofrecord(xdrs
, shipnow
)) {
717 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
718 return (rpc_callerr
.re_status
= RPC_CANTSEND
);
721 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
722 return (RPC_SUCCESS
);
725 * Hack to provide rpc-based message passing
727 if (timeout
.tv_sec
== 0 && timeout
.tv_usec
== 0) {
728 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
729 return (rpc_callerr
.re_status
= RPC_TIMEDOUT
);
734 * Keep receiving until we get a valid transaction id
736 xdrs
->x_op
= XDR_DECODE
;
738 reply_msg
.acpted_rply
.ar_verf
= _null_auth
;
739 reply_msg
.acpted_rply
.ar_results
.where
= NULL
;
740 reply_msg
.acpted_rply
.ar_results
.proc
= (xdrproc_t
)xdr_void
;
741 if (!xdrrec_skiprecord(xdrs
)) {
742 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
743 return (rpc_callerr
.re_status
);
745 /* now decode and validate the response header */
746 if (!xdr_replymsg(xdrs
, &reply_msg
)) {
747 if (rpc_callerr
.re_status
== RPC_SUCCESS
)
749 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
750 return (rpc_callerr
.re_status
);
752 if (reply_msg
.rm_xid
== x_id
)
759 if ((reply_msg
.rm_reply
.rp_stat
== MSG_ACCEPTED
) &&
760 (reply_msg
.acpted_rply
.ar_stat
== SUCCESS
))
761 rpc_callerr
.re_status
= RPC_SUCCESS
;
763 __seterr_reply(&reply_msg
, &(rpc_callerr
));
765 if (rpc_callerr
.re_status
== RPC_SUCCESS
) {
766 if (!AUTH_VALIDATE(cl
->cl_auth
,
767 &reply_msg
.acpted_rply
.ar_verf
)) {
768 rpc_callerr
.re_status
= RPC_AUTHERROR
;
769 rpc_callerr
.re_why
= AUTH_INVALIDRESP
;
770 } else if (cl
->cl_auth
->ah_cred
.oa_flavor
!= RPCSEC_GSS
) {
771 if (!(*xdr_results
)(xdrs
, results_ptr
)) {
772 if (rpc_callerr
.re_status
== RPC_SUCCESS
)
773 rpc_callerr
.re_status
=
776 } else if (!__rpc_gss_unwrap(cl
->cl_auth
, xdrs
, xdr_results
,
778 if (rpc_callerr
.re_status
== RPC_SUCCESS
)
779 rpc_callerr
.re_status
= RPC_CANTDECODERES
;
781 } /* end successful completion */
783 * If unsuccessful AND error is an authentication error
784 * then refresh credentials and try again, else break
786 else if (rpc_callerr
.re_status
== RPC_AUTHERROR
) {
787 /* maybe our credentials need to be refreshed ... */
788 if (refreshes
-- && AUTH_REFRESH(cl
->cl_auth
, &reply_msg
))
792 * We are setting rpc_callerr here given that libnsl
793 * is not reentrant thereby reinitializing the TSD.
794 * If not set here then success could be returned even
795 * though refresh failed.
797 rpc_callerr
.re_status
= RPC_AUTHERROR
;
798 } /* end of unsuccessful completion */
799 /* free verifier ... */
800 if (reply_msg
.rm_reply
.rp_stat
== MSG_ACCEPTED
&&
801 reply_msg
.acpted_rply
.ar_verf
.oa_base
!= NULL
) {
802 xdrs
->x_op
= XDR_FREE
;
803 (void) xdr_opaque_auth(xdrs
, &(reply_msg
.acpted_rply
.ar_verf
));
805 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
806 return (rpc_callerr
.re_status
);
809 static enum clnt_stat
810 clnt_vc_send(CLIENT
*cl
, rpcproc_t proc
, xdrproc_t xdr_args
, caddr_t args_ptr
)
812 /* LINTED pointer alignment */
813 struct ct_data
*ct
= (struct ct_data
*)cl
->cl_private
;
814 XDR
*xdrs
= &(ct
->ct_xdrs
);
816 /* LINTED pointer alignment */
817 uint32_t *msg_x_id
= (uint32_t *)(ct
->ct_mcall
); /* yuk */
819 if (rpc_fd_lock(vctbl
, ct
->ct_fd
)) {
820 rpc_callerr
.re_status
= RPC_FAILED
;
821 rpc_callerr
.re_errno
= errno
;
822 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
826 ct
->ct_is_oneway
= TRUE
;
828 xdrs
->x_op
= XDR_ENCODE
;
829 rpc_callerr
.re_status
= RPC_SUCCESS
;
831 * Due to little endian byte order, it is necessary to convert to host
832 * format before decrementing xid.
834 x_id
= ntohl(*msg_x_id
) - 1;
835 *msg_x_id
= htonl(x_id
);
837 if (cl
->cl_auth
->ah_cred
.oa_flavor
!= RPCSEC_GSS
) {
838 if ((!XDR_PUTBYTES(xdrs
, ct
->ct_mcall
, ct
->ct_mpos
)) ||
839 (!XDR_PUTINT32(xdrs
, (int32_t *)&proc
)) ||
840 (!AUTH_MARSHALL(cl
->cl_auth
, xdrs
)) ||
841 (!xdr_args(xdrs
, args_ptr
))) {
842 if (rpc_callerr
.re_status
== RPC_SUCCESS
)
843 rpc_callerr
.re_status
= RPC_CANTENCODEARGS
;
844 (void) xdrrec_endofrecord(xdrs
, TRUE
);
845 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
846 return (rpc_callerr
.re_status
);
849 /* LINTED pointer alignment */
850 uint32_t *u
= (uint32_t *)&ct
->ct_mcall
[ct
->ct_mpos
];
851 IXDR_PUT_U_INT32(u
, proc
);
852 if (!__rpc_gss_wrap(cl
->cl_auth
, ct
->ct_mcall
,
853 ((char *)u
) - ct
->ct_mcall
, xdrs
, xdr_args
, args_ptr
)) {
854 if (rpc_callerr
.re_status
== RPC_SUCCESS
)
855 rpc_callerr
.re_status
= RPC_CANTENCODEARGS
;
856 (void) xdrrec_endofrecord(xdrs
, TRUE
);
857 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
858 return (rpc_callerr
.re_status
);
863 * Do not need to check errors, as the following code does
864 * not depend on the successful completion of the call.
865 * An error, if any occurs, is reported through
866 * rpc_callerr.re_status.
868 (void) xdrrec_endofrecord(xdrs
, TRUE
);
870 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
871 return (rpc_callerr
.re_status
);
876 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
882 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
884 /* LINTED pointer alignment */
885 struct ct_data *ct = (struct ct_data *)cl->cl_private;
886 XDR *xdrs = &(ct->ct_xdrs);
889 (void) rpc_fd_lock(vctbl, ct->ct_fd);
890 xdrs->x_op = XDR_FREE;
891 stat = (*xdr_res)(xdrs, res_ptr);
892 rpc_fd_unlock(vctbl, ct->ct_fd);
903 clnt_vc_control(CLIENT
*cl
, int request
, char *info
)
906 /* LINTED pointer alignment */
907 struct ct_data
*ct
= (struct ct_data
*)cl
->cl_private
;
909 if (rpc_fd_lock(vctbl
, ct
->ct_fd
)) {
910 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
916 ct
->ct_closeit
= TRUE
;
917 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
919 case CLSET_FD_NCLOSE
:
920 ct
->ct_closeit
= FALSE
;
921 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
924 if (ct
->ct_io_mode
== RPC_CL_NONBLOCKING
) {
926 res
= do_flush(ct
, (info
== NULL
||
927 /* LINTED pointer cast */
928 *(int *)info
== RPC_CL_DEFAULT_FLUSH
)?
929 /* LINTED pointer cast */
930 ct
->ct_blocking_mode
: *(int *)info
);
935 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
939 /* for other requests which use info */
941 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
946 /* LINTED pointer alignment */
947 if (time_not_ok((struct timeval
*)info
)) {
948 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
951 /* LINTED pointer alignment */
952 ct
->ct_wait
= __rpc_timeval_to_msec((struct timeval
*)info
);
953 ct
->ct_waitset
= TRUE
;
956 /* LINTED pointer alignment */
957 ((struct timeval
*)info
)->tv_sec
= ct
->ct_wait
/ 1000;
958 /* LINTED pointer alignment */
959 ((struct timeval
*)info
)->tv_usec
= (ct
->ct_wait
% 1000) * 1000;
961 case CLGET_SERVER_ADDR
: /* For compatibility only */
962 (void) memcpy(info
, ct
->ct_addr
.buf
, (size_t)ct
->ct_addr
.len
);
965 /* LINTED pointer alignment */
966 *(int *)info
= ct
->ct_fd
;
969 /* The caller should not free this memory area */
970 /* LINTED pointer alignment */
971 *(struct netbuf
*)info
= ct
->ct_addr
;
973 case CLSET_SVC_ADDR
: /* set to new address */
976 * XXX: once the t_snddis(), followed by t_connect() starts to
977 * work, this ifdef should be removed. CLIENT handle reuse
978 * would then be possible for COTS as well.
980 if (t_snddis(ct
->ct_fd
, NULL
) == -1) {
981 rpc_createerr
.cf_stat
= RPC_TLIERROR
;
982 rpc_createerr
.cf_error
.re_terrno
= t_errno
;
983 rpc_createerr
.cf_error
.re_errno
= errno
;
984 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
987 ret
= set_up_connection(ct
->ct_fd
, (struct netbuf
*)info
,
989 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
992 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
997 * use the knowledge that xid is the
998 * first element in the call structure
999 * This will get the xid of the PREVIOUS call
1001 /* LINTED pointer alignment */
1002 *(uint32_t *)info
= ntohl(*(uint32_t *)ct
->ct_mcall
);
1005 /* This will set the xid of the NEXT call */
1006 /* LINTED pointer alignment */
1007 *(uint32_t *)ct
->ct_mcall
= htonl(*(uint32_t *)info
+ 1);
1008 /* increment by 1 as clnt_vc_call() decrements once */
1012 * This RELIES on the information that, in the call body,
1013 * the version number field is the fifth field from the
1014 * beginning of the RPC header. MUST be changed if the
1015 * call_struct is changed
1017 /* LINTED pointer alignment */
1018 *(uint32_t *)info
= ntohl(*(uint32_t *)(ct
->ct_mcall
+
1019 4 * BYTES_PER_XDR_UNIT
));
1023 /* LINTED pointer alignment */
1024 *(uint32_t *)(ct
->ct_mcall
+ 4 * BYTES_PER_XDR_UNIT
) =
1025 /* LINTED pointer alignment */
1026 htonl(*(uint32_t *)info
);
1031 * This RELIES on the information that, in the call body,
1032 * the program number field is the fourth field from the
1033 * beginning of the RPC header. MUST be changed if the
1034 * call_struct is changed
1036 /* LINTED pointer alignment */
1037 *(uint32_t *)info
= ntohl(*(uint32_t *)(ct
->ct_mcall
+
1038 3 * BYTES_PER_XDR_UNIT
));
1042 /* LINTED pointer alignment */
1043 *(uint32_t *)(ct
->ct_mcall
+ 3 * BYTES_PER_XDR_UNIT
) =
1044 /* LINTED pointer alignment */
1045 htonl(*(uint32_t *)info
);
1049 /* LINTED pointer cast */
1050 if (!set_io_mode(ct
, *(int *)info
)) {
1051 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
1055 case CLSET_FLUSH_MODE
:
1056 /* Set a specific FLUSH_MODE */
1057 /* LINTED pointer cast */
1058 if (!set_flush_mode(ct
, *(int *)info
)) {
1059 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
1063 case CLGET_FLUSH_MODE
:
1064 /* LINTED pointer cast */
1065 *(rpcflushmode_t
*)info
= ct
->ct_blocking_mode
;
1069 /* LINTED pointer cast */
1070 *(rpciomode_t
*)info
= ct
->ct_io_mode
;
1073 case CLGET_CURRENT_REC_SIZE
:
1075 * Returns the current amount of memory allocated
1076 * to pending requests
1078 /* LINTED pointer cast */
1079 *(int *)info
= ct
->ct_bufferPendingSize
;
1082 case CLSET_CONNMAXREC_SIZE
:
1083 /* Cannot resize the buffer if it is used. */
1084 if (ct
->ct_bufferPendingSize
!= 0) {
1085 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
1089 * If the new size is equal to the current size,
1090 * there is nothing to do.
1092 /* LINTED pointer cast */
1093 if (ct
->ct_bufferSize
== *(uint_t
*)info
)
1096 /* LINTED pointer cast */
1097 ct
->ct_bufferSize
= *(uint_t
*)info
;
1098 if (ct
->ct_buffer
) {
1099 free(ct
->ct_buffer
);
1100 ct
->ct_buffer
= NULL
;
1101 ct
->ct_bufferReadPtr
= ct
->ct_bufferWritePtr
= NULL
;
1105 case CLGET_CONNMAXREC_SIZE
:
1107 * Returns the size of buffer allocated
1108 * to pending requests
1110 /* LINTED pointer cast */
1111 *(uint_t
*)info
= ct
->ct_bufferSize
;
1115 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
1118 rpc_fd_unlock(vctbl
, ct
->ct_fd
);
1123 clnt_vc_destroy(CLIENT *cl)
1125 /* LINTED pointer alignment */
1126 struct ct_data *ct = (struct ct_data *)cl->cl_private;
1127 int ct_fd = ct->ct_fd;
1129 (void) rpc_fd_lock(vctbl, ct_fd);
1131 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1132 (void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1133 (void) unregister_nb(ct);
1137 (void) t_close(ct_fd);
1138 XDR_DESTROY(&(ct->ct_xdrs));
1139 free(ct->ct_addr.buf);
1141 if (cl->cl_netid && cl->cl_netid[0])
1143 if (cl->cl_tp && cl->cl_tp[0])
1146 rpc_fd_unlock(vctbl, ct_fd);
1150 * Interface between xdr serializer and vc connection.
1151 * Behaves like the system calls, read & write, but keeps some error state
1152 * around for the rpc level.
1155 read_vc(void *ct_tmp
, caddr_t buf
, int len
)
1157 static pthread_key_t pfdp_key
= PTHREAD_ONCE_KEY_NP
;
1158 struct pollfd
*pfdp
;
1159 int npfd
; /* total number of pfdp allocated */
1160 struct ct_data
*ct
= ct_tmp
;
1161 struct timeval starttime
;
1162 struct timeval curtime
;
1170 * Allocate just one the first time. thr_get_storage() may
1171 * return a larger buffer, left over from the last time we were
1172 * here, but that's OK. realloc() will deal with it properly.
1175 pfdp
= thr_get_storage(&pfdp_key
, sizeof (struct pollfd
), free
);
1177 (void) syslog(LOG_ERR
, clnt_vc_errstr
,
1178 clnt_read_vc_str
, __no_mem_str
);
1179 rpc_callerr
.re_status
= RPC_SYSTEMERROR
;
1180 rpc_callerr
.re_errno
= errno
;
1181 rpc_callerr
.re_terrno
= 0;
1186 * N.B.: slot 0 in the pollfd array is reserved for the file
1187 * descriptor we're really interested in (as opposed to the
1188 * callback descriptors).
1190 pfdp
[0].fd
= ct
->ct_fd
;
1191 pfdp
[0].events
= MASKVAL
;
1192 pfdp
[0].revents
= 0;
1193 poll_time
= ct
->ct_wait
;
1194 if (gettimeofday(&starttime
, NULL
) == -1) {
1195 syslog(LOG_ERR
, "Unable to get time of day: %m");
1200 extern void (*_svc_getreqset_proc
)();
1201 extern pollfd_t
*svc_pollfd
;
1202 extern int svc_max_pollfd
;
1205 /* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */
1207 if (_svc_getreqset_proc
) {
1208 sig_rw_rdlock(&svc_fd_lock
);
1210 /* reallocate pfdp to svc_max_pollfd +1 */
1211 if (npfd
!= (svc_max_pollfd
+ 1)) {
1212 struct pollfd
*tmp_pfdp
= reallocarray(pfdp
,
1214 sizeof (struct pollfd
));
1215 if (tmp_pfdp
== NULL
) {
1216 sig_rw_unlock(&svc_fd_lock
);
1217 (void) syslog(LOG_ERR
, clnt_vc_errstr
,
1218 clnt_read_vc_str
, __no_mem_str
);
1219 rpc_callerr
.re_status
= RPC_SYSTEMERROR
;
1220 rpc_callerr
.re_errno
= errno
;
1221 rpc_callerr
.re_terrno
= 0;
1226 npfd
= svc_max_pollfd
+ 1;
1227 (void) pthread_setspecific(pfdp_key
, pfdp
);
1230 (void) memcpy(&pfdp
[1], svc_pollfd
,
1231 sizeof (struct pollfd
) * (npfd
- 1));
1233 sig_rw_unlock(&svc_fd_lock
);
1235 npfd
= 1; /* don't forget about pfdp[0] */
1238 switch (fds
= poll(pfdp
, npfd
, poll_time
)) {
1240 rpc_callerr
.re_status
= RPC_TIMEDOUT
;
1248 * interrupted by another signal,
1249 * update time_waited
1252 if (gettimeofday(&curtime
, NULL
) == -1) {
1254 "Unable to get time of day: %m");
1258 delta
= (curtime
.tv_sec
-
1259 starttime
.tv_sec
) * 1000 +
1261 starttime
.tv_usec
) / 1000;
1263 if (poll_time
< 0) {
1264 rpc_callerr
.re_status
= RPC_TIMEDOUT
;
1268 errno
= 0; /* reset it */
1274 if (pfdp
[0].revents
== 0) {
1275 /* must be for server side of the house */
1276 (*_svc_getreqset_proc
)(&pfdp
[1], fds
);
1277 continue; /* do poll again */
1280 if (pfdp
[0].revents
& POLLNVAL
) {
1281 rpc_callerr
.re_status
= RPC_CANTRECV
;
1283 * Note: we're faking errno here because we
1284 * previously would have expected select() to
1285 * return -1 with errno EBADF. Poll(BA_OS)
1286 * returns 0 and sets the POLLNVAL revents flag
1289 rpc_callerr
.re_errno
= errno
= EBADF
;
1293 if (pfdp
[0].revents
& (POLLERR
| POLLHUP
)) {
1294 rpc_callerr
.re_status
= RPC_CANTRECV
;
1295 rpc_callerr
.re_errno
= errno
= EPIPE
;
1301 switch (len
= t_rcvall(ct
->ct_fd
, buf
, len
)) {
1304 rpc_callerr
.re_errno
= ENOLINK
;
1305 rpc_callerr
.re_terrno
= 0;
1306 rpc_callerr
.re_status
= RPC_CANTRECV
;
1307 len
= -1; /* it's really an error */
1311 rpc_callerr
.re_terrno
= t_errno
;
1312 rpc_callerr
.re_errno
= 0;
1313 rpc_callerr
.re_status
= RPC_CANTRECV
;
1320 write_vc(void *ct_tmp
, caddr_t buf
, int len
)
1323 struct ct_data
*ct
= ct_tmp
;
1327 maxsz
= ct
->ct_tsdu
;
1329 /* Handle the non-blocking mode */
1330 if (ct
->ct_is_oneway
&& ct
->ct_io_mode
== RPC_CL_NONBLOCKING
) {
1332 * Test a special case here. If the length of the current
1333 * write is greater than the transport data unit, and the
1334 * mode is non blocking, we return RPC_CANTSEND.
1335 * XXX this is not very clean.
1337 if (maxsz
> 0 && len
> maxsz
) {
1338 rpc_callerr
.re_terrno
= errno
;
1339 rpc_callerr
.re_errno
= 0;
1340 rpc_callerr
.re_status
= RPC_CANTSEND
;
1344 len
= nb_send(ct
, buf
, (unsigned)len
);
1346 rpc_callerr
.re_terrno
= errno
;
1347 rpc_callerr
.re_errno
= 0;
1348 rpc_callerr
.re_status
= RPC_CANTSEND
;
1349 } else if (len
== -2) {
1350 rpc_callerr
.re_terrno
= 0;
1351 rpc_callerr
.re_errno
= 0;
1352 rpc_callerr
.re_status
= RPC_CANTSTORE
;
1357 if ((maxsz
== 0) || (maxsz
== -1)) {
1359 * T_snd may return -1 for error on connection (connection
1360 * needs to be repaired/closed, and -2 for flow-control
1361 * handling error (no operation to do, just wait and call
1364 if ((len
= t_snd(ct
->ct_fd
, buf
, (unsigned)len
, 0)) == -1) {
1365 rpc_callerr
.re_terrno
= t_errno
;
1366 rpc_callerr
.re_errno
= 0;
1367 rpc_callerr
.re_status
= RPC_CANTSEND
;
1373 * This for those transports which have a max size for data.
1375 for (cnt
= len
, i
= 0; cnt
> 0; cnt
-= i
, buf
+= i
) {
1376 flag
= cnt
> maxsz
? T_MORE
: 0;
1377 if ((i
= t_snd(ct
->ct_fd
, buf
, (unsigned)MIN(cnt
, maxsz
),
1379 rpc_callerr
.re_terrno
= t_errno
;
1380 rpc_callerr
.re_errno
= 0;
1381 rpc_callerr
.re_status
= RPC_CANTSEND
;
1389 * Receive the required bytes of data, even if it is fragmented.
1392 t_rcvall(int fd, char *buf, int len)
1400 res = t_rcv(fd, buf, (unsigned)len, &moreflag);
1402 if (t_errno == TLOOK)
1403 switch (t_look(fd)) {
1405 (void) t_rcvdis(fd, NULL);
1406 (void) t_snddis(fd, NULL);
1409 /* Received orderly release indication */
1410 (void) t_rcvrel(fd);
1411 /* Send orderly release indicator */
1412 (void) t_sndrel(fd);
1417 } else if (res == 0) {
1423 } while ((len > 0) && (moreflag & T_MORE));
1427 static struct clnt_ops *
1430 static struct clnt_ops ops;
1431 extern mutex_t ops_lock;
1433 /* VARIABLES PROTECTED BY ops_lock: ops */
1435 sig_mutex_lock(&ops_lock);
1436 if (ops.cl_call == NULL) {
1437 ops.cl_call = clnt_vc_call;
1438 ops.cl_send = clnt_vc_send;
1439 ops.cl_abort = clnt_vc_abort;
1440 ops.cl_geterr = clnt_vc_geterr;
1441 ops.cl_freeres = clnt_vc_freeres;
1442 ops.cl_destroy = clnt_vc_destroy;
1443 ops.cl_control = clnt_vc_control;
1445 sig_mutex_unlock(&ops_lock);
1450 * Make sure that the time is not garbage. -1 value is disallowed.
1451 * Note this is different from time_not_ok in clnt_dg.c
1454 time_not_ok(struct timeval *t)
1456 return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
1457 t->tv_usec <= -1 || t->tv_usec > 1000000);
1461 /* Compute the # of bytes that remains until the end of the buffer */
1462 #define REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer))
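/*
 * Worked example (added for clarity; not part of the original source): with
 * ct_bufferSize = 16384 and ct_bufferWritePtr sitting 16000 bytes past
 * ct_buffer, REMAIN_BYTES(bufferWritePtr) evaluates to 16384 - 16000 = 384,
 * i.e. only 384 more bytes can be copied before the write pointer must wrap
 * back to the start of the circular buffer.
 */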
1465 addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
1467 if (NULL == ct->ct_buffer) {
1468 /* Buffer not allocated yet. */
1471 buffer = malloc(ct->ct_bufferSize);
1472 if (NULL == buffer) {
1476 (void) memcpy(buffer, dataToAdd, nBytes);
1478 ct->ct_buffer = buffer;
1479 ct->ct_bufferReadPtr = buffer;
1480 ct->ct_bufferWritePtr = buffer + nBytes;
1481 ct->ct_bufferPendingSize = nBytes;
1484 * For an already allocated buffer, two mem copies
1485 * might be needed, depending on the current
1489 /* Compute the length of the first copy. */
1490 int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
1492 ct->ct_bufferPendingSize += nBytes;
1494 (void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
1495 ct->ct_bufferWritePtr += len;
1498 /* One memcopy needed. */
1501 * If the write pointer is at the end of the buffer,
1504 if (ct->ct_bufferWritePtr ==
1505 (ct->ct_buffer + ct->ct_bufferSize)) {
1506 ct->ct_bufferWritePtr = ct->ct_buffer;
1509 /* Two memcopy needed. */
1513 * Copy the remaining data to the beginning of the
1516 (void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
1517 ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
1524 consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
1526 ct->ct_bufferPendingSize -= nBytes;
1527 if (ct->ct_bufferPendingSize == 0) {
1529 * If the buffer contains no data, we set the two pointers at
1530 * the beginning of the buffer (to minimize buffer wraps).
1532 ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
1534 ct->ct_bufferReadPtr += nBytes;
1535 if (ct->ct_bufferReadPtr >
1536 ct->ct_buffer + ct->ct_bufferSize) {
1537 ct->ct_bufferReadPtr -= ct->ct_bufferSize;
1543 iovFromBuffer(struct ct_data *ct, struct iovec *iov)
1547 if (ct->ct_bufferPendingSize == 0)
1550 l = REMAIN_BYTES(bufferReadPtr);
1551 if (l < ct->ct_bufferPendingSize) {
1552 /* Buffer in two fragments. */
1553 iov[0].iov_base = ct->ct_bufferReadPtr;
1556 iov[1].iov_base = ct->ct_buffer;
1557 iov[1].iov_len = ct->ct_bufferPendingSize - l;
1560 /* Buffer in one fragment. */
1561 iov[0].iov_base = ct->ct_bufferReadPtr;
1562 iov[0].iov_len = ct->ct_bufferPendingSize;
1568 set_flush_mode(struct ct_data *ct, int mode)
1571 case RPC_CL_BLOCKING_FLUSH:
1572 /* flush as much as possible without blocking */
1573 case RPC_CL_BESTEFFORT_FLUSH:
1574 /* flush the buffer completely (possibly blocking) */
1575 case RPC_CL_DEFAULT_FLUSH:
1576 /* flush according to the currently defined policy */
1577 ct->ct_blocking_mode = mode;
1585 set_io_mode(struct ct_data *ct, int ioMode)
1588 case RPC_CL_BLOCKING:
1589 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1590 if (NULL != ct->ct_buffer) {
1592 * If a buffer was allocated for this
1593 * connection, flush it now, and free it.
1595 (void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1596 free(ct->ct_buffer);
1597 ct->ct_buffer = NULL;
1599 (void) unregister_nb(ct);
1600 ct->ct_io_mode = ioMode;
1603 case RPC_CL_NONBLOCKING:
1604 if (ct->ct_io_mode == RPC_CL_BLOCKING) {
1605 if (-1 == register_nb(ct)) {
1608 ct->ct_io_mode = ioMode;
1618 do_flush(struct ct_data *ct, uint_t flush_mode)
1621 if (ct->ct_bufferPendingSize == 0) {
1625 switch (flush_mode) {
1626 case RPC_CL_BLOCKING_FLUSH:
1627 if (!set_blocking_connection(ct, TRUE)) {
1630 while (ct->ct_bufferPendingSize > 0) {
1631 if (REMAIN_BYTES(bufferReadPtr) <
1632 ct->ct_bufferPendingSize) {
1633 struct iovec iov[2];
1634 (void) iovFromBuffer(ct, iov);
1635 result = writev(ct->ct_fd, iov, 2);
1637 result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1638 ct->ct_bufferPendingSize, 0);
1643 consumeFromBuffer(ct, result);
1648 case RPC_CL_BESTEFFORT_FLUSH:
1649 (void) set_blocking_connection(ct, FALSE);
1650 if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
1651 struct iovec iov[2];
1652 (void) iovFromBuffer(ct, iov);
1653 result = writev(ct->ct_fd, iov, 2);
1655 result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1656 ct->ct_bufferPendingSize, 0);
1659 if (errno != EWOULDBLOCK) {
1666 consumeFromBuffer(ct, result);
1673 * Non blocking send.
1677 nb_send(struct ct_data
*ct
, void *buff
, unsigned int nBytes
)
1681 if (!(ntohl(*(uint32_t *)buff) & (1U << 31))) {	/* record-mark last-fragment bit; "2^31" in the original is XOR, not a power of two */
1686 * Check to see if the current message can be stored fully in the
1687 * buffer. We have to check this now because it may be impossible
1688 * to send any data, so the message must be stored in the buffer.
1690 if (nBytes
> (ct
->ct_bufferSize
- ct
->ct_bufferPendingSize
)) {
1691 /* Try to flush (to free some space). */
1692 (void) do_flush(ct
, RPC_CL_BESTEFFORT_FLUSH
);
1694 /* Can we store the message now ? */
1695 if (nBytes
> (ct
->ct_bufferSize
- ct
->ct_bufferPendingSize
))
1699 (void) set_blocking_connection(ct
, FALSE
);
1702 * If there is no data pending, we can simply try
1705 if (ct
->ct_bufferPendingSize
== 0) {
1706 result
= t_snd(ct
->ct_fd
, buff
, nBytes
, 0);
1708 if (errno
== EWOULDBLOCK
) {
1716 * If we have not sent all data, we must store them
1719 if (result
!= nBytes
) {
1720 if (addInBuffer(ct
, (char *)buff
+ result
,
1721 nBytes
- result
) == -1) {
1727 * Some data pending in the buffer. We try to send
1728 * both buffer data and current message in one shot.
1730 struct iovec iov
[3];
1731 int i
= iovFromBuffer(ct
, &iov
[0]);
1733 iov
[i
].iov_base
= buff
;
1734 iov
[i
].iov_len
= nBytes
;
1736 result
= writev(ct
->ct_fd
, iov
, i
+1);
1738 if (errno
== EWOULDBLOCK
) {
1747 * Add the bytes from the message
1748 * that we have not sent.
1750 if (result
<= ct
->ct_bufferPendingSize
) {
1751 /* No bytes from the message sent */
1752 consumeFromBuffer(ct
, result
);
1753 if (addInBuffer(ct
, buff
, nBytes
) == -1) {
1758 * Some bytes of the message are sent.
1759 * Compute the length of the message that has
1762 int len
= result
- ct
->ct_bufferPendingSize
;
1764 /* So, empty the buffer. */
1765 ct
->ct_bufferReadPtr
= ct
->ct_buffer
;
1766 ct
->ct_bufferWritePtr
= ct
->ct_buffer
;
1767 ct
->ct_bufferPendingSize
= 0;
1769 /* And add the remaining part of the message. */
1770 if (len
!= nBytes
) {
1771 if (addInBuffer(ct
, (char *)buff
+ len
,
1772 nBytes
-len
) == -1) {
1782 flush_registered_clients(void)
1784 struct nb_reg_node *node;
1786 if (LIST_ISEMPTY(nb_first)) {
1790 LIST_FOR_EACH(nb_first, node) {
1791 (void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
1796 allocate_chunk(void)
1798 #define CHUNK_SIZE 16
1799 struct nb_reg_node *chk =
1800 malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
1801 struct nb_reg_node *n;
1809 for (i = 0; i < CHUNK_SIZE-1; ++i) {
1810 n[i].next = &(n[i+1]);
1812 n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free;
1818 register_nb(struct ct_data *ct)
1820 struct nb_reg_node *node;
1822 (void) mutex_lock(&nb_list_mutex);
1824 if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
1825 (void) mutex_unlock(&nb_list_mutex);
1830 if (!exit_handler_set) {
1831 (void) atexit(flush_registered_clients);
1832 exit_handler_set = TRUE;
1834 /* Get the first free node */
1835 LIST_EXTRACT(nb_free, node);
1839 LIST_ADD(nb_first, node);
1840 (void) mutex_unlock(&nb_list_mutex);
1846 unregister_nb(struct ct_data *ct)
1848 struct nb_reg_node *node;
1850 (void) mutex_lock(&nb_list_mutex);
1851 assert(!LIST_ISEMPTY(nb_first));
1854 LIST_FOR_EACH(nb_first, node) {
1855 if (node->next->ct == ct) {
1856 /* Get the node to unregister. */
1857 struct nb_reg_node *n = node->next;
1858 node->next = n->next;
1861 LIST_ADD(nb_free, n);
1865 (void) mutex_unlock(&nb_list_mutex);