/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2015 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2016 by Delphix. All rights reserved.
 */

/*
 * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

/* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */

/*
 * Portions of this source code were derived from Berkeley
 * 4.3 BSD under license from the Regents of the University of
 * California.
 */
/*
 * clnt_vc.c
 *
 * Implements a connectionful client side RPC.
 *
 * Connectionful RPC supports 'batched calls'.
 * A sequence of calls may be batched up in a send buffer.  The rpc call
 * returns immediately to the client even though the call was not necessarily
 * sent.  Batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
 *
 * Clients should NOT casually batch calls that in fact return results; that
 * is, the server side should be aware that a call is batched and not produce
 * any return message.  Batched calls that produce many result messages can
 * deadlock (netlock) the client and the server....
 */
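/*
 * Illustrative sketch (not part of this file's interfaces): a batched call
 * is issued by passing a NULL results routine together with a zero timeout,
 * e.g.
 *
 *	struct timeval zero = { 0, 0 };
 *	(void) clnt_call(clnt, MYPROC, xdr_myargs, (caddr_t)&args,
 *	    (xdrproc_t)NULL, NULL, zero);
 *
 * MYPROC, xdr_myargs and args are placeholders for an application's own
 * procedure number, argument XDR routine and argument structure; the
 * batched requests are typically pushed out by a later non-batched call.
 */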
#include "mt.h"
#include "rpc_mt.h"
#include <assert.h>
#include <rpc/rpc.h>
#include <errno.h>
#include <sys/byteorder.h>
#include <sys/mkdev.h>
#include <sys/poll.h>
#include <syslog.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <limits.h>

#define	MCALL_MSG_SIZE	24
#define	SECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000 * 1000)
#define	MSECS_TO_NS(x)	((hrtime_t)(x) * 1000 * 1000)
#define	USECS_TO_NS(x)	((hrtime_t)(x) * 1000)
#define	NSECS_TO_MS(x)	((x) / 1000 / 1000)
#ifndef MIN
#define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
#endif
extern int	__rpc_timeval_to_msec(struct timeval *);
extern int	__rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
extern bool_t	xdr_opaque_auth(XDR *, struct opaque_auth *);
extern bool_t	__rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
    caddr_t);
extern bool_t	__rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
extern CLIENT	*_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
    rpcvers_t, uint_t, uint_t, const struct timeval *);

static struct clnt_ops	*clnt_vc_ops(void);
static int		read_vc(void *, caddr_t, int);
static int		write_vc(void *, caddr_t, int);
static int		t_rcvall(int, char *, int);
static bool_t		time_not_ok(struct timeval *);

struct ct_data;
static bool_t		set_up_connection(int, struct netbuf *,
    struct ct_data *, const struct timeval *);
static bool_t		set_io_mode(struct ct_data *, int);
/*
 * Lock table handle used by various MT sync. routines
 */
static mutex_t	vctbl_lock = DEFAULTMUTEX;
static void	*vctbl = NULL;

static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
static const char no_nonblock_str[] = "could not set transport blocking mode";
/*
 * Private data structure
 */
struct ct_data {
	int		ct_fd;		/* connection's fd */
	bool_t		ct_closeit;	/* close it on destroy */
	int		ct_tsdu;	/* size of tsdu */
	int		ct_wait;	/* wait interval in milliseconds */
	bool_t		ct_waitset;	/* wait set by clnt_control? */
	struct netbuf	ct_addr;	/* remote addr */
	struct rpc_err	ct_error;
	char		ct_mcall[MCALL_MSG_SIZE];	/* marshalled callmsg */
	uint_t		ct_mpos;	/* pos after marshal */
	XDR		ct_xdrs;	/* XDR stream */

	/* NON STANDARD INFO - 00-08-31 */
	bool_t		ct_is_oneway;	/* True if the current call is oneway. */
	bool_t		ct_is_blocking;
	ushort_t	ct_io_mode;
	ushort_t	ct_blocking_mode;
	uint_t		ct_bufferSize;	/* Total size of the buffer. */
	uint_t		ct_bufferPendingSize;	/* Size of unsent data. */
	char		*ct_buffer;	/* Pointer to the buffer. */
	char		*ct_bufferWritePtr;	/* Ptr to the first free byte. */
	char		*ct_bufferReadPtr;	/* Ptr to the first byte of data. */
};
struct nb_reg_node {
	struct nb_reg_node *next;
	struct ct_data *ct;
};

static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
static struct nb_reg_node *nb_free = (struct nb_reg_node *)&nb_free;

static bool_t exit_handler_set = FALSE;

static mutex_t nb_list_mutex = DEFAULTMUTEX;
/* Define some macros to manage the linked list. */
#define	LIST_ISEMPTY(l)		(l == (struct nb_reg_node *)&l)
#define	LIST_CLR(l)		(l = (struct nb_reg_node *)&l)
#define	LIST_ADD(l, node)	(node->next = l->next, l = node)
#define	LIST_EXTRACT(l, node)	(node = l, l = l->next)
#define	LIST_FOR_EACH(l, node)	\
	for (node = l; node != (struct nb_reg_node *)&l; node = node->next)
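/*
 * Note that each list head is its own sentinel: nb_first and nb_free are
 * initialized to point at themselves (cast to struct nb_reg_node *), so an
 * empty list is detected by comparing the head with its own address rather
 * than with NULL.
 */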
/* Default size of the IO buffer used in non blocking mode */
#define	DEFAULT_PENDING_ZONE_MAX_SIZE	(16*1024)
static int nb_send(struct ct_data *, void *, unsigned int);
static int do_flush(struct ct_data *, uint_t);
static bool_t set_flush_mode(struct ct_data *, int);
static bool_t set_blocking_connection(struct ct_data *, bool_t);

static int register_nb(struct ct_data *);
static int unregister_nb(struct ct_data *);
/*
 * Change the mode of the underlying fd.
 */
static bool_t
set_blocking_connection(struct ct_data *ct, bool_t blocking)
{
	int flag;

	/*
	 * If the underlying fd is already in the required mode,
	 * avoid the syscall.
	 */
	if (ct->ct_is_blocking == blocking)
		return (TRUE);

	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
		    no_fcntl_getfl_str);
		return (FALSE);
	}

	flag = blocking ? flag & ~O_NONBLOCK : flag | O_NONBLOCK;
	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
		    no_nonblock_str);
		return (FALSE);
	}
	ct->ct_is_blocking = blocking;
	return (TRUE);
}
/*
 * Create a client handle for a connection.
 * Default options are set, which the user can change using clnt_control().
 * The rpc/vc package does buffering similar to stdio, so the client
 * must pick send and receive buffer sizes; 0 => use the default.
 * NB: fd is copied into a private area.
 * NB: The rpch->cl_auth is set to null authentication.  The caller may wish
 * to set this to something more useful.
 *
 * fd should be open and bound.
 */
CLIENT *
clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
    const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
{
	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
	    recvsz, NULL));
}
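/*
 * Illustrative sketch (not part of this file's interfaces): a caller
 * typically obtains an open, bound TLI endpoint and then wraps it, e.g.
 *
 *	CLIENT *clnt = clnt_vc_create(fd, &svc_addr, MYPROG, MYVERS, 0, 0);
 *	if (clnt == NULL)
 *		clnt_pcreateerror("clnt_vc_create");
 *
 * MYPROG, MYVERS, fd and svc_addr are placeholders for an application's own
 * program number, version and transport endpoint; zero buffer sizes select
 * the defaults described above.
 */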
/*
 * This has the same definition as clnt_vc_create(), except it
 * takes an additional parameter - a pointer to a timeval structure.
 *
 * Not a public interface. This is for clnt_create_timed,
 * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
 * value to control a tcp connection attempt.
 * (for bug 4049792: clnt_create_timed does not time out)
 *
 * If tp is NULL, use default timeout to set up the connection.
 */
236 CLIENT *
237 _clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
238 rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
240 CLIENT *cl; /* client handle */
241 struct ct_data *ct; /* private data */
242 struct timeval now;
243 struct rpc_msg call_msg;
244 struct t_info tinfo;
245 int flag;
247 cl = malloc(sizeof (*cl));
248 if ((ct = malloc(sizeof (*ct))) != NULL)
249 ct->ct_addr.buf = NULL;
251 if ((cl == NULL) || (ct == NULL)) {
252 (void) syslog(LOG_ERR, clnt_vc_errstr,
253 clnt_vc_str, __no_mem_str);
254 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
255 rpc_createerr.cf_error.re_errno = errno;
256 rpc_createerr.cf_error.re_terrno = 0;
257 goto err;
261 * The only use of vctbl_lock is for serializing the creation of
262 * vctbl. Once created the lock needs to be released so we don't
263 * hold it across the set_up_connection() call and end up with a
264 * bunch of threads stuck waiting for the mutex.
266 sig_mutex_lock(&vctbl_lock);
268 if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
269 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
270 rpc_createerr.cf_error.re_errno = errno;
271 rpc_createerr.cf_error.re_terrno = 0;
272 sig_mutex_unlock(&vctbl_lock);
273 goto err;
276 sig_mutex_unlock(&vctbl_lock);
278 ct->ct_io_mode = RPC_CL_BLOCKING;
279 ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
281 ct->ct_buffer = NULL; /* We allocate the buffer when needed. */
282 ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
283 ct->ct_bufferPendingSize = 0;
284 ct->ct_bufferWritePtr = NULL;
285 ct->ct_bufferReadPtr = NULL;
287 /* Check the current state of the fd. */
288 if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
289 (void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
290 no_fcntl_getfl_str);
291 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
292 rpc_createerr.cf_error.re_terrno = errno;
293 rpc_createerr.cf_error.re_errno = 0;
294 goto err;
296 ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;
298 if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
299 goto err;
303 * Set up other members of private data struct
305 ct->ct_fd = fd;
307 * The actual value will be set by clnt_call or clnt_control
309 ct->ct_wait = 30000;
310 ct->ct_waitset = FALSE;
	/*
	 * By default, closeit is always FALSE.  It is the user's
	 * responsibility to do a t_close on the fd; alternatively, the user
	 * may use clnt_control to let clnt_destroy do it for them.
	 */
	ct->ct_closeit = FALSE;
319 * Initialize call message
321 (void) gettimeofday(&now, NULL);
322 call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
323 call_msg.rm_call.cb_prog = prog;
324 call_msg.rm_call.cb_vers = vers;
327 * pre-serialize the static part of the call msg and stash it away
329 xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
330 if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
331 goto err;
333 ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
334 XDR_DESTROY(&(ct->ct_xdrs));
336 if (t_getinfo(fd, &tinfo) == -1) {
337 rpc_createerr.cf_stat = RPC_TLIERROR;
338 rpc_createerr.cf_error.re_terrno = t_errno;
339 rpc_createerr.cf_error.re_errno = 0;
340 goto err;
343 * Find the receive and the send size
345 sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
346 recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
347 if ((sendsz == 0) || (recvsz == 0)) {
348 rpc_createerr.cf_stat = RPC_TLIERROR;
349 rpc_createerr.cf_error.re_terrno = 0;
350 rpc_createerr.cf_error.re_errno = 0;
351 goto err;
353 ct->ct_tsdu = tinfo.tsdu;
355 * Create a client handle which uses xdrrec for serialization
356 * and authnone for authentication.
358 ct->ct_xdrs.x_ops = NULL;
359 xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
360 read_vc, write_vc);
361 if (ct->ct_xdrs.x_ops == NULL) {
362 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
363 rpc_createerr.cf_error.re_terrno = 0;
364 rpc_createerr.cf_error.re_errno = ENOMEM;
365 goto err;
367 cl->cl_ops = clnt_vc_ops();
368 cl->cl_private = (caddr_t)ct;
369 cl->cl_auth = authnone_create();
370 cl->cl_tp = NULL;
371 cl->cl_netid = NULL;
372 return (cl);
374 err:
375 if (ct) {
376 free(ct->ct_addr.buf);
377 free(ct);
379 free(cl);
381 return (NULL);
384 #define TCPOPT_BUFSIZE 128
/*
 * Set the tcp connection timeout value.
 * Return 0 for success, -1 for failure.
 */
390 static int
391 _set_tcp_conntime(int fd, int optval)
393 struct t_optmgmt req, res;
394 struct opthdr *opt;
395 int *ip;
396 char buf[TCPOPT_BUFSIZE];
398 /* LINTED pointer cast */
399 opt = (struct opthdr *)buf;
400 opt->level = IPPROTO_TCP;
401 opt->name = TCP_CONN_ABORT_THRESHOLD;
402 opt->len = sizeof (int);
404 req.flags = T_NEGOTIATE;
405 req.opt.len = sizeof (struct opthdr) + opt->len;
406 req.opt.buf = (char *)opt;
407 /* LINTED pointer cast */
408 ip = (int *)((char *)buf + sizeof (struct opthdr));
409 *ip = optval;
411 res.flags = 0;
412 res.opt.buf = (char *)buf;
413 res.opt.maxlen = sizeof (buf);
414 if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
415 return (-1);
417 return (0);
/*
 * Get the current tcp connection timeout value.
 * Return the timeout in milliseconds, or -1 for failure.
 */
424 static int
425 _get_tcp_conntime(int fd)
427 struct t_optmgmt req, res;
428 struct opthdr *opt;
429 int *ip, retval;
430 char buf[TCPOPT_BUFSIZE];
432 /* LINTED pointer cast */
433 opt = (struct opthdr *)buf;
434 opt->level = IPPROTO_TCP;
435 opt->name = TCP_CONN_ABORT_THRESHOLD;
436 opt->len = sizeof (int);
438 req.flags = T_CURRENT;
439 req.opt.len = sizeof (struct opthdr) + opt->len;
440 req.opt.buf = (char *)opt;
441 /* LINTED pointer cast */
442 ip = (int *)((char *)buf + sizeof (struct opthdr));
443 *ip = 0;
445 res.flags = 0;
446 res.opt.buf = (char *)buf;
447 res.opt.maxlen = sizeof (buf);
448 if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
449 return (-1);
452 /* LINTED pointer cast */
453 ip = (int *)((char *)buf + sizeof (struct opthdr));
454 retval = *ip;
455 return (retval);
458 static bool_t
459 set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
460 const struct timeval *tp)
462 int state;
463 struct t_call sndcallstr, *rcvcall;
464 int nconnect;
465 bool_t connected, do_rcv_connect;
466 int curr_time = -1;
467 hrtime_t start;
468 hrtime_t tout; /* timeout in nanoseconds (from tp) */
470 ct->ct_addr.len = 0;
471 state = t_getstate(fd);
472 if (state == -1) {
473 rpc_createerr.cf_stat = RPC_TLIERROR;
474 rpc_createerr.cf_error.re_errno = 0;
475 rpc_createerr.cf_error.re_terrno = t_errno;
476 return (FALSE);
479 switch (state) {
480 case T_IDLE:
481 if (svcaddr == NULL) {
482 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
483 return (FALSE);
486 * Connect only if state is IDLE and svcaddr known
488 /* LINTED pointer alignment */
489 rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
490 if (rcvcall == NULL) {
491 rpc_createerr.cf_stat = RPC_TLIERROR;
492 rpc_createerr.cf_error.re_terrno = t_errno;
493 rpc_createerr.cf_error.re_errno = errno;
494 return (FALSE);
496 rcvcall->udata.maxlen = 0;
497 sndcallstr.addr = *svcaddr;
498 sndcallstr.opt.len = 0;
499 sndcallstr.udata.len = 0;
		/*
		 * NULL could have sufficed for rcvcall, because the address
		 * returned is the same in all cases except the gateway case,
		 * for which rcvcall is required.
		 */
505 connected = FALSE;
506 do_rcv_connect = FALSE;
		/*
		 * If there is a timeout value specified, we will try to
		 * reset the tcp connection timeout.  If the transport does
		 * not support the TCP_CONN_ABORT_THRESHOLD option, or the
		 * attempt fails for some other reason, the default timeout
		 * will be used.
		 */
514 if (tp != NULL) {
515 start = gethrtime();
518 * Calculate the timeout in nanoseconds
520 tout = SECS_TO_NS(tp->tv_sec) +
521 USECS_TO_NS(tp->tv_usec);
522 curr_time = _get_tcp_conntime(fd);
525 for (nconnect = 0; nconnect < 3; nconnect++) {
526 if (tp != NULL) {
528 * Calculate the elapsed time
530 hrtime_t elapsed = gethrtime() - start;
531 if (elapsed >= tout)
532 break;
534 if (curr_time != -1) {
535 int ms;
538 * TCP_CONN_ABORT_THRESHOLD takes int
539 * value in milliseconds. Make sure we
540 * do not overflow.
542 if (NSECS_TO_MS(tout - elapsed) >=
543 INT_MAX) {
544 ms = INT_MAX;
545 } else {
546 ms = (int)
547 NSECS_TO_MS(tout - elapsed);
548 if (MSECS_TO_NS(ms) !=
549 tout - elapsed)
550 ms++;
553 (void) _set_tcp_conntime(fd, ms);
557 if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
558 connected = TRUE;
559 break;
561 if (t_errno == TLOOK) {
562 switch (t_look(fd)) {
563 case T_DISCONNECT:
564 (void) t_rcvdis(fd, (struct
565 t_discon *) NULL);
566 break;
567 default:
568 break;
570 } else if (!(t_errno == TSYSERR && errno == EINTR)) {
571 break;
573 if ((state = t_getstate(fd)) == T_OUTCON) {
574 do_rcv_connect = TRUE;
575 break;
577 if (state != T_IDLE) {
578 break;
581 if (do_rcv_connect) {
582 do {
583 if (t_rcvconnect(fd, rcvcall) != -1) {
584 connected = TRUE;
585 break;
587 } while (t_errno == TSYSERR && errno == EINTR);
591 * Set the connection timeout back to its old value.
593 if (curr_time != -1) {
594 (void) _set_tcp_conntime(fd, curr_time);
597 if (!connected) {
598 rpc_createerr.cf_stat = RPC_TLIERROR;
599 rpc_createerr.cf_error.re_terrno = t_errno;
600 rpc_createerr.cf_error.re_errno = errno;
601 (void) t_free((char *)rcvcall, T_CALL);
602 return (FALSE);
605 /* Free old area if allocated */
606 free(ct->ct_addr.buf);
607 ct->ct_addr = rcvcall->addr; /* To get the new address */
608 /* So that address buf does not get freed */
609 rcvcall->addr.buf = NULL;
610 (void) t_free((char *)rcvcall, T_CALL);
611 break;
612 case T_DATAXFER:
613 case T_OUTCON:
614 if (svcaddr == NULL) {
616 * svcaddr could also be NULL in cases where the
617 * client is already bound and connected.
619 ct->ct_addr.len = 0;
620 } else {
621 ct->ct_addr.buf = malloc(svcaddr->len);
622 if (ct->ct_addr.buf == NULL) {
623 (void) syslog(LOG_ERR, clnt_vc_errstr,
624 clnt_vc_str, __no_mem_str);
625 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
626 rpc_createerr.cf_error.re_errno = errno;
627 rpc_createerr.cf_error.re_terrno = 0;
628 return (FALSE);
630 (void) memcpy(ct->ct_addr.buf, svcaddr->buf,
631 (size_t)svcaddr->len);
632 ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
634 break;
635 default:
636 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
637 return (FALSE);
639 return (TRUE);
642 static enum clnt_stat
643 clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
644 xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
646 /* LINTED pointer alignment */
647 struct ct_data *ct = (struct ct_data *)cl->cl_private;
648 XDR *xdrs = &(ct->ct_xdrs);
649 struct rpc_msg reply_msg;
650 uint32_t x_id;
651 /* LINTED pointer alignment */
652 uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall); /* yuk */
653 bool_t shipnow;
654 int refreshes = 2;
656 if (rpc_fd_lock(vctbl, ct->ct_fd)) {
657 rpc_callerr.re_status = RPC_FAILED;
658 rpc_callerr.re_errno = errno;
659 rpc_fd_unlock(vctbl, ct->ct_fd);
660 return (RPC_FAILED);
663 ct->ct_is_oneway = FALSE;
664 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
665 if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
666 rpc_fd_unlock(vctbl, ct->ct_fd);
667 return (RPC_FAILED); /* XXX */
671 if (!ct->ct_waitset) {
672 /* If time is not within limits, we ignore it. */
673 if (time_not_ok(&timeout) == FALSE)
674 ct->ct_wait = __rpc_timeval_to_msec(&timeout);
675 } else {
676 timeout.tv_sec = (ct->ct_wait / 1000);
677 timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
680 shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
681 (timeout.tv_usec == 0)) ? FALSE : TRUE;
682 call_again:
683 xdrs->x_op = XDR_ENCODE;
684 rpc_callerr.re_status = RPC_SUCCESS;
686 * Due to little endian byte order, it is necessary to convert to host
687 * format before decrementing xid.
689 x_id = ntohl(*msg_x_id) - 1;
690 *msg_x_id = htonl(x_id);
692 if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
693 if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
694 (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
695 (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
696 (!xdr_args(xdrs, args_ptr))) {
697 if (rpc_callerr.re_status == RPC_SUCCESS)
698 rpc_callerr.re_status = RPC_CANTENCODEARGS;
699 (void) xdrrec_endofrecord(xdrs, TRUE);
700 rpc_fd_unlock(vctbl, ct->ct_fd);
701 return (rpc_callerr.re_status);
703 } else {
704 /* LINTED pointer alignment */
705 uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
706 IXDR_PUT_U_INT32(u, proc);
707 if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
708 ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
709 if (rpc_callerr.re_status == RPC_SUCCESS)
710 rpc_callerr.re_status = RPC_CANTENCODEARGS;
711 (void) xdrrec_endofrecord(xdrs, TRUE);
712 rpc_fd_unlock(vctbl, ct->ct_fd);
713 return (rpc_callerr.re_status);
716 if (!xdrrec_endofrecord(xdrs, shipnow)) {
717 rpc_fd_unlock(vctbl, ct->ct_fd);
718 return (rpc_callerr.re_status = RPC_CANTSEND);
720 if (!shipnow) {
721 rpc_fd_unlock(vctbl, ct->ct_fd);
722 return (RPC_SUCCESS);
725 * Hack to provide rpc-based message passing
727 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
728 rpc_fd_unlock(vctbl, ct->ct_fd);
729 return (rpc_callerr.re_status = RPC_TIMEDOUT);
734 * Keep receiving until we get a valid transaction id
736 xdrs->x_op = XDR_DECODE;
737 for (;;) {
738 reply_msg.acpted_rply.ar_verf = _null_auth;
739 reply_msg.acpted_rply.ar_results.where = NULL;
740 reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
741 if (!xdrrec_skiprecord(xdrs)) {
742 rpc_fd_unlock(vctbl, ct->ct_fd);
743 return (rpc_callerr.re_status);
745 /* now decode and validate the response header */
746 if (!xdr_replymsg(xdrs, &reply_msg)) {
747 if (rpc_callerr.re_status == RPC_SUCCESS)
748 continue;
749 rpc_fd_unlock(vctbl, ct->ct_fd);
750 return (rpc_callerr.re_status);
752 if (reply_msg.rm_xid == x_id)
753 break;
757 * process header
759 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
760 (reply_msg.acpted_rply.ar_stat == SUCCESS))
761 rpc_callerr.re_status = RPC_SUCCESS;
762 else
763 __seterr_reply(&reply_msg, &(rpc_callerr));
765 if (rpc_callerr.re_status == RPC_SUCCESS) {
766 if (!AUTH_VALIDATE(cl->cl_auth,
767 &reply_msg.acpted_rply.ar_verf)) {
768 rpc_callerr.re_status = RPC_AUTHERROR;
769 rpc_callerr.re_why = AUTH_INVALIDRESP;
770 } else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
771 if (!(*xdr_results)(xdrs, results_ptr)) {
772 if (rpc_callerr.re_status == RPC_SUCCESS)
773 rpc_callerr.re_status =
774 RPC_CANTDECODERES;
776 } else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
777 results_ptr)) {
778 if (rpc_callerr.re_status == RPC_SUCCESS)
779 rpc_callerr.re_status = RPC_CANTDECODERES;
781 } /* end successful completion */
	/*
	 * If unsuccessful AND error is an authentication error
	 * then refresh credentials and try again, else break
	 */
786 else if (rpc_callerr.re_status == RPC_AUTHERROR) {
787 /* maybe our credentials need to be refreshed ... */
788 if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
789 goto call_again;
790 else
			/*
			 * We are setting rpc_callerr here given that libnsl
			 * is not reentrant, thereby reinitializing the TSD.
			 * If it is not set here, then success could be
			 * returned even though the refresh failed.
			 */
797 rpc_callerr.re_status = RPC_AUTHERROR;
798 } /* end of unsuccessful completion */
799 /* free verifier ... */
800 if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
801 reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
802 xdrs->x_op = XDR_FREE;
803 (void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
805 rpc_fd_unlock(vctbl, ct->ct_fd);
806 return (rpc_callerr.re_status);
809 static enum clnt_stat
810 clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
812 /* LINTED pointer alignment */
813 struct ct_data *ct = (struct ct_data *)cl->cl_private;
814 XDR *xdrs = &(ct->ct_xdrs);
815 uint32_t x_id;
816 /* LINTED pointer alignment */
817 uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall); /* yuk */
819 if (rpc_fd_lock(vctbl, ct->ct_fd)) {
820 rpc_callerr.re_status = RPC_FAILED;
821 rpc_callerr.re_errno = errno;
822 rpc_fd_unlock(vctbl, ct->ct_fd);
823 return (RPC_FAILED);
826 ct->ct_is_oneway = TRUE;
828 xdrs->x_op = XDR_ENCODE;
829 rpc_callerr.re_status = RPC_SUCCESS;
831 * Due to little endian byte order, it is necessary to convert to host
832 * format before decrementing xid.
834 x_id = ntohl(*msg_x_id) - 1;
835 *msg_x_id = htonl(x_id);
837 if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
838 if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
839 (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
840 (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
841 (!xdr_args(xdrs, args_ptr))) {
842 if (rpc_callerr.re_status == RPC_SUCCESS)
843 rpc_callerr.re_status = RPC_CANTENCODEARGS;
844 (void) xdrrec_endofrecord(xdrs, TRUE);
845 rpc_fd_unlock(vctbl, ct->ct_fd);
846 return (rpc_callerr.re_status);
848 } else {
849 /* LINTED pointer alignment */
850 uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
851 IXDR_PUT_U_INT32(u, proc);
852 if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
853 ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
854 if (rpc_callerr.re_status == RPC_SUCCESS)
855 rpc_callerr.re_status = RPC_CANTENCODEARGS;
856 (void) xdrrec_endofrecord(xdrs, TRUE);
857 rpc_fd_unlock(vctbl, ct->ct_fd);
858 return (rpc_callerr.re_status);
863 * Do not need to check errors, as the following code does
864 * not depend on the successful completion of the call.
865 * An error, if any occurs, is reported through
866 * rpc_callerr.re_status.
868 (void) xdrrec_endofrecord(xdrs, TRUE);
870 rpc_fd_unlock(vctbl, ct->ct_fd);
871 return (rpc_callerr.re_status);
874 /* ARGSUSED */
875 static void
876 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
878 *errp = rpc_callerr;
881 static bool_t
882 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
884 /* LINTED pointer alignment */
885 struct ct_data *ct = (struct ct_data *)cl->cl_private;
886 XDR *xdrs = &(ct->ct_xdrs);
887 bool_t stat;
889 (void) rpc_fd_lock(vctbl, ct->ct_fd);
890 xdrs->x_op = XDR_FREE;
891 stat = (*xdr_res)(xdrs, res_ptr);
892 rpc_fd_unlock(vctbl, ct->ct_fd);
893 return (stat);
896 static void
897 clnt_vc_abort(void)
901 /*ARGSUSED*/
902 static bool_t
903 clnt_vc_control(CLIENT *cl, int request, char *info)
905 bool_t ret;
906 /* LINTED pointer alignment */
907 struct ct_data *ct = (struct ct_data *)cl->cl_private;
909 if (rpc_fd_lock(vctbl, ct->ct_fd)) {
910 rpc_fd_unlock(vctbl, ct->ct_fd);
911 return (FALSE);
914 switch (request) {
915 case CLSET_FD_CLOSE:
916 ct->ct_closeit = TRUE;
917 rpc_fd_unlock(vctbl, ct->ct_fd);
918 return (TRUE);
919 case CLSET_FD_NCLOSE:
920 ct->ct_closeit = FALSE;
921 rpc_fd_unlock(vctbl, ct->ct_fd);
922 return (TRUE);
923 case CLFLUSH:
924 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
925 int res;
926 res = do_flush(ct, (info == NULL ||
927 /* LINTED pointer cast */
928 *(int *)info == RPC_CL_DEFAULT_FLUSH)?
929 /* LINTED pointer cast */
930 ct->ct_blocking_mode: *(int *)info);
931 ret = (0 == res);
932 } else {
933 ret = FALSE;
935 rpc_fd_unlock(vctbl, ct->ct_fd);
936 return (ret);
939 /* for other requests which use info */
940 if (info == NULL) {
941 rpc_fd_unlock(vctbl, ct->ct_fd);
942 return (FALSE);
944 switch (request) {
945 case CLSET_TIMEOUT:
946 /* LINTED pointer alignment */
947 if (time_not_ok((struct timeval *)info)) {
948 rpc_fd_unlock(vctbl, ct->ct_fd);
949 return (FALSE);
951 /* LINTED pointer alignment */
952 ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
953 ct->ct_waitset = TRUE;
954 break;
955 case CLGET_TIMEOUT:
956 /* LINTED pointer alignment */
957 ((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
958 /* LINTED pointer alignment */
959 ((struct timeval *)info)->tv_usec = (ct->ct_wait % 1000) * 1000;
960 break;
961 case CLGET_SERVER_ADDR: /* For compatibility only */
962 (void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
963 break;
964 case CLGET_FD:
965 /* LINTED pointer alignment */
966 *(int *)info = ct->ct_fd;
967 break;
968 case CLGET_SVC_ADDR:
969 /* The caller should not free this memory area */
970 /* LINTED pointer alignment */
971 *(struct netbuf *)info = ct->ct_addr;
972 break;
973 case CLSET_SVC_ADDR: /* set to new address */
974 #ifdef undef
976 * XXX: once the t_snddis(), followed by t_connect() starts to
977 * work, this ifdef should be removed. CLIENT handle reuse
978 * would then be possible for COTS as well.
980 if (t_snddis(ct->ct_fd, NULL) == -1) {
981 rpc_createerr.cf_stat = RPC_TLIERROR;
982 rpc_createerr.cf_error.re_terrno = t_errno;
983 rpc_createerr.cf_error.re_errno = errno;
984 rpc_fd_unlock(vctbl, ct->ct_fd);
985 return (FALSE);
987 ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
988 ct, NULL);
989 rpc_fd_unlock(vctbl, ct->ct_fd);
990 return (ret);
991 #else
992 rpc_fd_unlock(vctbl, ct->ct_fd);
993 return (FALSE);
994 #endif
995 case CLGET_XID:
997 * use the knowledge that xid is the
998 * first element in the call structure
999 * This will get the xid of the PREVIOUS call
1001 /* LINTED pointer alignment */
1002 *(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
1003 break;
1004 case CLSET_XID:
1005 /* This will set the xid of the NEXT call */
1006 /* LINTED pointer alignment */
1007 *(uint32_t *)ct->ct_mcall = htonl(*(uint32_t *)info + 1);
1008 /* increment by 1 as clnt_vc_call() decrements once */
1009 break;
1010 case CLGET_VERS:
		/*
		 * This RELIES on the information that, in the call body,
		 * the version number field is the fifth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed.
		 */
1017 /* LINTED pointer alignment */
1018 *(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1019 4 * BYTES_PER_XDR_UNIT));
1020 break;
1022 case CLSET_VERS:
1023 /* LINTED pointer alignment */
1024 *(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
1025 /* LINTED pointer alignment */
1026 htonl(*(uint32_t *)info);
1027 break;
1029 case CLGET_PROG:
		/*
		 * This RELIES on the information that, in the call body,
		 * the program number field is the fourth field from the
		 * beginning of the RPC header.  MUST be changed if the
		 * call_struct is changed.
		 */
1036 /* LINTED pointer alignment */
1037 *(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
1038 3 * BYTES_PER_XDR_UNIT));
1039 break;
1041 case CLSET_PROG:
1042 /* LINTED pointer alignment */
1043 *(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
1044 /* LINTED pointer alignment */
1045 htonl(*(uint32_t *)info);
1046 break;
1048 case CLSET_IO_MODE:
1049 /* LINTED pointer cast */
1050 if (!set_io_mode(ct, *(int *)info)) {
1051 rpc_fd_unlock(vctbl, ct->ct_fd);
1052 return (FALSE);
1054 break;
1055 case CLSET_FLUSH_MODE:
1056 /* Set a specific FLUSH_MODE */
1057 /* LINTED pointer cast */
1058 if (!set_flush_mode(ct, *(int *)info)) {
1059 rpc_fd_unlock(vctbl, ct->ct_fd);
1060 return (FALSE);
1062 break;
1063 case CLGET_FLUSH_MODE:
1064 /* LINTED pointer cast */
1065 *(rpcflushmode_t *)info = ct->ct_blocking_mode;
1066 break;
1068 case CLGET_IO_MODE:
1069 /* LINTED pointer cast */
1070 *(rpciomode_t *)info = ct->ct_io_mode;
1071 break;
1073 case CLGET_CURRENT_REC_SIZE:
1075 * Returns the current amount of memory allocated
1076 * to pending requests
1078 /* LINTED pointer cast */
1079 *(int *)info = ct->ct_bufferPendingSize;
1080 break;
1082 case CLSET_CONNMAXREC_SIZE:
1083 /* Cannot resize the buffer if it is used. */
1084 if (ct->ct_bufferPendingSize != 0) {
1085 rpc_fd_unlock(vctbl, ct->ct_fd);
1086 return (FALSE);
1089 * If the new size is equal to the current size,
1090 * there is nothing to do.
1092 /* LINTED pointer cast */
1093 if (ct->ct_bufferSize == *(uint_t *)info)
1094 break;
1096 /* LINTED pointer cast */
1097 ct->ct_bufferSize = *(uint_t *)info;
1098 if (ct->ct_buffer) {
1099 free(ct->ct_buffer);
1100 ct->ct_buffer = NULL;
1101 ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
1103 break;
1105 case CLGET_CONNMAXREC_SIZE:
1107 * Returns the size of buffer allocated
1108 * to pending requests
1110 /* LINTED pointer cast */
1111 *(uint_t *)info = ct->ct_bufferSize;
1112 break;
1114 default:
1115 rpc_fd_unlock(vctbl, ct->ct_fd);
1116 return (FALSE);
1118 rpc_fd_unlock(vctbl, ct->ct_fd);
1119 return (TRUE);
1122 static void
1123 clnt_vc_destroy(CLIENT *cl)
1125 /* LINTED pointer alignment */
1126 struct ct_data *ct = (struct ct_data *)cl->cl_private;
1127 int ct_fd = ct->ct_fd;
1129 (void) rpc_fd_lock(vctbl, ct_fd);
1131 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1132 (void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1133 (void) unregister_nb(ct);
1136 if (ct->ct_closeit)
1137 (void) t_close(ct_fd);
1138 XDR_DESTROY(&(ct->ct_xdrs));
1139 free(ct->ct_addr.buf);
1140 free(ct);
1141 if (cl->cl_netid && cl->cl_netid[0])
1142 free(cl->cl_netid);
1143 if (cl->cl_tp && cl->cl_tp[0])
1144 free(cl->cl_tp);
1145 free(cl);
1146 rpc_fd_unlock(vctbl, ct_fd);
1150 * Interface between xdr serializer and vc connection.
1151 * Behaves like the system calls, read & write, but keeps some error state
1152 * around for the rpc level.
1154 static int
1155 read_vc(void *ct_tmp, caddr_t buf, int len)
1157 static pthread_key_t pfdp_key = PTHREAD_ONCE_KEY_NP;
1158 struct pollfd *pfdp;
1159 int npfd; /* total number of pfdp allocated */
1160 struct ct_data *ct = ct_tmp;
1161 struct timeval starttime;
1162 struct timeval curtime;
1163 int poll_time;
1164 int delta;
1166 if (len == 0)
1167 return (0);
	/*
	 * Allocate just one pollfd the first time.  thr_get_storage() may
	 * return a larger buffer, left over from the last time we were
	 * here, but that's OK.  reallocarray() will deal with it properly.
	 */
1174 npfd = 1;
1175 pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
1176 if (pfdp == NULL) {
1177 (void) syslog(LOG_ERR, clnt_vc_errstr,
1178 clnt_read_vc_str, __no_mem_str);
1179 rpc_callerr.re_status = RPC_SYSTEMERROR;
1180 rpc_callerr.re_errno = errno;
1181 rpc_callerr.re_terrno = 0;
1182 return (-1);
1186 * N.B.: slot 0 in the pollfd array is reserved for the file
1187 * descriptor we're really interested in (as opposed to the
1188 * callback descriptors).
1190 pfdp[0].fd = ct->ct_fd;
1191 pfdp[0].events = MASKVAL;
1192 pfdp[0].revents = 0;
1193 poll_time = ct->ct_wait;
1194 if (gettimeofday(&starttime, NULL) == -1) {
1195 syslog(LOG_ERR, "Unable to get time of day: %m");
1196 return (-1);
1199 for (;;) {
1200 extern void (*_svc_getreqset_proc)();
1201 extern pollfd_t *svc_pollfd;
1202 extern int svc_max_pollfd;
1203 int fds;
1205 /* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */
1207 if (_svc_getreqset_proc) {
1208 sig_rw_rdlock(&svc_fd_lock);
1210 /* reallocate pfdp to svc_max_pollfd +1 */
1211 if (npfd != (svc_max_pollfd + 1)) {
1212 struct pollfd *tmp_pfdp = reallocarray(pfdp,
1213 svc_max_pollfd + 1,
1214 sizeof (struct pollfd));
1215 if (tmp_pfdp == NULL) {
1216 sig_rw_unlock(&svc_fd_lock);
1217 (void) syslog(LOG_ERR, clnt_vc_errstr,
1218 clnt_read_vc_str, __no_mem_str);
1219 rpc_callerr.re_status = RPC_SYSTEMERROR;
1220 rpc_callerr.re_errno = errno;
1221 rpc_callerr.re_terrno = 0;
1222 return (-1);
1225 pfdp = tmp_pfdp;
1226 npfd = svc_max_pollfd + 1;
1227 (void) pthread_setspecific(pfdp_key, pfdp);
1229 if (npfd > 1)
1230 (void) memcpy(&pfdp[1], svc_pollfd,
1231 sizeof (struct pollfd) * (npfd - 1));
1233 sig_rw_unlock(&svc_fd_lock);
1234 } else {
1235 npfd = 1; /* don't forget about pfdp[0] */
1238 switch (fds = poll(pfdp, npfd, poll_time)) {
1239 case 0:
1240 rpc_callerr.re_status = RPC_TIMEDOUT;
1241 return (-1);
1243 case -1:
1244 if (errno != EINTR)
1245 continue;
1246 else {
1248 * interrupted by another signal,
1249 * update time_waited
1252 if (gettimeofday(&curtime, NULL) == -1) {
1253 syslog(LOG_ERR,
1254 "Unable to get time of day: %m");
1255 errno = 0;
1256 continue;
1258 delta = (curtime.tv_sec -
1259 starttime.tv_sec) * 1000 +
1260 (curtime.tv_usec -
1261 starttime.tv_usec) / 1000;
1262 poll_time -= delta;
1263 if (poll_time < 0) {
1264 rpc_callerr.re_status = RPC_TIMEDOUT;
1265 errno = 0;
1266 return (-1);
1267 } else {
1268 errno = 0; /* reset it */
1269 continue;
1274 if (pfdp[0].revents == 0) {
1275 /* must be for server side of the house */
1276 (*_svc_getreqset_proc)(&pfdp[1], fds);
1277 continue; /* do poll again */
1280 if (pfdp[0].revents & POLLNVAL) {
1281 rpc_callerr.re_status = RPC_CANTRECV;
1283 * Note: we're faking errno here because we
1284 * previously would have expected select() to
1285 * return -1 with errno EBADF. Poll(BA_OS)
1286 * returns 0 and sets the POLLNVAL revents flag
1287 * instead.
1289 rpc_callerr.re_errno = errno = EBADF;
1290 return (-1);
1293 if (pfdp[0].revents & (POLLERR | POLLHUP)) {
1294 rpc_callerr.re_status = RPC_CANTRECV;
1295 rpc_callerr.re_errno = errno = EPIPE;
1296 return (-1);
1298 break;
1301 switch (len = t_rcvall(ct->ct_fd, buf, len)) {
1302 case 0:
1303 /* premature eof */
1304 rpc_callerr.re_errno = ENOLINK;
1305 rpc_callerr.re_terrno = 0;
1306 rpc_callerr.re_status = RPC_CANTRECV;
1307 len = -1; /* it's really an error */
1308 break;
1310 case -1:
1311 rpc_callerr.re_terrno = t_errno;
1312 rpc_callerr.re_errno = 0;
1313 rpc_callerr.re_status = RPC_CANTRECV;
1314 break;
1316 return (len);
1319 static int
1320 write_vc(void *ct_tmp, caddr_t buf, int len)
1322 int i, cnt;
1323 struct ct_data *ct = ct_tmp;
1324 int flag;
1325 int maxsz;
1327 maxsz = ct->ct_tsdu;
1329 /* Handle the non-blocking mode */
1330 if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1332 * Test a special case here. If the length of the current
1333 * write is greater than the transport data unit, and the
1334 * mode is non blocking, we return RPC_CANTSEND.
1335 * XXX this is not very clean.
1337 if (maxsz > 0 && len > maxsz) {
1338 rpc_callerr.re_terrno = errno;
1339 rpc_callerr.re_errno = 0;
1340 rpc_callerr.re_status = RPC_CANTSEND;
1341 return (-1);
1344 len = nb_send(ct, buf, (unsigned)len);
1345 if (len == -1) {
1346 rpc_callerr.re_terrno = errno;
1347 rpc_callerr.re_errno = 0;
1348 rpc_callerr.re_status = RPC_CANTSEND;
1349 } else if (len == -2) {
1350 rpc_callerr.re_terrno = 0;
1351 rpc_callerr.re_errno = 0;
1352 rpc_callerr.re_status = RPC_CANTSTORE;
1354 return (len);
1357 if ((maxsz == 0) || (maxsz == -1)) {
		/*
		 * t_snd may return -1 for an error on the connection (the
		 * connection needs to be repaired/closed), and -2 for a
		 * flow-control handling error (no operation to do, just
		 * wait and call T_Flush()).
		 */
1364 if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
1365 rpc_callerr.re_terrno = t_errno;
1366 rpc_callerr.re_errno = 0;
1367 rpc_callerr.re_status = RPC_CANTSEND;
1369 return (len);
	/*
	 * This is for those transports which have a max size for data.
	 */
1375 for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
1376 flag = cnt > maxsz ? T_MORE : 0;
1377 if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
1378 flag)) == -1) {
1379 rpc_callerr.re_terrno = t_errno;
1380 rpc_callerr.re_errno = 0;
1381 rpc_callerr.re_status = RPC_CANTSEND;
1382 return (-1);
1385 return (len);
1389 * Receive the required bytes of data, even if it is fragmented.
1391 static int
1392 t_rcvall(int fd, char *buf, int len)
1394 int moreflag;
1395 int final = 0;
1396 int res;
1398 do {
1399 moreflag = 0;
1400 res = t_rcv(fd, buf, (unsigned)len, &moreflag);
1401 if (res == -1) {
1402 if (t_errno == TLOOK)
1403 switch (t_look(fd)) {
1404 case T_DISCONNECT:
1405 (void) t_rcvdis(fd, NULL);
1406 (void) t_snddis(fd, NULL);
1407 return (-1);
1408 case T_ORDREL:
1409 /* Received orderly release indication */
1410 (void) t_rcvrel(fd);
1411 /* Send orderly release indicator */
1412 (void) t_sndrel(fd);
1413 return (-1);
1414 default:
1415 return (-1);
1417 } else if (res == 0) {
1418 return (0);
1420 final += res;
1421 buf += res;
1422 len -= res;
1423 } while ((len > 0) && (moreflag & T_MORE));
1424 return (final);
1427 static struct clnt_ops *
1428 clnt_vc_ops(void)
1430 static struct clnt_ops ops;
1431 extern mutex_t ops_lock;
1433 /* VARIABLES PROTECTED BY ops_lock: ops */
1435 sig_mutex_lock(&ops_lock);
1436 if (ops.cl_call == NULL) {
1437 ops.cl_call = clnt_vc_call;
1438 ops.cl_send = clnt_vc_send;
1439 ops.cl_abort = clnt_vc_abort;
1440 ops.cl_geterr = clnt_vc_geterr;
1441 ops.cl_freeres = clnt_vc_freeres;
1442 ops.cl_destroy = clnt_vc_destroy;
1443 ops.cl_control = clnt_vc_control;
1445 sig_mutex_unlock(&ops_lock);
1446 return (&ops);
1450 * Make sure that the time is not garbage. -1 value is disallowed.
1451 * Note this is different from time_not_ok in clnt_dg.c
1453 static bool_t
1454 time_not_ok(struct timeval *t)
1456 return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
1457 t->tv_usec <= -1 || t->tv_usec > 1000000);
/* Compute the # of bytes that remain until the end of the buffer */
#define	REMAIN_BYTES(p)	(ct->ct_bufferSize - (ct->ct_##p - ct->ct_buffer))
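/*
 * The non-blocking buffer is circular: ct_buffer holds ct_bufferSize bytes,
 * ct_bufferReadPtr marks the oldest unsent byte, ct_bufferWritePtr marks the
 * first free byte, and ct_bufferPendingSize counts the unsent bytes, which
 * may wrap around the end of the buffer.
 */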
1464 static int
1465 addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
1467 if (NULL == ct->ct_buffer) {
1468 /* Buffer not allocated yet. */
1469 char *buffer;
1471 buffer = malloc(ct->ct_bufferSize);
1472 if (NULL == buffer) {
1473 errno = ENOMEM;
1474 return (-1);
1476 (void) memcpy(buffer, dataToAdd, nBytes);
1478 ct->ct_buffer = buffer;
1479 ct->ct_bufferReadPtr = buffer;
1480 ct->ct_bufferWritePtr = buffer + nBytes;
1481 ct->ct_bufferPendingSize = nBytes;
1482 } else {
1484 * For an already allocated buffer, two mem copies
1485 * might be needed, depending on the current
1486 * writing position.
1489 /* Compute the length of the first copy. */
1490 int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
1492 ct->ct_bufferPendingSize += nBytes;
1494 (void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
1495 ct->ct_bufferWritePtr += len;
1496 nBytes -= len;
1497 if (0 == nBytes) {
1498 /* One memcopy needed. */
1501 * If the write pointer is at the end of the buffer,
1502 * wrap it now.
1504 if (ct->ct_bufferWritePtr ==
1505 (ct->ct_buffer + ct->ct_bufferSize)) {
1506 ct->ct_bufferWritePtr = ct->ct_buffer;
1508 } else {
1509 /* Two memcopy needed. */
1510 dataToAdd += len;
1513 * Copy the remaining data to the beginning of the
1514 * buffer
1516 (void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
1517 ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
1520 return (0);
1523 static void
1524 consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
1526 ct->ct_bufferPendingSize -= nBytes;
1527 if (ct->ct_bufferPendingSize == 0) {
		/*
		 * If the buffer contains no data, we set the two pointers at
		 * the beginning of the buffer (to minimize buffer wraps).
		 */
1532 ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
1533 } else {
1534 ct->ct_bufferReadPtr += nBytes;
1535 if (ct->ct_bufferReadPtr >
1536 ct->ct_buffer + ct->ct_bufferSize) {
1537 ct->ct_bufferReadPtr -= ct->ct_bufferSize;
1542 static int
1543 iovFromBuffer(struct ct_data *ct, struct iovec *iov)
1545 int l;
1547 if (ct->ct_bufferPendingSize == 0)
1548 return (0);
1550 l = REMAIN_BYTES(bufferReadPtr);
1551 if (l < ct->ct_bufferPendingSize) {
1552 /* Buffer in two fragments. */
1553 iov[0].iov_base = ct->ct_bufferReadPtr;
1554 iov[0].iov_len = l;
1556 iov[1].iov_base = ct->ct_buffer;
1557 iov[1].iov_len = ct->ct_bufferPendingSize - l;
1558 return (2);
1559 } else {
1560 /* Buffer in one fragment. */
1561 iov[0].iov_base = ct->ct_bufferReadPtr;
1562 iov[0].iov_len = ct->ct_bufferPendingSize;
1563 return (1);
1567 static bool_t
1568 set_flush_mode(struct ct_data *ct, int mode)
1570 switch (mode) {
1571 case RPC_CL_BLOCKING_FLUSH:
		/* flush as much as possible without blocking */
1573 case RPC_CL_BESTEFFORT_FLUSH:
1574 /* flush the buffer completely (possibly blocking) */
1575 case RPC_CL_DEFAULT_FLUSH:
1576 /* flush according to the currently defined policy */
1577 ct->ct_blocking_mode = mode;
1578 return (TRUE);
1579 default:
1580 return (FALSE);
1584 static bool_t
1585 set_io_mode(struct ct_data *ct, int ioMode)
1587 switch (ioMode) {
1588 case RPC_CL_BLOCKING:
1589 if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1590 if (NULL != ct->ct_buffer) {
1592 * If a buffer was allocated for this
1593 * connection, flush it now, and free it.
1595 (void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1596 free(ct->ct_buffer);
1597 ct->ct_buffer = NULL;
1599 (void) unregister_nb(ct);
1600 ct->ct_io_mode = ioMode;
1602 break;
1603 case RPC_CL_NONBLOCKING:
1604 if (ct->ct_io_mode == RPC_CL_BLOCKING) {
1605 if (-1 == register_nb(ct)) {
1606 return (FALSE);
1608 ct->ct_io_mode = ioMode;
1610 break;
1611 default:
1612 return (FALSE);
1614 return (TRUE);
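/*
 * Flush the pending (unsent) data in the non-blocking buffer.  With
 * RPC_CL_BLOCKING_FLUSH the connection is switched to blocking mode and the
 * buffer is drained completely; with RPC_CL_BESTEFFORT_FLUSH a single
 * non-blocking write is attempted and EWOULDBLOCK is not treated as an
 * error.  Returns 0 on success, -1 on failure.
 */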
1617 static int
1618 do_flush(struct ct_data *ct, uint_t flush_mode)
1620 int result;
1621 if (ct->ct_bufferPendingSize == 0) {
1622 return (0);
1625 switch (flush_mode) {
1626 case RPC_CL_BLOCKING_FLUSH:
1627 if (!set_blocking_connection(ct, TRUE)) {
1628 return (-1);
1630 while (ct->ct_bufferPendingSize > 0) {
1631 if (REMAIN_BYTES(bufferReadPtr) <
1632 ct->ct_bufferPendingSize) {
1633 struct iovec iov[2];
1634 (void) iovFromBuffer(ct, iov);
1635 result = writev(ct->ct_fd, iov, 2);
1636 } else {
1637 result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1638 ct->ct_bufferPendingSize, 0);
1640 if (result < 0) {
1641 return (-1);
1643 consumeFromBuffer(ct, result);
1646 break;
1648 case RPC_CL_BESTEFFORT_FLUSH:
1649 (void) set_blocking_connection(ct, FALSE);
1650 if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
1651 struct iovec iov[2];
1652 (void) iovFromBuffer(ct, iov);
1653 result = writev(ct->ct_fd, iov, 2);
1654 } else {
1655 result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1656 ct->ct_bufferPendingSize, 0);
1658 if (result < 0) {
1659 if (errno != EWOULDBLOCK) {
1660 perror("flush");
1661 return (-1);
1663 return (0);
1665 if (result > 0)
1666 consumeFromBuffer(ct, result);
1667 break;
1669 return (0);
1673 * Non blocking send.
1676 static int
1677 nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
1679 int result;
	/*
	 * The first four bytes of buff hold the XDR record mark; its
	 * high-order (last fragment) bit must be set, since nb_send only
	 * handles complete records.
	 */
	if (!(ntohl(*(uint32_t *)buff) & (1U << 31))) {
		return (-1);
	}
1686 * Check to see if the current message can be stored fully in the
1687 * buffer. We have to check this now because it may be impossible
1688 * to send any data, so the message must be stored in the buffer.
1690 if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
1691 /* Try to flush (to free some space). */
1692 (void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);
1694 /* Can we store the message now ? */
1695 if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
1696 return (-2);
1699 (void) set_blocking_connection(ct, FALSE);
1702 * If there is no data pending, we can simply try
1703 * to send our data.
1705 if (ct->ct_bufferPendingSize == 0) {
1706 result = t_snd(ct->ct_fd, buff, nBytes, 0);
1707 if (result == -1) {
1708 if (errno == EWOULDBLOCK) {
1709 result = 0;
1710 } else {
1711 perror("send");
1712 return (-1);
		/*
		 * If we have not sent all the data, we must store the
		 * remainder in the buffer.
		 */
1719 if (result != nBytes) {
1720 if (addInBuffer(ct, (char *)buff + result,
1721 nBytes - result) == -1) {
1722 return (-1);
1725 } else {
1727 * Some data pending in the buffer. We try to send
1728 * both buffer data and current message in one shot.
1730 struct iovec iov[3];
1731 int i = iovFromBuffer(ct, &iov[0]);
1733 iov[i].iov_base = buff;
1734 iov[i].iov_len = nBytes;
1736 result = writev(ct->ct_fd, iov, i+1);
1737 if (result == -1) {
1738 if (errno == EWOULDBLOCK) {
1739 /* No bytes sent */
1740 result = 0;
1741 } else {
1742 return (-1);
1747 * Add the bytes from the message
1748 * that we have not sent.
1750 if (result <= ct->ct_bufferPendingSize) {
1751 /* No bytes from the message sent */
1752 consumeFromBuffer(ct, result);
1753 if (addInBuffer(ct, buff, nBytes) == -1) {
1754 return (-1);
1756 } else {
1758 * Some bytes of the message are sent.
1759 * Compute the length of the message that has
1760 * been sent.
1762 int len = result - ct->ct_bufferPendingSize;
1764 /* So, empty the buffer. */
1765 ct->ct_bufferReadPtr = ct->ct_buffer;
1766 ct->ct_bufferWritePtr = ct->ct_buffer;
1767 ct->ct_bufferPendingSize = 0;
1769 /* And add the remaining part of the message. */
1770 if (len != nBytes) {
1771 if (addInBuffer(ct, (char *)buff + len,
1772 nBytes-len) == -1) {
1773 return (-1);
1778 return (nBytes);
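/*
 * atexit handler: flush the pending buffers of all clients registered for
 * non-blocking I/O, so queued one-way calls are not silently lost when the
 * process exits.
 */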
1781 static void
1782 flush_registered_clients(void)
1784 struct nb_reg_node *node;
1786 if (LIST_ISEMPTY(nb_first)) {
1787 return;
1790 LIST_FOR_EACH(nb_first, node) {
1791 (void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
1795 static int
1796 allocate_chunk(void)
1798 #define CHUNK_SIZE 16
1799 struct nb_reg_node *chk =
1800 malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
1801 struct nb_reg_node *n;
1802 int i;
1804 if (NULL == chk) {
1805 return (-1);
1808 n = chk;
1809 for (i = 0; i < CHUNK_SIZE-1; ++i) {
1810 n[i].next = &(n[i+1]);
1812 n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free;
1813 nb_free = chk;
1814 return (0);
1817 static int
1818 register_nb(struct ct_data *ct)
1820 struct nb_reg_node *node;
1822 (void) mutex_lock(&nb_list_mutex);
1824 if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
1825 (void) mutex_unlock(&nb_list_mutex);
1826 errno = ENOMEM;
1827 return (-1);
1830 if (!exit_handler_set) {
1831 (void) atexit(flush_registered_clients);
1832 exit_handler_set = TRUE;
1834 /* Get the first free node */
1835 LIST_EXTRACT(nb_free, node);
1837 node->ct = ct;
1839 LIST_ADD(nb_first, node);
1840 (void) mutex_unlock(&nb_list_mutex);
1842 return (0);
1845 static int
1846 unregister_nb(struct ct_data *ct)
1848 struct nb_reg_node *node;
1850 (void) mutex_lock(&nb_list_mutex);
1851 assert(!LIST_ISEMPTY(nb_first));
1853 node = nb_first;
1854 LIST_FOR_EACH(nb_first, node) {
1855 if (node->next->ct == ct) {
1856 /* Get the node to unregister. */
1857 struct nb_reg_node *n = node->next;
1858 node->next = n->next;
1860 n->ct = NULL;
1861 LIST_ADD(nb_free, n);
1862 break;
1865 (void) mutex_unlock(&nb_list_mutex);
1866 return (0);