4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #pragma ident "%Z%%M% %I% %E% SMI"
30 * FMA Event Transport Module
32 * Plugin for sending/receiving FMA events to/from a remote endoint.
35 #include <netinet/in.h>
37 #include <sys/fm/protocol.h>
38 #include <sys/sysmacros.h>
43 #include <libnvpair.h>
44 #include "etm_xport_api.h"
45 #include "etm_proto.h"
51 typedef enum etm_connection_status
{
53 C_OPEN
, /* Connection is open */
54 C_CLOSED
, /* Connection is closed */
55 C_LIMBO
, /* Bad value in header from peer */
56 C_TIMED_OUT
/* Reconnection to peer timed out */
59 typedef enum etm_fmd_queue_status
{
60 Q_UNINITIALIZED
= 100,
61 Q_INIT_PENDING
, /* Queue initialization in progress */
62 Q_OPEN
, /* Queue is open */
63 Q_SUSPENDED
/* Queue is suspended */
66 /* Per endpoint data */
67 typedef struct etm_endpoint_map
{
68 uint8_t epm_ver
; /* Protocol version being used */
69 char *epm_ep_str
; /* Endpoint ID string */
70 int epm_xprtflags
; /* FMD transport open flags */
71 etm_xport_hdl_t epm_tlhdl
; /* Transport Layer instance handle */
72 pthread_mutex_t epm_lock
; /* Protects remainder of struct */
73 pthread_cond_t epm_tx_cv
; /* Cond var for send/transmit */
74 int epm_txbusy
; /* Busy doing send/transmit */
75 fmd_xprt_t
*epm_xprthdl
; /* FMD transport handle */
76 etm_qstat_t epm_qstat
; /* Status of fmd xprt queue */
77 nvlist_t
*epm_ep_nvl
; /* Endpoint ID nv_list */
78 etm_xport_conn_t epm_oconn
; /* Connection for outgoing events */
79 etm_connstat_t epm_cstat
; /* Status of connection */
80 id_t epm_timer_id
; /* Timer id */
81 int epm_timer_in_use
; /* Indicates if timer is in use */
82 hrtime_t epm_reconn_end
; /* Reconnection end time */
83 struct etm_endpoint_map
*epm_next
;
86 #define ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
87 #define ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
88 #define ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
89 #define ETM_EP_INST_MAX 4 /* Max chars in endpt instance */
90 #define ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
91 #define ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)
93 #define ALLOC_BUF(hdl, buf, size) \
94 buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);
96 #define FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));
98 #define IS_CLIENT(mp) (((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)
100 #define INCRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \
102 (void) pthread_mutex_unlock(&Etm_mod_lock); \
105 #define DECRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \
107 (void) pthread_mutex_unlock(&Etm_mod_lock); \
110 #define ADDSTAT(x, y) { (void) pthread_mutex_lock(&Etm_mod_lock); \
112 (void) pthread_mutex_unlock(&Etm_mod_lock); \
118 static pthread_mutex_t Etm_mod_lock
= PTHREAD_MUTEX_INITIALIZER
;
119 /* Protects globals */
120 static hrtime_t Reconn_interval
; /* Time between reconnection attempts */
121 static hrtime_t Reconn_timeout
; /* Time allowed for reconnection */
122 static hrtime_t Rw_timeout
; /* Time allowed for I/O operation */
123 static int Etm_dump
= 0; /* Enables hex dump for debug */
124 static int Etm_exit
= 0; /* Flag for exit */
125 static etm_epmap_t
*Epmap_head
= NULL
; /* Head of list of epmap structs */
127 /* Module statistics */
128 static struct etm_stats
{
131 fmd_stat_t read_bytes
;
133 fmd_stat_t post_filter
;
135 fmd_stat_t write_ack
;
136 fmd_stat_t write_bytes
;
137 fmd_stat_t write_msg
;
138 fmd_stat_t send_filter
;
140 fmd_stat_t error_protocol
;
141 fmd_stat_t error_drop_read
;
142 fmd_stat_t error_read
;
143 fmd_stat_t error_read_badhdr
;
144 fmd_stat_t error_write
;
145 fmd_stat_t error_send_filter
;
146 fmd_stat_t error_post_filter
;
148 fmd_stat_t peer_count
;
152 { "read_ack", FMD_TYPE_UINT64
, "ACKs read" },
153 { "read_bytes", FMD_TYPE_UINT64
, "Bytes read" },
154 { "read_msg", FMD_TYPE_UINT64
, "Messages read" },
155 { "post_filter", FMD_TYPE_UINT64
, "Drops by post_filter" },
157 { "write_ack", FMD_TYPE_UINT64
, "ACKs sent" },
158 { "write_bytes", FMD_TYPE_UINT64
, "Bytes sent" },
159 { "write_msg", FMD_TYPE_UINT64
, "Messages sent" },
160 { "send_filter", FMD_TYPE_UINT64
, "Drops by send_filter" },
161 /* ETM error counters */
162 { "error_protocol", FMD_TYPE_UINT64
, "ETM protocol errors" },
163 { "error_drop_read", FMD_TYPE_UINT64
, "Dropped read messages" },
164 { "error_read", FMD_TYPE_UINT64
, "Read I/O errors" },
165 { "error_read_badhdr", FMD_TYPE_UINT64
, "Bad headers read" },
166 { "error_write", FMD_TYPE_UINT64
, "Write I/O errors" },
167 { "error_send_filter", FMD_TYPE_UINT64
, "Send filter errors" },
168 { "error_post_filter", FMD_TYPE_UINT64
, "Post filter errors" },
170 { "peer_count", FMD_TYPE_UINT64
, "Number of peers initialized" },
174 * ETM Private functions
178 * Hex dump for debug.
/*
 * etm_hex_dump() -- debug helper that dumps a message buffer as rows of
 * 16-bit hex words via fmd_hdl_debug(), only when the Etm_dump tunable is
 * set (gating check not visible in this extract).  `direction` selects the
 * "WRITE" vs "READ" banner.  j counts complete 8-word rows, k the
 * remainder bytes for a final partial row.
 * NOTE(review): this extract has lost several original lines (the Etm_dump
 * guard, loop braces, and the switch over the remainder size); the
 * fragments below are kept byte-identical.
 */
181 etm_hex_dump(fmd_hdl_t
*hdl
, void *buf
, size_t buflen
, int direction
)
189 j
= buflen
/ 16; /* Number of complete 8-column rows */
190 k
= buflen
% 16; /* Is there a last (non-8-column) row? */
193 fmd_hdl_debug(hdl
, "--- WRITE Message Dump ---");
195 fmd_hdl_debug(hdl
, "--- READ Message Dump ---");
197 fmd_hdl_debug(hdl
, " Displaying %d bytes", buflen
);
199 /* Dump the complete 8-column rows */
200 for (i
= 0; i
< j
; i
++) {
201 c
= (int16_t *)buf
+ (i
* 8);
202 fmd_hdl_debug(hdl
, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i
,
203 *(c
+0), *(c
+1), *(c
+2), *(c
+3),
204 *(c
+4), *(c
+5), *(c
+6), *(c
+7));
207 /* Dump the last (incomplete) row */
208 c
= (int16_t *)buf
+ (i
* 8);
211 fmd_hdl_debug(hdl
, "%3d: %4x %4x", i
, *(c
+0), *(c
+1));
214 fmd_hdl_debug(hdl
, "%3d: %4x %4x %4x %4x", i
, *(c
+0), *(c
+1),
218 fmd_hdl_debug(hdl
, "%3d: %4x %4x %4x %4x %4x %4x", i
, *(c
+0),
219 *(c
+1), *(c
+2), *(c
+3), *(c
+4), *(c
+5));
223 fmd_hdl_debug(hdl
, "--- End Dump ---");
227 * Provide the length of a message based on the data in the given ETM header.
230 etm_get_msglen(void *buf
)
232 etm_proto_hdr_t
*hp
= (etm_proto_hdr_t
*)buf
;
234 return (ntohl(hp
->hdr_msglen
));
238 * Check the contents of the ETM header for errors.
239 * Return the header type (hdr_type).
242 etm_check_hdr(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
, void *buf
)
244 etm_proto_hdr_t
*hp
= (etm_proto_hdr_t
*)buf
;
246 if (bcmp(hp
->hdr_delim
, ETM_DELIM
, ETM_DELIMLEN
) != 0) {
247 fmd_hdl_debug(hdl
, "Bad delimiter in ETM header from %s "
248 ": 0x%x\n", mp
->epm_ep_str
, hp
->hdr_delim
);
249 return (ETM_HDR_INVALID
);
252 if ((hp
->hdr_type
== ETM_HDR_C_HELLO
) ||
253 (hp
->hdr_type
== ETM_HDR_S_HELLO
)) {
254 /* Until version is negotiated, other fields may be wrong */
255 return (hp
->hdr_type
);
258 if (hp
->hdr_ver
!= mp
->epm_ver
) {
259 fmd_hdl_debug(hdl
, "Bad version in ETM header from %s : 0x%x\n",
260 mp
->epm_ep_str
, hp
->hdr_ver
);
261 return (ETM_HDR_BADVERSION
);
264 if ((hp
->hdr_type
== ETM_HDR_TYPE_TOO_LOW
) ||
265 (hp
->hdr_type
>= ETM_HDR_TYPE_TOO_HIGH
)) {
266 fmd_hdl_debug(hdl
, "Bad type in ETM header from %s : 0x%x\n",
267 mp
->epm_ep_str
, hp
->hdr_type
);
268 return (ETM_HDR_BADTYPE
);
271 return (hp
->hdr_type
);
275 * Create an ETM header of a given type in the given buffer.
276 * Return length of header.
/*
 * etm_create_hdr() -- build an ETM protocol header of the given version,
 * type, and payload length (stored in network byte order) into `buf`, and
 * return the header length.
 * NOTE(review): the assignments of hdr_ver/hdr_type and the return
 * statement were lost from this extract; only the delimiter copy and the
 * msglen store remain visible below (kept byte-identical).
 */
279 etm_create_hdr(void *buf
, uint8_t ver
, uint8_t type
, uint32_t msglen
)
281 etm_proto_hdr_t
*hp
= (etm_proto_hdr_t
*)buf
;
283 bcopy(ETM_DELIM
, hp
->hdr_delim
, ETM_DELIMLEN
);
286 hp
->hdr_msglen
= htonl(msglen
);
292 * Convert message bytes to nvlist and post to fmd.
293 * Return zero for success, non-zero for failure.
295 * Note : nvl is free'd by fmd.
/*
 * etm_post_msg() -- unpack `buf` (an XDR-packed nvlist of `buflen` bytes)
 * and post the resulting event to fmd via the peer's fmd_xprt_t handle.
 * The event is first run through etm_xport_post_filter(): FILTER_DROP
 * discards it (counted in post_filter stat); FILTER_ERROR is counted but
 * the event is still posted.  Posting only happens while the module is not
 * exiting and the queue is Q_OPEN; a Q_SUSPENDED queue is resumed (and any
 * pending reconnect timer removed) before posting.  In the failure paths
 * the remote peer is relied upon to resend the event.
 * Returns zero for success, non-zero for failure; nvl is free'd by fmd.
 * NOTE(review): several lines (declarations, braces, Etm_exit check,
 * return statements) were lost from this extract; fragments kept
 * byte-identical below.
 */
298 etm_post_msg(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
, void *buf
, size_t buflen
)
303 if (nvlist_unpack((char *)buf
, buflen
, &nvl
, 0)) {
304 fmd_hdl_error(hdl
, "failed to unpack message");
308 rv
= etm_xport_post_filter(hdl
, nvl
, mp
->epm_ep_str
);
309 if (rv
== ETM_XPORT_FILTER_DROP
) {
310 fmd_hdl_debug(hdl
, "post_filter dropped event");
311 INCRSTAT(Etm_stats
.post_filter
.fmds_value
.ui64
);
314 } else if (rv
== ETM_XPORT_FILTER_ERROR
) {
315 fmd_hdl_debug(hdl
, "post_filter error : %s", strerror(errno
));
316 INCRSTAT(Etm_stats
.error_post_filter
.fmds_value
.ui64
);
317 /* Still post event */
320 (void) pthread_mutex_lock(&mp
->epm_lock
);
321 (void) pthread_mutex_lock(&Etm_mod_lock
);
323 (void) pthread_mutex_unlock(&Etm_mod_lock
);
324 if (mp
->epm_qstat
== Q_OPEN
) {
325 fmd_xprt_post(hdl
, mp
->epm_xprthdl
, nvl
, 0);
327 } else if (mp
->epm_qstat
== Q_SUSPENDED
) {
328 fmd_xprt_resume(hdl
, mp
->epm_xprthdl
);
329 if (mp
->epm_timer_in_use
) {
330 fmd_timer_remove(hdl
, mp
->epm_timer_id
);
331 mp
->epm_timer_in_use
= 0;
333 mp
->epm_qstat
= Q_OPEN
;
334 fmd_hdl_debug(hdl
, "queue resumed for %s",
336 fmd_xprt_post(hdl
, mp
->epm_xprthdl
, nvl
, 0);
339 fmd_hdl_debug(hdl
, "unable to post message, qstat = %d",
342 /* Remote peer will attempt to resend event */
346 (void) pthread_mutex_unlock(&Etm_mod_lock
);
347 fmd_hdl_debug(hdl
, "unable to post message, module exiting");
349 /* Remote peer will attempt to resend event */
353 (void) pthread_mutex_unlock(&mp
->epm_lock
);
359 * Handle the startup handshake to the server. The client always initiates
360 * the startup handshake. In the following sequence, we are the client and
361 * the remote endpoint is the server.
363 * Client sends C_HELLO and transitions to Q_INIT_PENDING state.
364 * Server sends S_HELLO and transitions to Q_INIT_PENDING state.
365 * Client sends ACK and transitions to Q_OPEN state.
366 * Server receives ACK and transitions to Q_OPEN state.
368 * Return 0 for success, nonzero for failure.
371 etm_handle_startup(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
)
374 size_t hdrlen
= ETM_HDRLEN
;
376 char hbuf
[ETM_HDRLEN
];
378 if ((mp
->epm_oconn
= etm_xport_open(hdl
, mp
->epm_tlhdl
)) == NULL
)
381 mp
->epm_cstat
= C_OPEN
;
383 hdrlen
= etm_create_hdr(hbuf
, mp
->epm_ver
, ETM_HDR_C_HELLO
, 0);
385 if ((etm_xport_write(hdl
, mp
->epm_oconn
, Rw_timeout
, hbuf
,
386 hdrlen
)) != hdrlen
) {
387 fmd_hdl_error(hdl
, "Failed to write C_HELLO to %s",
392 mp
->epm_qstat
= Q_INIT_PENDING
;
394 if ((etm_xport_read(hdl
, mp
->epm_oconn
, Rw_timeout
, hbuf
,
395 hdrlen
)) != hdrlen
) {
396 fmd_hdl_error(hdl
, "Failed to read S_HELLO from %s",
401 hdrstat
= etm_check_hdr(hdl
, mp
, hbuf
);
403 if (hdrstat
!= ETM_HDR_S_HELLO
) {
404 fmd_hdl_error(hdl
, "Protocol error, did not receive S_HELLO "
405 "from %s", mp
->epm_ep_str
);
410 * Get version from the server.
411 * Currently, only one version is supported.
413 hp
= (etm_proto_hdr_t
*)(void *)hbuf
;
414 if (hp
->hdr_ver
!= ETM_PROTO_V1
) {
415 fmd_hdl_error(hdl
, "Unable to use same version as %s : %d",
416 mp
->epm_ep_str
, hp
->hdr_ver
);
419 mp
->epm_ver
= hp
->hdr_ver
;
421 hdrlen
= etm_create_hdr(hbuf
, mp
->epm_ver
, ETM_HDR_ACK
, 0);
423 if ((etm_xport_write(hdl
, mp
->epm_oconn
, Rw_timeout
, hbuf
,
424 hdrlen
)) != hdrlen
) {
425 fmd_hdl_error(hdl
, "Failed to write ACK for S_HELLO to %s",
431 * Call fmd_xprt_open and fmd_xprt_setspecific with
432 * Etm_mod_lock held to avoid race with etm_send thread.
434 (void) pthread_mutex_lock(&Etm_mod_lock
);
435 if ((mp
->epm_xprthdl
= fmd_xprt_open(hdl
, mp
->epm_xprtflags
,
436 mp
->epm_ep_nvl
, NULL
)) == NULL
) {
437 fmd_hdl_abort(hdl
, "Failed to init xprthdl for %s",
440 fmd_xprt_setspecific(hdl
, mp
->epm_xprthdl
, mp
);
441 (void) pthread_mutex_unlock(&Etm_mod_lock
);
443 mp
->epm_qstat
= Q_OPEN
;
444 fmd_hdl_debug(hdl
, "queue open for %s", mp
->epm_ep_str
);
450 * Open a connection to the peer, send a SHUTDOWN message,
451 * and close the connection.
454 etm_send_shutdown(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
)
456 size_t hdrlen
= ETM_HDRLEN
;
457 char hbuf
[ETM_HDRLEN
];
459 if ((mp
->epm_oconn
= etm_xport_open(hdl
, mp
->epm_tlhdl
)) == NULL
)
462 hdrlen
= etm_create_hdr(hbuf
, mp
->epm_ver
, ETM_HDR_SHUTDOWN
, 0);
464 (void) etm_xport_write(hdl
, mp
->epm_oconn
, Rw_timeout
, hbuf
, hdrlen
);
466 (void) etm_xport_close(hdl
, mp
->epm_oconn
);
467 mp
->epm_oconn
= NULL
;
471 * Alloc a nvlist and add a string for the endpoint.
472 * Return zero for success, non-zero for failure.
/*
 * etm_get_ep_nvl() -- allocate mp->epm_ep_nvl and store the endpoint ID
 * string under the "domain-id" key; on failure the nvlist is freed and an
 * error is logged.  Returns zero for success, non-zero for failure
 * (return statements elided from this extract; fragments kept
 * byte-identical).
 */
475 etm_get_ep_nvl(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
)
478 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
479 * in fmd when this nvlist_t is free'd.
481 (void) nvlist_alloc(&mp
->epm_ep_nvl
, NV_UNIQUE_NAME
, 0);
483 if (nvlist_add_string(mp
->epm_ep_nvl
, "domain-id", mp
->epm_ep_str
)) {
484 fmd_hdl_error(hdl
, "failed to add domain-id string to nvlist "
485 "for %s", mp
->epm_ep_str
);
486 nvlist_free(mp
->epm_ep_nvl
);
494 * Free the nvlist for the endpoint_id string.
/*
 * etm_free_ep_nvl() -- release the endpoint-ID nvlist created by
 * etm_get_ep_nvl().  (Presumably epm_ep_nvl is also reset afterwards —
 * that line is not visible in this extract; verify against the original.)
 */
498 etm_free_ep_nvl(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
)
500 nvlist_free(mp
->epm_ep_nvl
);
504 * Check for a duplicate endpoint/peer string.
/*
 * etm_check_dup_ep_str() -- walk the global Epmap_head list comparing each
 * entry's epm_ep_str against `epname`.  Used to detect a duplicate
 * endpoint/peer string before creating a new map.  (Return statements for
 * the match/no-match cases were elided from this extract.)
 */
508 etm_check_dup_ep_str(fmd_hdl_t
*hdl
, char *epname
)
512 for (mp
= Epmap_head
; mp
!= NULL
; mp
= mp
->epm_next
)
513 if (strcmp(epname
, mp
->epm_ep_str
) == 0)
520 * Attempt to re-open a connection with the remote endpoint.
/*
 * etm_reconnect() -- attempt to re-open the outgoing connection to the
 * remote endpoint while within the reconnection window (epm_reconn_end).
 * On failure, re-arm the reconnect timer for another attempt after
 * Reconn_interval; on success, clear the window and mark the connection
 * C_OPEN.  If the window has expired, mark the connection C_TIMED_OUT.
 * Once the connection is open again, resume the suspended fmd transport
 * queue and mark it Q_OPEN.  (Braces/else structure and some format
 * arguments were elided from this extract; fragments kept byte-identical.)
 */
523 etm_reconnect(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
)
525 if ((mp
->epm_reconn_end
> 0) && (mp
->epm_cstat
== C_UNINITIALIZED
)) {
526 if (gethrtime() < mp
->epm_reconn_end
) {
527 if ((mp
->epm_oconn
= etm_xport_open(hdl
,
528 mp
->epm_tlhdl
)) == NULL
) {
529 fmd_hdl_debug(hdl
, "reconnect failed for %s",
531 mp
->epm_timer_id
= fmd_timer_install(hdl
, mp
,
532 NULL
, Reconn_interval
);
533 mp
->epm_timer_in_use
= 1;
535 fmd_hdl_debug(hdl
, "reconnect success for %s",
537 mp
->epm_reconn_end
= 0;
538 mp
->epm_cstat
= C_OPEN
;
541 fmd_hdl_error(hdl
, "Reconnect timed out for %s\n",
543 mp
->epm_reconn_end
= 0;
544 mp
->epm_cstat
= C_TIMED_OUT
;
548 if (mp
->epm_cstat
== C_OPEN
) {
549 fmd_xprt_resume(hdl
, mp
->epm_xprthdl
);
550 mp
->epm_qstat
= Q_OPEN
;
551 fmd_hdl_debug(hdl
, "queue resumed for %s", mp
->epm_ep_str
);
556 * Suspend a given connection and setup for reconnection retries.
557 * Assume caller holds lock on epm_lock.
/*
 * etm_suspend_reconnect() -- tear down the current outgoing connection,
 * open a reconnection window (now + Reconn_timeout), suspend the fmd
 * transport queue (Q_SUSPENDED), and install a timer to drive periodic
 * reconnect attempts via etm_reconnect().  Caller must hold epm_lock.
 * (The Etm_exit early-return guarded by Etm_mod_lock and some closing
 * braces were elided from this extract; fragments kept byte-identical.)
 */
560 etm_suspend_reconnect(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
)
562 (void) pthread_mutex_lock(&Etm_mod_lock
);
564 (void) pthread_mutex_unlock(&Etm_mod_lock
);
567 (void) pthread_mutex_unlock(&Etm_mod_lock
);
569 if (mp
->epm_oconn
!= NULL
) {
570 (void) etm_xport_close(hdl
, mp
->epm_oconn
);
571 mp
->epm_oconn
= NULL
;
574 mp
->epm_reconn_end
= gethrtime() + Reconn_timeout
;
575 mp
->epm_cstat
= C_UNINITIALIZED
;
577 if (mp
->epm_xprthdl
!= NULL
) {
578 fmd_xprt_suspend(hdl
, mp
->epm_xprthdl
);
579 mp
->epm_qstat
= Q_SUSPENDED
;
580 fmd_hdl_debug(hdl
, "queue suspended for %s", mp
->epm_ep_str
);
582 if (mp
->epm_timer_in_use
== 0) {
583 mp
->epm_timer_id
= fmd_timer_install(hdl
, mp
, NULL
,
585 mp
->epm_timer_in_use
= 1;
591 * Reinitialize the connection. The old fmd_xprt_t handle must be
592 * removed/closed first.
593 * Assume caller holds lock on epm_lock.
/*
 * etm_reinit() -- return the endpoint to its uninitialized state so the
 * connection can be re-established from scratch: wait for any in-flight
 * etm_send (epm_txbusy) to finish, close the fmd transport handle (which
 * also frees epm_ep_nvl), remove any pending reconnect timer, close the
 * outgoing connection, and reset both status fields.  Caller must hold
 * epm_lock.  (Some closing braces were elided from this extract;
 * fragments kept byte-identical.)
 */
596 etm_reinit(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
)
599 * To avoid a deadlock, wait for etm_send to finish before
600 * calling fmd_xprt_close()
602 while (mp
->epm_txbusy
)
603 (void) pthread_cond_wait(&mp
->epm_tx_cv
, &mp
->epm_lock
);
605 if (mp
->epm_xprthdl
!= NULL
) {
606 fmd_xprt_close(hdl
, mp
->epm_xprthdl
);
607 fmd_hdl_debug(hdl
, "queue closed for %s", mp
->epm_ep_str
);
608 mp
->epm_xprthdl
= NULL
;
609 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
610 mp
->epm_ep_nvl
= NULL
;
613 if (mp
->epm_timer_in_use
) {
614 fmd_timer_remove(hdl
, mp
->epm_timer_id
);
615 mp
->epm_timer_in_use
= 0;
618 if (mp
->epm_oconn
!= NULL
) {
619 (void) etm_xport_close(hdl
, mp
->epm_oconn
);
620 mp
->epm_oconn
= NULL
;
623 mp
->epm_cstat
= C_UNINITIALIZED
;
624 mp
->epm_qstat
= Q_UNINITIALIZED
;
628 * Receive data from ETM transport layer.
629 * Note : This is not the fmdo_recv entry point.
633 etm_recv(fmd_hdl_t
*hdl
, etm_xport_conn_t conn
, etm_epmap_t
*mp
)
635 size_t buflen
, hdrlen
;
637 char hbuf
[ETM_HDRLEN
];
642 if ((etm_xport_read(hdl
, conn
, Rw_timeout
, hbuf
, hdrlen
)) != hdrlen
) {
643 fmd_hdl_debug(hdl
, "failed to read header from %s",
645 INCRSTAT(Etm_stats
.error_read
.fmds_value
.ui64
);
649 hdrstat
= etm_check_hdr(hdl
, mp
, hbuf
);
652 case ETM_HDR_INVALID
:
653 (void) pthread_mutex_lock(&mp
->epm_lock
);
654 if (mp
->epm_cstat
== C_OPEN
)
655 mp
->epm_cstat
= C_CLOSED
;
656 (void) pthread_mutex_unlock(&mp
->epm_lock
);
658 INCRSTAT(Etm_stats
.error_read_badhdr
.fmds_value
.ui64
);
662 case ETM_HDR_BADTYPE
:
663 case ETM_HDR_BADVERSION
:
664 hdrlen
= etm_create_hdr(hbuf
, mp
->epm_ver
, ETM_HDR_NAK
, 0);
666 if ((etm_xport_write(hdl
, conn
, Rw_timeout
, hbuf
,
667 hdrlen
)) != hdrlen
) {
668 fmd_hdl_debug(hdl
, "failed to write NAK to %s",
670 INCRSTAT(Etm_stats
.error_write
.fmds_value
.ui64
);
674 (void) pthread_mutex_lock(&mp
->epm_lock
);
675 mp
->epm_cstat
= C_LIMBO
;
676 (void) pthread_mutex_unlock(&mp
->epm_lock
);
678 INCRSTAT(Etm_stats
.error_read_badhdr
.fmds_value
.ui64
);
682 case ETM_HDR_C_HELLO
:
683 /* Client is initiating a startup handshake */
684 (void) pthread_mutex_lock(&mp
->epm_lock
);
686 mp
->epm_qstat
= Q_INIT_PENDING
;
687 (void) pthread_mutex_unlock(&mp
->epm_lock
);
689 hdrlen
= etm_create_hdr(hbuf
, mp
->epm_ver
, ETM_HDR_S_HELLO
, 0);
691 if ((etm_xport_write(hdl
, conn
, Rw_timeout
, hbuf
,
692 hdrlen
)) != hdrlen
) {
693 fmd_hdl_debug(hdl
, "failed to write S_HELLO to %s",
695 INCRSTAT(Etm_stats
.error_write
.fmds_value
.ui64
);
703 (void) pthread_mutex_lock(&mp
->epm_lock
);
704 if (mp
->epm_qstat
== Q_INIT_PENDING
) {
705 /* This is client's ACK from startup handshake */
706 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
707 if (mp
->epm_ep_nvl
== NULL
)
708 (void) etm_get_ep_nvl(hdl
, mp
);
711 * Call fmd_xprt_open and fmd_xprt_setspecific with
712 * Etm_mod_lock held to avoid race with etm_send thread.
714 (void) pthread_mutex_lock(&Etm_mod_lock
);
715 if ((mp
->epm_xprthdl
= fmd_xprt_open(hdl
,
716 mp
->epm_xprtflags
, mp
->epm_ep_nvl
, NULL
)) == NULL
) {
717 fmd_hdl_abort(hdl
, "Failed to init xprthdl "
718 "for %s", mp
->epm_ep_str
);
720 fmd_xprt_setspecific(hdl
, mp
->epm_xprthdl
, mp
);
721 (void) pthread_mutex_unlock(&Etm_mod_lock
);
723 mp
->epm_qstat
= Q_OPEN
;
724 (void) pthread_mutex_unlock(&mp
->epm_lock
);
725 fmd_hdl_debug(hdl
, "queue open for %s",
728 (void) pthread_mutex_unlock(&mp
->epm_lock
);
729 fmd_hdl_debug(hdl
, "protocol error, not expecting ACK "
730 "from %s\n", mp
->epm_ep_str
);
731 INCRSTAT(Etm_stats
.error_protocol
.fmds_value
.ui64
);
737 case ETM_HDR_SHUTDOWN
:
738 fmd_hdl_debug(hdl
, "received shutdown from %s",
741 (void) pthread_mutex_lock(&mp
->epm_lock
);
747 * A server shutdown is considered to be temporary.
748 * Prepare for reconnection.
750 mp
->epm_timer_id
= fmd_timer_install(hdl
, mp
, NULL
,
753 mp
->epm_timer_in_use
= 1;
756 (void) pthread_mutex_unlock(&mp
->epm_lock
);
762 (void) pthread_mutex_lock(&mp
->epm_lock
);
763 if (mp
->epm_qstat
== Q_UNINITIALIZED
) {
764 /* Peer (client) is unaware that we've restarted */
765 (void) pthread_mutex_unlock(&mp
->epm_lock
);
766 hdrlen
= etm_create_hdr(hbuf
, mp
->epm_ver
,
767 ETM_HDR_S_RESTART
, 0);
769 if ((etm_xport_write(hdl
, conn
, Rw_timeout
, hbuf
,
770 hdrlen
)) != hdrlen
) {
771 fmd_hdl_debug(hdl
, "failed to write S_RESTART "
772 "to %s", mp
->epm_ep_str
);
773 INCRSTAT(Etm_stats
.error_write
.fmds_value
.ui64
);
779 (void) pthread_mutex_unlock(&mp
->epm_lock
);
781 buflen
= etm_get_msglen(hbuf
);
782 ALLOC_BUF(hdl
, buf
, buflen
);
784 if (etm_xport_read(hdl
, conn
, Rw_timeout
, buf
,
786 fmd_hdl_debug(hdl
, "failed to read message from %s",
788 FREE_BUF(hdl
, buf
, buflen
);
789 INCRSTAT(Etm_stats
.error_read
.fmds_value
.ui64
);
793 INCRSTAT(Etm_stats
.read_msg
.fmds_value
.ui64
);
794 ADDSTAT(Etm_stats
.read_bytes
.fmds_value
.ui64
, buflen
);
796 etm_hex_dump(hdl
, buf
, buflen
, 0);
798 if (etm_post_msg(hdl
, mp
, buf
, buflen
)) {
799 INCRSTAT(Etm_stats
.error_drop_read
.fmds_value
.ui64
);
800 FREE_BUF(hdl
, buf
, buflen
);
804 FREE_BUF(hdl
, buf
, buflen
);
806 hdrlen
= etm_create_hdr(hbuf
, mp
->epm_ver
, ETM_HDR_ACK
, 0);
808 if ((etm_xport_write(hdl
, conn
, Rw_timeout
, hbuf
,
809 hdrlen
)) != hdrlen
) {
810 fmd_hdl_debug(hdl
, "failed to write ACK to %s",
812 INCRSTAT(Etm_stats
.error_write
.fmds_value
.ui64
);
816 INCRSTAT(Etm_stats
.write_ack
.fmds_value
.ui64
);
819 * If we got this far and the current state of the
820 * outbound/sending connection is TIMED_OUT or
821 * LIMBO, then we should reinitialize it.
823 (void) pthread_mutex_lock(&mp
->epm_lock
);
824 if (mp
->epm_cstat
== C_TIMED_OUT
||
825 mp
->epm_cstat
== C_LIMBO
) {
826 if (mp
->epm_oconn
!= NULL
) {
827 (void) etm_xport_close(hdl
, mp
->epm_oconn
);
828 mp
->epm_oconn
= NULL
;
830 mp
->epm_cstat
= C_UNINITIALIZED
;
831 fmd_xprt_resume(hdl
, mp
->epm_xprthdl
);
832 if (mp
->epm_timer_in_use
) {
833 fmd_timer_remove(hdl
, mp
->epm_timer_id
);
834 mp
->epm_timer_in_use
= 0;
836 mp
->epm_qstat
= Q_OPEN
;
837 fmd_hdl_debug(hdl
, "queue resumed for %s",
840 (void) pthread_mutex_unlock(&mp
->epm_lock
);
846 fmd_hdl_debug(hdl
, "protocol error, unexpected header "
847 "from %s : %d", mp
->epm_ep_str
, hdrstat
);
848 INCRSTAT(Etm_stats
.error_protocol
.fmds_value
.ui64
);
856 * ETM transport layer callback function.
857 * The transport layer calls this function to :
858 * (a) pass an incoming message (flag == ETM_CBFLAG_RECV)
859 * (b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
/*
 * etm_cb_func() -- callback invoked by the ETM transport layer with `arg`
 * being the endpoint map installed at etm_xport_init() time.  For
 * ETM_CBFLAG_RECV it dispatches to etm_recv(); for ETM_CBFLAG_REINIT it
 * reinitializes the endpoint (under epm_lock) and sends a SHUTDOWN to the
 * peer, then returns ECANCELED so the transport layer closes the server
 * connection.  Unknown flags are logged.  (The Etm_exit guard, switch
 * scaffolding, break/return statements, and the call to etm_reinit were
 * elided from this extract; fragments kept byte-identical.)
 */
862 etm_cb_func(fmd_hdl_t
*hdl
, etm_xport_conn_t conn
, etm_cb_flag_t flag
,
865 etm_epmap_t
*mp
= (etm_epmap_t
*)arg
;
868 (void) pthread_mutex_lock(&Etm_mod_lock
);
870 (void) pthread_mutex_unlock(&Etm_mod_lock
);
873 (void) pthread_mutex_unlock(&Etm_mod_lock
);
876 case ETM_CBFLAG_RECV
:
877 rv
= etm_recv(hdl
, conn
, mp
);
879 case ETM_CBFLAG_REINIT
:
880 (void) pthread_mutex_lock(&mp
->epm_lock
);
882 etm_send_shutdown(hdl
, mp
);
883 (void) pthread_mutex_unlock(&mp
->epm_lock
);
885 * Return ECANCELED so the transport layer will close the
886 * server connection. The transport layer is responsible for
887 * reestablishing this connection (should a connection request
888 * arrive from the peer).
893 fmd_hdl_debug(hdl
, "Unknown callback flag : 0x%x", flag
);
901 * Allocate and initialize an etm_epmap_t struct for the given endpoint
905 etm_init_epmap(fmd_hdl_t
*hdl
, char *epname
, int flags
)
909 if (etm_check_dup_ep_str(hdl
, epname
)) {
910 fmd_hdl_debug(hdl
, "skipping duplicate peer : %s", epname
);
914 newmap
= fmd_hdl_zalloc(hdl
, sizeof (etm_epmap_t
), FMD_SLEEP
);
915 newmap
->epm_ep_str
= fmd_hdl_strdup(hdl
, epname
, FMD_SLEEP
);
916 newmap
->epm_xprtflags
= flags
;
917 newmap
->epm_cstat
= C_UNINITIALIZED
;
918 newmap
->epm_qstat
= Q_UNINITIALIZED
;
919 newmap
->epm_ver
= ETM_PROTO_V1
; /* Currently support one proto ver */
920 newmap
->epm_txbusy
= 0;
922 (void) pthread_mutex_init(&newmap
->epm_lock
, NULL
);
923 (void) pthread_cond_init(&newmap
->epm_tx_cv
, NULL
);
925 if (etm_get_ep_nvl(hdl
, newmap
)) {
926 fmd_hdl_strfree(hdl
, newmap
->epm_ep_str
);
927 fmd_hdl_free(hdl
, newmap
, sizeof (etm_epmap_t
));
931 (void) pthread_mutex_lock(&newmap
->epm_lock
);
933 if ((newmap
->epm_tlhdl
= etm_xport_init(hdl
, newmap
->epm_ep_str
,
934 etm_cb_func
, newmap
)) == NULL
) {
935 fmd_hdl_debug(hdl
, "failed to init tlhdl for %s\n",
937 etm_free_ep_nvl(hdl
, newmap
);
938 (void) pthread_mutex_unlock(&newmap
->epm_lock
);
939 (void) pthread_mutex_destroy(&newmap
->epm_lock
);
940 fmd_hdl_strfree(hdl
, newmap
->epm_ep_str
);
941 fmd_hdl_free(hdl
, newmap
, sizeof (etm_epmap_t
));
945 if (IS_CLIENT(newmap
)) {
946 if (etm_handle_startup(hdl
, newmap
)) {
948 * For whatever reason, we could not complete the
949 * startup handshake with the server. Set the timer
952 if (newmap
->epm_oconn
!= NULL
) {
953 (void) etm_xport_close(hdl
, newmap
->epm_oconn
);
954 newmap
->epm_oconn
= NULL
;
956 newmap
->epm_cstat
= C_UNINITIALIZED
;
957 newmap
->epm_qstat
= Q_UNINITIALIZED
;
958 newmap
->epm_timer_id
= fmd_timer_install(hdl
, newmap
,
959 NULL
, Reconn_interval
);
960 newmap
->epm_timer_in_use
= 1;
964 * We may be restarting after a crash. If so, the client
965 * may be unaware of this.
967 etm_send_shutdown(hdl
, newmap
);
970 /* Add this transport instance handle to the list */
971 newmap
->epm_next
= Epmap_head
;
974 (void) pthread_mutex_unlock(&newmap
->epm_lock
);
976 INCRSTAT(Etm_stats
.peer_count
.fmds_value
.ui64
);
980 * Parse the given property list string and call etm_init_epmap
984 etm_create_epmaps(fmd_hdl_t
*hdl
, char *eplist
, int flags
)
986 char *epstr
, *ep
, *prefix
, *lasts
, *numstr
;
987 char epname
[MAXPATHLEN
];
994 * Create a copy of eplist for parsing.
995 * strtok/strtok_r(3C) will insert null chars to the string.
996 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
998 slen
= strlen(eplist
);
999 epstr
= fmd_hdl_zalloc(hdl
, slen
+ 1, FMD_SLEEP
);
1000 (void) strcpy(epstr
, eplist
);
1003 * The following are supported for the "client_list" and
1004 * "server_list" properties :
1006 * A space-separated list of endpoints.
1007 * "dev:///dom0 dev:///dom1 dev:///dom2"
1009 * An array syntax for a range of instances.
1012 * A combination of both.
1013 * "dev:///dom0 dev:///dom[1:2]"
1015 ep
= strtok_r(epstr
, " ", &lasts
);
1016 while (ep
!= NULL
) {
1017 if (strchr(ep
, '[') != NULL
) {
1019 * This string is using array syntax.
1020 * Check the string for correct syntax.
1022 if ((strchr(ep
, ':') == NULL
) ||
1023 (strchr(ep
, ']') == NULL
)) {
1024 fmd_hdl_error(hdl
, "Syntax error in property "
1025 "that includes : %s\n", ep
);
1026 ep
= strtok_r(NULL
, " ", &lasts
);
1030 /* expand the array syntax */
1031 prefix
= strtok(ep
, "[");
1033 numstr
= strtok(NULL
, ":");
1034 if ((numstr
== NULL
) || (!isdigit(*numstr
))) {
1035 fmd_hdl_error(hdl
, "Syntax error in property "
1036 "that includes : %s[\n", prefix
);
1037 ep
= strtok_r(NULL
, " ", &lasts
);
1042 numstr
= strtok(NULL
, "]");
1043 if ((numstr
== NULL
) || (!isdigit(*numstr
))) {
1044 fmd_hdl_error(hdl
, "Syntax error in property "
1045 "that includes : %s[\n", prefix
);
1046 ep
= strtok_r(NULL
, " ", &lasts
);
1051 nlen
= strlen(prefix
) + ETM_EP_INST_MAX
;
1053 if (nlen
> MAXPATHLEN
) {
1054 fmd_hdl_error(hdl
, "Endpoint prop string "
1055 "exceeds MAXPATHLEN\n");
1056 ep
= strtok_r(NULL
, " ", &lasts
);
1060 for (i
= beg
; i
<= end
; i
++) {
1061 bzero(epname
, MAXPATHLEN
);
1062 (void) snprintf(epname
, nlen
, "%s%d",
1064 etm_init_epmap(hdl
, epname
, flags
);
1067 etm_init_epmap(hdl
, ep
, flags
);
1070 ep
= strtok_r(NULL
, " ", &lasts
);
1073 fmd_hdl_free(hdl
, epstr
, slen
+ 1);
1077 * Free the transport infrastructure for an endpoint.
1080 etm_free_epmap(fmd_hdl_t
*hdl
, etm_epmap_t
*mp
)
1083 char hbuf
[ETM_HDRLEN
];
1085 (void) pthread_mutex_lock(&mp
->epm_lock
);
1088 * If an etm_send thread is in progress, wait for it to finish.
1089 * The etm_recv thread is managed by the transport layer and will
1090 * be destroyed with etm_xport_fini().
1092 while (mp
->epm_txbusy
)
1093 (void) pthread_cond_wait(&mp
->epm_tx_cv
, &mp
->epm_lock
);
1095 if (mp
->epm_timer_in_use
)
1096 fmd_timer_remove(hdl
, mp
->epm_timer_id
);
1098 if (mp
->epm_oconn
!= NULL
) {
1099 hdrlen
= etm_create_hdr(hbuf
, mp
->epm_ver
,
1100 ETM_HDR_SHUTDOWN
, 0);
1101 (void) etm_xport_write(hdl
, mp
->epm_oconn
, Rw_timeout
, hbuf
,
1103 (void) etm_xport_close(hdl
, mp
->epm_oconn
);
1104 mp
->epm_oconn
= NULL
;
1107 if (mp
->epm_xprthdl
!= NULL
) {
1108 fmd_xprt_close(hdl
, mp
->epm_xprthdl
);
1109 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1110 mp
->epm_ep_nvl
= NULL
;
1113 if (mp
->epm_ep_nvl
!= NULL
)
1114 etm_free_ep_nvl(hdl
, mp
);
1116 if (mp
->epm_tlhdl
!= NULL
)
1117 (void) etm_xport_fini(hdl
, mp
->epm_tlhdl
);
1119 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1120 (void) pthread_mutex_destroy(&mp
->epm_lock
);
1121 fmd_hdl_strfree(hdl
, mp
->epm_ep_str
);
1122 fmd_hdl_free(hdl
, mp
, sizeof (etm_epmap_t
));
1123 DECRSTAT(Etm_stats
.peer_count
.fmds_value
.ui64
);
1131 * FMD fmdo_send entry point.
1132 * Send an event to the remote endpoint and receive an ACK.
1135 etm_send(fmd_hdl_t
*hdl
, fmd_xprt_t
*xprthdl
, fmd_event_t
*ep
, nvlist_t
*nvl
)
1139 int hdrstat
, rv
, cnt
= 0;
1140 char *buf
, *nvbuf
, *class;
1141 size_t nvsize
, buflen
, hdrlen
;
1142 struct timespec tms
;
1144 (void) pthread_mutex_lock(&Etm_mod_lock
);
1146 (void) pthread_mutex_unlock(&Etm_mod_lock
);
1147 return (FMD_SEND_RETRY
);
1149 (void) pthread_mutex_unlock(&Etm_mod_lock
);
1151 mp
= fmd_xprt_getspecific(hdl
, xprthdl
);
1154 if (pthread_mutex_trylock(&mp
->epm_lock
) == 0) {
1158 * Another thread may be (1) trying to close this
1159 * fmd_xprt_t, or (2) posting an event to it.
1160 * If (1), don't want to spend too much time here.
1161 * If (2), allow it to finish and release epm_lock.
1165 tms
.tv_nsec
= (cnt
* 10000);
1166 (void) nanosleep(&tms
, NULL
);
1169 return (FMD_SEND_RETRY
);
1176 if (mp
->epm_qstat
== Q_UNINITIALIZED
) {
1178 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1179 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1180 return (FMD_SEND_FAILED
);
1183 if (mp
->epm_cstat
== C_CLOSED
) {
1184 etm_suspend_reconnect(hdl
, mp
);
1186 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1187 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1188 return (FMD_SEND_RETRY
);
1191 if (mp
->epm_cstat
== C_LIMBO
) {
1192 if (mp
->epm_oconn
!= NULL
) {
1193 (void) etm_xport_close(hdl
, mp
->epm_oconn
);
1194 mp
->epm_oconn
= NULL
;
1197 fmd_xprt_suspend(hdl
, xprthdl
);
1198 mp
->epm_qstat
= Q_SUSPENDED
;
1200 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1201 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1202 fmd_hdl_debug(hdl
, "queue suspended for %s", mp
->epm_ep_str
);
1203 return (FMD_SEND_RETRY
);
1206 if (mp
->epm_oconn
== NULL
) {
1207 if ((mp
->epm_oconn
= etm_xport_open(hdl
, mp
->epm_tlhdl
))
1209 etm_suspend_reconnect(hdl
, mp
);
1211 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1212 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1213 return (FMD_SEND_RETRY
);
1215 mp
->epm_cstat
= C_OPEN
;
1219 if (nvlist_lookup_string(nvl
, FM_CLASS
, &class) != 0)
1220 fmd_hdl_abort(hdl
, "No class string in nvlist");
1222 msgnvl
= fmd_xprt_translate(hdl
, xprthdl
, ep
);
1223 if (msgnvl
== NULL
) {
1225 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1226 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1227 fmd_hdl_error(hdl
, "Failed to translate event %p\n",
1229 return (FMD_SEND_FAILED
);
1232 rv
= etm_xport_send_filter(hdl
, msgnvl
, mp
->epm_ep_str
);
1233 if (rv
== ETM_XPORT_FILTER_DROP
) {
1235 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1236 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1237 fmd_hdl_debug(hdl
, "send_filter dropped event");
1238 nvlist_free(msgnvl
);
1239 INCRSTAT(Etm_stats
.send_filter
.fmds_value
.ui64
);
1240 return (FMD_SEND_SUCCESS
);
1241 } else if (rv
== ETM_XPORT_FILTER_ERROR
) {
1242 fmd_hdl_debug(hdl
, "send_filter error : %s", strerror(errno
));
1243 INCRSTAT(Etm_stats
.error_send_filter
.fmds_value
.ui64
);
1244 /* Still send event */
1247 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1249 (void) nvlist_size(msgnvl
, &nvsize
, NV_ENCODE_XDR
);
1251 hdrlen
= ETM_HDRLEN
;
1252 buflen
= nvsize
+ hdrlen
;
1254 ALLOC_BUF(hdl
, buf
, buflen
);
1256 nvbuf
= buf
+ hdrlen
;
1258 (void) etm_create_hdr(buf
, mp
->epm_ver
, ETM_HDR_MSG
, nvsize
);
1260 if (rv
= nvlist_pack(msgnvl
, &nvbuf
, &nvsize
, NV_ENCODE_XDR
, 0)) {
1261 (void) pthread_mutex_lock(&mp
->epm_lock
);
1263 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1264 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1265 fmd_hdl_error(hdl
, "Failed to pack event : %s\n", strerror(rv
));
1266 nvlist_free(msgnvl
);
1267 FREE_BUF(hdl
, buf
, buflen
);
1268 return (FMD_SEND_FAILED
);
1271 nvlist_free(msgnvl
);
1273 if (etm_xport_write(hdl
, mp
->epm_oconn
, Rw_timeout
, buf
,
1274 buflen
) != buflen
) {
1275 fmd_hdl_debug(hdl
, "failed to send message to %s",
1277 (void) pthread_mutex_lock(&mp
->epm_lock
);
1278 etm_suspend_reconnect(hdl
, mp
);
1280 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1281 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1282 FREE_BUF(hdl
, buf
, buflen
);
1283 INCRSTAT(Etm_stats
.error_write
.fmds_value
.ui64
);
1284 return (FMD_SEND_RETRY
);
1287 INCRSTAT(Etm_stats
.write_msg
.fmds_value
.ui64
);
1288 ADDSTAT(Etm_stats
.write_bytes
.fmds_value
.ui64
, nvsize
);
1290 etm_hex_dump(hdl
, nvbuf
, nvsize
, 1);
1292 if (etm_xport_read(hdl
, mp
->epm_oconn
, Rw_timeout
, buf
,
1293 hdrlen
) != hdrlen
) {
1294 fmd_hdl_debug(hdl
, "failed to read ACK from %s",
1296 (void) pthread_mutex_lock(&mp
->epm_lock
);
1297 etm_suspend_reconnect(hdl
, mp
);
1299 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1300 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1301 FREE_BUF(hdl
, buf
, buflen
);
1302 INCRSTAT(Etm_stats
.error_read
.fmds_value
.ui64
);
1303 return (FMD_SEND_RETRY
);
1306 hdrstat
= etm_check_hdr(hdl
, mp
, buf
);
1307 FREE_BUF(hdl
, buf
, buflen
);
1309 if (hdrstat
== ETM_HDR_ACK
) {
1310 INCRSTAT(Etm_stats
.read_ack
.fmds_value
.ui64
);
1312 (void) pthread_mutex_lock(&mp
->epm_lock
);
1314 (void) etm_xport_close(hdl
, mp
->epm_oconn
);
1315 mp
->epm_oconn
= NULL
;
1317 if (hdrstat
== ETM_HDR_NAK
) {
1318 /* Peer received a bad value in the header */
1319 if (mp
->epm_xprthdl
!= NULL
) {
1320 mp
->epm_cstat
= C_LIMBO
;
1321 fmd_xprt_suspend(hdl
, xprthdl
);
1322 mp
->epm_qstat
= Q_SUSPENDED
;
1323 fmd_hdl_debug(hdl
, "received NAK, queue "
1324 "suspended for %s", mp
->epm_ep_str
);
1327 rv
= FMD_SEND_RETRY
;
1329 } else if (hdrstat
== ETM_HDR_S_RESTART
) {
1330 /* Server has restarted */
1331 mp
->epm_cstat
= C_CLOSED
;
1332 mp
->epm_qstat
= Q_UNINITIALIZED
;
1333 fmd_hdl_debug(hdl
, "server %s restarted",
1336 * Cannot call fmd_xprt_close here, so we'll do it
1337 * on the timeout thread.
1339 if (mp
->epm_timer_in_use
== 0) {
1340 mp
->epm_timer_id
= fmd_timer_install(
1342 mp
->epm_timer_in_use
= 1;
1346 * fault.* or list.* events will be replayed if a
1347 * transport is opened with the same auth.
1348 * Other events will be discarded.
1350 rv
= FMD_SEND_FAILED
;
1353 mp
->epm_cstat
= C_CLOSED
;
1354 fmd_hdl_debug(hdl
, "bad ACK from %s", mp
->epm_ep_str
);
1356 rv
= FMD_SEND_RETRY
;
1361 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1362 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1364 INCRSTAT(Etm_stats
.error_read_badhdr
.fmds_value
.ui64
);
1369 (void) pthread_mutex_lock(&mp
->epm_lock
);
1371 (void) pthread_cond_broadcast(&mp
->epm_tx_cv
);
1372 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1374 return (FMD_SEND_SUCCESS
);
1378 * FMD fmdo_timeout entry point..
1382 etm_timeout(fmd_hdl_t
*hdl
, id_t id
, void *data
)
1384 etm_epmap_t
*mp
= (etm_epmap_t
*)data
;
1386 (void) pthread_mutex_lock(&mp
->epm_lock
);
1388 mp
->epm_timer_in_use
= 0;
1390 if (mp
->epm_qstat
== Q_UNINITIALIZED
) {
1391 /* Server has shutdown and we (client) need to reconnect */
1392 if (mp
->epm_xprthdl
!= NULL
) {
1393 fmd_xprt_close(hdl
, mp
->epm_xprthdl
);
1394 fmd_hdl_debug(hdl
, "queue closed for %s",
1396 mp
->epm_xprthdl
= NULL
;
1397 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1398 mp
->epm_ep_nvl
= NULL
;
1401 if (mp
->epm_ep_nvl
== NULL
)
1402 (void) etm_get_ep_nvl(hdl
, mp
);
1404 if (etm_handle_startup(hdl
, mp
)) {
1405 if (mp
->epm_oconn
!= NULL
) {
1406 (void) etm_xport_close(hdl
, mp
->epm_oconn
);
1407 mp
->epm_oconn
= NULL
;
1409 mp
->epm_cstat
= C_UNINITIALIZED
;
1410 mp
->epm_qstat
= Q_UNINITIALIZED
;
1411 mp
->epm_timer_id
= fmd_timer_install(hdl
, mp
, NULL
,
1413 mp
->epm_timer_in_use
= 1;
1416 etm_reconnect(hdl
, mp
);
1419 (void) pthread_mutex_unlock(&mp
->epm_lock
);
1423 * FMD Module declarations
1425 static const fmd_hdl_ops_t etm_ops
= {
1426 NULL
, /* fmdo_recv */
1427 etm_timeout
, /* fmdo_timeout */
1428 NULL
, /* fmdo_close */
1429 NULL
, /* fmdo_stats */
1431 etm_send
, /* fmdo_send */
1434 static const fmd_prop_t etm_props
[] = {
1435 { "client_list", FMD_TYPE_STRING
, NULL
},
1436 { "server_list", FMD_TYPE_STRING
, NULL
},
1437 { "reconnect_interval", FMD_TYPE_UINT64
, "10000000000" },
1438 { "reconnect_timeout", FMD_TYPE_UINT64
, "300000000000" },
1439 { "rw_timeout", FMD_TYPE_UINT64
, "2000000000" },
1440 { "filter_path", FMD_TYPE_STRING
, NULL
},
1444 static const fmd_hdl_info_t etm_info
= {
1445 "Event Transport Module", "2.0", &etm_ops
, etm_props
1449 * Initialize the transport for use by ETM.
1452 _fmd_init(fmd_hdl_t
*hdl
)
1456 if (fmd_hdl_register(hdl
, FMD_API_VERSION
, &etm_info
) != 0) {
1457 return; /* invalid data in configuration file */
1460 /* Create global stats */
1461 (void) fmd_stat_create(hdl
, FMD_STAT_NOALLOC
,
1462 sizeof (Etm_stats
) / sizeof (fmd_stat_t
), (fmd_stat_t
*)&Etm_stats
);
1464 /* Get module properties */
1465 Reconn_timeout
= fmd_prop_get_int64(hdl
, "reconnect_timeout");
1466 Reconn_interval
= fmd_prop_get_int64(hdl
, "reconnect_interval");
1467 Rw_timeout
= fmd_prop_get_int64(hdl
, "rw_timeout");
1469 propstr
= fmd_prop_get_string(hdl
, "client_list");
1470 etm_create_epmaps(hdl
, propstr
, ETM_SERVER_XPRT_FLAGS
);
1471 fmd_prop_free_string(hdl
, propstr
);
1473 propstr
= fmd_prop_get_string(hdl
, "server_list");
1474 etm_create_epmaps(hdl
, propstr
, ETM_CLIENT_XPRT_FLAGS
);
1475 fmd_prop_free_string(hdl
, propstr
);
1477 if (Etm_stats
.peer_count
.fmds_value
.ui64
== 0) {
1478 fmd_hdl_debug(hdl
, "Failed to init any endpoint\n");
1479 fmd_hdl_unregister(hdl
);
1485 * Teardown the transport
1488 _fmd_fini(fmd_hdl_t
*hdl
)
1490 etm_epmap_t
*mp
, *next
;
1492 (void) pthread_mutex_lock(&Etm_mod_lock
);
1494 (void) pthread_mutex_unlock(&Etm_mod_lock
);
1499 next
= mp
->epm_next
;
1500 etm_free_epmap(hdl
, mp
);
1504 fmd_hdl_unregister(hdl
);