4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
28 * FMA Event Transport Module Transport Layer API implementation.
30 * Library for establishing connections and transporting FMA events between
31 * ETMs (event-transport modules) in separate fault domains.
33 * The transport for this library is internet socket based and uses the DSCP
34 * client services library (libdscp).
40 * On the SP, there is one DSCP interface for every domain.
41 * Each domain has one and only one DSCP interface to the SP.
43 * The DSCP interface is created when the domain powers-on. On the SP,
44 * a sysevent will be generated when the DSCP interface is up. On the domain,
45 * the DSCP interface should be up when ETM loads.
48 exs_conn_t Acc
; /* Connection for accepting/listening */
49 pthread_t Acc_tid
; /* Thread ID for accepting conns */
50 int Acc_quit
; /* Signal to quit the acceptor thread */
51 int Acc_destroy
; /* Destroy accept/listen thread? */
52 exs_hdl_t
*Exh_head
= NULL
; /* Head of ex_hdl_t list */
53 pthread_mutex_t List_lock
= PTHREAD_MUTEX_INITIALIZER
;
54 /* Protects linked list of ex_hdl_t */
55 static void *Dlp
= NULL
; /* Handle for dlopen/dlclose/dlsym */
56 static int (*Send_filter
)(fmd_hdl_t
*hdl
, nvlist_t
*event
, const char *dest
);
57 static int (*Post_filter
)(fmd_hdl_t
*hdl
, nvlist_t
*event
, const char *src
);
60 * * * * * * * * * * * * * *
61 * Module specific routines
62 * * * * * * * * * * * * * *
66 * Allocate and initialize a transport instance handle.
67 * Return hdl pointer for success, NULL for failure.
70 exs_hdl_alloc(fmd_hdl_t
*hdl
, char *endpoint_id
,
71 int (*cb_func
)(fmd_hdl_t
*hdl
, etm_xport_conn_t conn
, etm_cb_flag_t flag
,
72 void *arg
), void *cb_func_arg
, int dom
)
76 hp
= fmd_hdl_zalloc(hdl
, sizeof (exs_hdl_t
), FMD_SLEEP
);
78 hp
->h_endpt_id
= fmd_hdl_strdup(hdl
, endpoint_id
, FMD_SLEEP
);
80 hp
->h_client
.c_sd
= EXS_SD_FREE
;
81 hp
->h_server
.c_sd
= EXS_SD_FREE
;
82 hp
->h_tid
= EXS_TID_FREE
;
85 hp
->h_cb_func
= cb_func
;
86 hp
->h_cb_func_arg
= cb_func_arg
;
93 * dlopen() the platform filter library and dlsym() the filter funcs.
96 exs_filter_init(fmd_hdl_t
*hdl
)
98 char *propstr
= fmd_prop_get_string(hdl
, "filter_path");
100 if (propstr
== NULL
) {
101 fmd_hdl_debug(hdl
, "No filter plugin specified");
106 if ((Dlp
= dlopen(propstr
, RTLD_LOCAL
| RTLD_NOW
)) == NULL
) {
107 fmd_hdl_debug(hdl
, "Failed to dlopen filter plugin");
110 fmd_prop_free_string(hdl
, propstr
);
114 if ((Send_filter
= (int (*)())dlsym(Dlp
, "send_filter"))
116 fmd_hdl_debug(hdl
, "failed to dlsym send_filter()");
120 if ((Post_filter
= (int (*)())dlsym(Dlp
, "post_filter"))
122 fmd_hdl_debug(hdl
, "failed to dlsym post_filter()");
127 fmd_prop_free_string(hdl
, propstr
);
131 * If open, dlclose() the platform filter library.
135 exs_filter_fini(fmd_hdl_t
*hdl
)
142 * Translate endpoint_id string to int.
143 * Return the domain ID via "dom_id".
144 * Return 0 for success, nonzero for failure
147 exs_get_id(fmd_hdl_t
*hdl
, char *endpoint_id
, int *dom_id
)
151 if (strstr(endpoint_id
, EXS_SP_PREFIX
) != NULL
) {
152 /* Remote endpoint is the SP */
153 *dom_id
= DSCP_IDENT_SP
;
156 if ((ptr
= strstr(endpoint_id
, EXS_DOMAIN_PREFIX
)) == NULL
) {
157 fmd_hdl_error(hdl
, "Property parsing error : %s not "
158 "found in %s. Check event-transport.conf\n",
159 EXS_DOMAIN_PREFIX
, endpoint_id
);
163 ptr
+= EXS_DOMAIN_PREFIX_LEN
;
165 if ((sscanf(ptr
, "%d", dom_id
)) != 1) {
166 fmd_hdl_error(hdl
, "Property parsing error : no "
167 "integer found in %s. Check event-transport.conf\n",
177 * Prepare the client connection.
178 * Return 0 for success, nonzero for failure.
181 exs_prep_client(exs_hdl_t
*hp
)
186 /* Find the DSCP address for the remote endpoint */
187 if ((rv
= dscpAddr(hp
->h_dom
, DSCP_ADDR_REMOTE
,
188 (struct sockaddr
*)&hp
->h_client
.c_saddr
,
189 &hp
->h_client
.c_len
)) != DSCP_OK
) {
190 fmd_hdl_debug(hp
->h_hdl
, "dscpAddr on the client socket "
191 "failed for %s : rv = %d\n", hp
->h_endpt_id
, rv
);
195 if ((hp
->h_client
.c_sd
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
196 fmd_hdl_error(hp
->h_hdl
, "Failed to create the client socket "
197 "for %s", hp
->h_endpt_id
);
201 if (setsockopt(hp
->h_client
.c_sd
, SOL_SOCKET
, SO_REUSEADDR
,
202 &optval
, sizeof (optval
))) {
203 fmd_hdl_error(hp
->h_hdl
, "Failed to set REUSEADDR on the "
204 "client socket for %s", hp
->h_endpt_id
);
205 EXS_CLOSE_CLR(hp
->h_client
);
210 * Set SO_LINGER so TCP aborts the connection when closed.
211 * If the domain's client socket goes into the TIME_WAIT state,
212 * ETM will be unable to connect to the SP until this clears.
213 * This connection is over DSCP, which is a simple point-to-point
214 * connection and therefore has no routers or multiple forwarding.
215 * The risk of receiving old packets from a previously terminated
216 * connection is very small.
220 if (setsockopt(hp
->h_client
.c_sd
, SOL_SOCKET
, SO_LINGER
, &ling
,
222 fmd_hdl_error(hp
->h_hdl
, "Failed to set SO_LINGER on the "
223 "client socket for %s", hp
->h_endpt_id
);
224 EXS_CLOSE_CLR(hp
->h_client
);
228 /* Bind the socket to the local IP address of the DSCP link */
229 if ((rv
= dscpBind(hp
->h_dom
, hp
->h_client
.c_sd
,
230 EXS_CLIENT_PORT
)) != DSCP_OK
) {
231 if (rv
== DSCP_ERROR_DOWN
) {
232 fmd_hdl_debug(hp
->h_hdl
, "xport - dscp link for %s "
233 "is down", hp
->h_endpt_id
);
235 fmd_hdl_debug(hp
->h_hdl
, "dscpBind on the client "
236 "socket failed : rv = %d\n", rv
);
238 EXS_CLOSE_CLR(hp
->h_client
);
242 hp
->h_client
.c_saddr
.sin_port
= htons(EXS_SERVER_PORT
);
244 /* Set IPsec security policy for this socket */
245 if ((rv
= dscpSecure(hp
->h_dom
, hp
->h_client
.c_sd
)) != DSCP_OK
) {
246 fmd_hdl_error(hp
->h_hdl
, "dscpSecure on the client socket "
247 "failed for %s : rv = %d\n", hp
->h_endpt_id
, rv
);
248 EXS_CLOSE_CLR(hp
->h_client
);
256 * Server function/thread. There is one thread per endpoint.
257 * Accepts incoming connections and notifies ETM of incoming data.
260 exs_server(void *arg
)
262 exs_hdl_t
*hp
= (exs_hdl_t
*)arg
;
265 while (!hp
->h_quit
) {
268 pfd
.fd
= hp
->h_server
.c_sd
;
270 if (poll(&pfd
, 1, -1) <= 0)
271 continue; /* loop around and check h_quit */
273 if (pfd
.revents
& (POLLHUP
| POLLERR
)) {
274 fmd_hdl_debug(hp
->h_hdl
, "xport - poll hangup/err for "
275 "%s server socket", hp
->h_endpt_id
);
276 EXS_CLOSE_CLR(hp
->h_server
);
278 break; /* thread exits */
281 if (pfd
.revents
& POLLIN
) {
282 /* Notify ETM that incoming data is available */
283 if (hp
->h_cb_func(hp
->h_hdl
, &hp
->h_server
,
284 ETM_CBFLAG_RECV
, hp
->h_cb_func_arg
)) {
286 * For any non-zero return, close the
287 * connection and exit the thread.
289 EXS_CLOSE_CLR(hp
->h_server
);
291 break; /* thread exits */
296 fmd_hdl_debug(hp
->h_hdl
, "xport - exiting server thread for %s",
301 * Accept a new incoming connection.
304 exs_accept(fmd_hdl_t
*hdl
)
306 int new_sd
, dom
, flags
, rv
;
307 struct sockaddr_in new_saddr
;
308 socklen_t new_len
= sizeof (struct sockaddr
);
311 if ((new_sd
= accept(Acc
.c_sd
, (struct sockaddr
*)&new_saddr
,
313 /* Translate saddr to domain id */
314 if ((rv
= dscpIdent((struct sockaddr
*)&new_saddr
, (int)new_len
,
316 fmd_hdl_error(hdl
, "dscpIdent failed : rv = %d\n", rv
);
317 (void) close(new_sd
);
321 /* Find the exs_hdl_t for the domain trying to connect */
322 (void) pthread_mutex_lock(&List_lock
);
323 for (hp
= Exh_head
; hp
; hp
= hp
->h_next
) {
324 if (hp
->h_dom
== dom
)
327 (void) pthread_mutex_unlock(&List_lock
);
330 fmd_hdl_error(hdl
, "Not configured to accept a "
331 "connection from domain %d. Check "
332 "event-transport.conf\n", dom
);
333 (void) close(new_sd
);
337 /* Authenticate this connection request */
338 if ((rv
= dscpAuth(dom
, (struct sockaddr
*)&new_saddr
,
339 (int)new_len
)) != DSCP_OK
) {
340 fmd_hdl_error(hdl
, "dscpAuth failed for %s : rv = %d ",
341 " Possible spoofing attack\n", hp
->h_endpt_id
, rv
);
342 (void) close(new_sd
);
346 if (hp
->h_tid
!= EXS_TID_FREE
) {
348 fmd_thr_signal(hp
->h_hdl
, hp
->h_tid
);
349 fmd_thr_destroy(hp
->h_hdl
, hp
->h_tid
);
354 if (hp
->h_server
.c_sd
!= EXS_SD_FREE
)
355 EXS_CLOSE_CLR(hp
->h_server
);
357 /* Set the socket to be non-blocking */
358 flags
= fcntl(new_sd
, F_GETFL
, 0);
359 (void) fcntl(new_sd
, F_SETFL
, flags
| O_NONBLOCK
);
361 hp
->h_server
.c_sd
= new_sd
;
363 hp
->h_tid
= fmd_thr_create(hdl
, exs_server
, hp
);
366 fmd_hdl_error(hdl
, "Failed to accept() a new connection");
371 * Listen for and accept incoming connections.
372 * There is only one such thread.
375 exs_listen(void *arg
)
377 fmd_hdl_t
*hdl
= (fmd_hdl_t
*)arg
;
385 if (poll(&pfd
, 1, -1) <= 0)
386 continue; /* loop around and check Acc_quit */
388 if (pfd
.revents
& (POLLHUP
| POLLERR
)) {
389 fmd_hdl_debug(hdl
, "xport - poll hangup/err on "
393 break; /* thread exits */
396 if (pfd
.revents
& POLLIN
)
400 fmd_hdl_debug(hdl
, "xport - exiting accept-listen thread");
404 * Prepare to accept a connection.
405 * Return 0 for success, nonzero for failure.
408 exs_prep_accept(fmd_hdl_t
*hdl
, int dom
)
410 int flags
, optval
= 1;
413 if (Acc
.c_sd
!= EXS_SD_FREE
)
414 return; /* nothing to do */
417 fmd_thr_destroy(hdl
, Acc_tid
);
418 Acc_tid
= EXS_TID_FREE
;
421 /* Check to see if the DSCP interface is configured */
422 if ((rv
= dscpAddr(dom
, DSCP_ADDR_LOCAL
,
423 (struct sockaddr
*)&Acc
.c_saddr
, &Acc
.c_len
)) != DSCP_OK
) {
424 fmd_hdl_debug(hdl
, "xport - dscpAddr on the accept socket "
425 "failed for domain %d : rv = %d", dom
, rv
);
429 if ((Acc
.c_sd
= socket(AF_INET
, SOCK_STREAM
, 0)) == -1) {
430 fmd_hdl_error(hdl
, "Failed to create the accept socket");
434 if (setsockopt(Acc
.c_sd
, SOL_SOCKET
, SO_REUSEADDR
, &optval
,
436 fmd_hdl_error(hdl
, "Failed to set REUSEADDR for the accept "
442 /* Bind the socket to the local IP address of the DSCP link */
443 if ((rv
= dscpBind(dom
, Acc
.c_sd
, EXS_SERVER_PORT
)) != DSCP_OK
) {
444 if (rv
== DSCP_ERROR_DOWN
) {
445 fmd_hdl_debug(hdl
, "xport - dscp link for domain %d "
448 fmd_hdl_debug(hdl
, "dscpBind on the accept socket "
449 "failed : rv = %d\n", rv
);
455 /* Activate IPsec security policy for this socket */
456 if ((rv
= dscpSecure(dom
, Acc
.c_sd
)) != DSCP_OK
) {
457 fmd_hdl_error(hdl
, "dscpSecure on the accept socket failed : "
458 "rv = %d\n", dom
, rv
);
463 if ((listen(Acc
.c_sd
, EXS_NUM_SOCKS
)) == -1) {
464 fmd_hdl_debug(hdl
, "Failed to listen() for connections");
469 flags
= fcntl(Acc
.c_sd
, F_GETFL
, 0);
470 (void) fcntl(Acc
.c_sd
, F_SETFL
, flags
| O_NONBLOCK
);
472 Acc_tid
= fmd_thr_create(hdl
, exs_listen
, hdl
);
476 * * * * * * * * * * * * * * * * * * * * * * * * * * *
477 * ETM-to-Transport API Connection Management routines
478 * * * * * * * * * * * * * * * * * * * * * * * * * * *
482 * Initialize and setup any transport infrastructure before any connections
484 * Return etm_xport_hdl_t for success, NULL for failure.
487 etm_xport_init(fmd_hdl_t
*hdl
, char *endpoint_id
,
488 int (*cb_func
)(fmd_hdl_t
*hdl
, etm_xport_conn_t conn
, etm_cb_flag_t flag
,
489 void *arg
), void *cb_func_arg
)
491 exs_hdl_t
*hp
, *curr
;
494 if (exs_get_id(hdl
, endpoint_id
, &dom
))
497 (void) pthread_mutex_lock(&List_lock
);
499 /* Check for a duplicate endpoint_id on the list */
500 for (curr
= Exh_head
; curr
; curr
= curr
->h_next
) {
501 if (dom
== curr
->h_dom
) {
502 fmd_hdl_debug(hdl
, "xport - init failed, "
503 "duplicate domain id : %d\n", dom
);
504 (void) pthread_mutex_unlock(&List_lock
);
509 if (Exh_head
== NULL
) {
510 /* Do one-time initializations */
511 exs_filter_init(hdl
);
513 /* Initialize the accept/listen vars */
514 Acc
.c_sd
= EXS_SD_FREE
;
515 Acc_tid
= EXS_TID_FREE
;
520 hp
= exs_hdl_alloc(hdl
, endpoint_id
, cb_func
, cb_func_arg
, dom
);
522 /* Add this transport instance handle to the list */
523 hp
->h_next
= Exh_head
;
526 (void) pthread_mutex_unlock(&List_lock
);
528 exs_prep_accept(hdl
, dom
);
530 return ((etm_xport_hdl_t
)hp
);
534 * Teardown any transport infrastructure after all connections are closed.
535 * Return 0 for success, or nonzero for failure.
538 etm_xport_fini(fmd_hdl_t
*hdl
, etm_xport_hdl_t tlhdl
)
540 exs_hdl_t
*hp
= (exs_hdl_t
*)tlhdl
;
541 exs_hdl_t
*xp
, **ppx
;
543 (void) pthread_mutex_lock(&List_lock
);
547 for (xp
= *ppx
; xp
; xp
= xp
->h_next
) {
555 (void) pthread_mutex_unlock(&List_lock
);
556 fmd_hdl_abort(hdl
, "xport - fini failed, tlhdl %p not on list",
563 if (hp
->h_tid
!= EXS_TID_FREE
) {
565 fmd_thr_signal(hdl
, hp
->h_tid
);
566 fmd_thr_destroy(hdl
, hp
->h_tid
);
569 if (hp
->h_server
.c_sd
!= EXS_SD_FREE
)
570 (void) close(hp
->h_server
.c_sd
);
572 if (hp
->h_client
.c_sd
!= EXS_SD_FREE
)
573 (void) close(hp
->h_client
.c_sd
);
575 fmd_hdl_strfree(hdl
, hp
->h_endpt_id
);
576 fmd_hdl_free(hdl
, hp
, sizeof (exs_hdl_t
));
578 if (Exh_head
== NULL
) {
579 /* Undo one-time initializations */
580 exs_filter_fini(hdl
);
582 /* Destroy the accept/listen thread */
583 if (Acc_tid
!= EXS_TID_FREE
) {
585 fmd_thr_signal(hdl
, Acc_tid
);
586 fmd_thr_destroy(hdl
, Acc_tid
);
589 if (Acc
.c_sd
!= EXS_SD_FREE
)
593 (void) pthread_mutex_unlock(&List_lock
);
599 * Open a connection with the given endpoint,
600 * Return etm_xport_conn_t for success, NULL and set errno for failure.
603 etm_xport_open(fmd_hdl_t
*hdl
, etm_xport_hdl_t tlhdl
)
606 exs_hdl_t
*hp
= (exs_hdl_t
*)tlhdl
;
609 fmd_thr_destroy(hp
->h_hdl
, hp
->h_tid
);
610 hp
->h_tid
= EXS_TID_FREE
;
614 if (hp
->h_client
.c_sd
== EXS_SD_FREE
) {
615 if (exs_prep_client(hp
) != 0)
619 /* Set the socket to be non-blocking */
620 flags
= fcntl(hp
->h_client
.c_sd
, F_GETFL
, 0);
621 (void) fcntl(hp
->h_client
.c_sd
, F_SETFL
, flags
| O_NONBLOCK
);
623 if ((connect(hp
->h_client
.c_sd
,
624 (struct sockaddr
*)&hp
->h_client
.c_saddr
,
625 hp
->h_client
.c_len
)) == -1) {
626 if (errno
!= EINPROGRESS
) {
627 fmd_hdl_debug(hdl
, "xport - failed to connect to %s",
629 EXS_CLOSE_CLR(hp
->h_client
);
634 fmd_hdl_debug(hdl
, "xport - connected client socket for %s",
637 return (&hp
->h_client
);
641 * Close a connection from either endpoint.
642 * Return zero for success, nonzero for failure.
646 etm_xport_close(fmd_hdl_t
*hdl
, etm_xport_conn_t conn
)
648 exs_conn_t
*cp
= (exs_conn_t
*)conn
;
650 if (cp
->c_sd
== EXS_SD_FREE
)
651 return (0); /* Connection already closed */
653 (void) close(cp
->c_sd
);
654 cp
->c_sd
= EXS_SD_FREE
;
660 * * * * * * * * * * * * * * * * * *
661 * ETM-to-Transport API I/O routines
662 * * * * * * * * * * * * * * * * * *
666 * Try to read byte_cnt bytes from the connection into the given buffer.
667 * Return how many bytes actually read for success, negative value for failure.
670 etm_xport_read(fmd_hdl_t
*hdl
, etm_xport_conn_t conn
, hrtime_t timeout
,
671 void *buf
, size_t byte_cnt
)
673 ssize_t len
, nbytes
= 0;
674 hrtime_t endtime
, sleeptime
;
676 char *ptr
= (char *)buf
;
677 exs_conn_t
*cp
= (exs_conn_t
*)conn
;
679 if (cp
->c_sd
== EXS_SD_FREE
) {
680 fmd_hdl_debug(hdl
, "xport - read socket %d is closed\n",
685 endtime
= gethrtime() + timeout
;
686 sleeptime
= timeout
/ EXS_IO_SLEEP_DIV
;
689 tms
.tv_nsec
= sleeptime
;
691 while (nbytes
< byte_cnt
) {
692 if (gethrtime() < endtime
) {
693 if ((len
= recv(cp
->c_sd
, ptr
, byte_cnt
- nbytes
,
695 if (errno
!= EINTR
&& errno
!= EWOULDBLOCK
) {
696 fmd_hdl_debug(hdl
, "xport - recv "
697 "failed for socket %d", cp
->c_sd
);
700 (void) nanosleep(&tms
, 0);
702 } else if (len
== 0) {
703 fmd_hdl_debug(hdl
, "xport - remote endpt "
704 "closed for socket %d", cp
->c_sd
);
711 fmd_hdl_debug(hdl
, "xport - read timed out for socket "
724 * Try to write byte_cnt bytes to the connection from the given buffer.
725 * Return how many bytes actually written for success, negative value
729 etm_xport_write(fmd_hdl_t
*hdl
, etm_xport_conn_t conn
, hrtime_t timeout
,
730 void *buf
, size_t byte_cnt
)
732 ssize_t len
, nbytes
= 0;
733 hrtime_t endtime
, sleeptime
;
735 char *ptr
= (char *)buf
;
736 exs_conn_t
*cp
= (exs_conn_t
*)conn
;
738 if (cp
->c_sd
== EXS_SD_FREE
) {
739 fmd_hdl_debug(hdl
, "xport - write socket %d is closed\n",
744 endtime
= gethrtime() + timeout
;
745 sleeptime
= timeout
/ EXS_IO_SLEEP_DIV
;
748 tms
.tv_nsec
= sleeptime
;
750 while (nbytes
< byte_cnt
) {
751 if (gethrtime() < endtime
) {
752 if ((len
= send(cp
->c_sd
, ptr
, byte_cnt
- nbytes
,
754 if (errno
!= EINTR
&& errno
!= EWOULDBLOCK
) {
755 fmd_hdl_debug(hdl
, "xport - send "
756 "failed for socket %d", cp
->c_sd
);
759 (void) nanosleep(&tms
, 0);
766 fmd_hdl_debug(hdl
, "xport - write timed out for socket "
779 * * * * * * * * * * * * * * * * * * * *
780 * ETM-to-Transport API Filter routines
781 * * * * * * * * * * * * * * * * * * * *
785 * Call the platform's send_filter function.
786 * Otherwise return ETM_XPORT_FILTER_OK.
789 etm_xport_send_filter(fmd_hdl_t
*hdl
, nvlist_t
*event
, const char *dest
)
791 if (Send_filter
!= NULL
)
792 return (Send_filter(hdl
, event
, dest
));
794 return (ETM_XPORT_FILTER_OK
);
798 * Call the platform's post_filter function.
799 * Otherwise return ETM_XPORT_FILTER_OK.
802 etm_xport_post_filter(fmd_hdl_t
*hdl
, nvlist_t
*event
, const char *src
)
804 if (Post_filter
!= NULL
)
805 return (Post_filter(hdl
, event
, src
));
807 return (ETM_XPORT_FILTER_OK
);