dmake: do not set MAKEFLAGS=k
[unleashed/tickless.git] / usr / src / cmd / fm / modules / common / event-transport / etm.c
blob1cd11fa1dcc2d9629e5d1e9a82c60145b9d28d3e
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
23 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #pragma ident "%Z%%M% %I% %E% SMI"
30 * FMA Event Transport Module
32 * Plugin for sending/receiving FMA events to/from a remote endoint.
35 #include <netinet/in.h>
36 #include <errno.h>
37 #include <sys/fm/protocol.h>
38 #include <sys/sysmacros.h>
39 #include <pthread.h>
40 #include <strings.h>
41 #include <ctype.h>
42 #include <link.h>
43 #include <libnvpair.h>
44 #include "etm_xport_api.h"
45 #include "etm_proto.h"
48 * ETM declarations
51 typedef enum etm_connection_status {
52 C_UNINITIALIZED = 0,
53 C_OPEN, /* Connection is open */
54 C_CLOSED, /* Connection is closed */
55 C_LIMBO, /* Bad value in header from peer */
56 C_TIMED_OUT /* Reconnection to peer timed out */
57 } etm_connstat_t;
59 typedef enum etm_fmd_queue_status {
60 Q_UNINITIALIZED = 100,
61 Q_INIT_PENDING, /* Queue initialization in progress */
62 Q_OPEN, /* Queue is open */
63 Q_SUSPENDED /* Queue is suspended */
64 } etm_qstat_t;
66 /* Per endpoint data */
67 typedef struct etm_endpoint_map {
68 uint8_t epm_ver; /* Protocol version being used */
69 char *epm_ep_str; /* Endpoint ID string */
70 int epm_xprtflags; /* FMD transport open flags */
71 etm_xport_hdl_t epm_tlhdl; /* Transport Layer instance handle */
72 pthread_mutex_t epm_lock; /* Protects remainder of struct */
73 pthread_cond_t epm_tx_cv; /* Cond var for send/transmit */
74 int epm_txbusy; /* Busy doing send/transmit */
75 fmd_xprt_t *epm_xprthdl; /* FMD transport handle */
76 etm_qstat_t epm_qstat; /* Status of fmd xprt queue */
77 nvlist_t *epm_ep_nvl; /* Endpoint ID nv_list */
78 etm_xport_conn_t epm_oconn; /* Connection for outgoing events */
79 etm_connstat_t epm_cstat; /* Status of connection */
80 id_t epm_timer_id; /* Timer id */
81 int epm_timer_in_use; /* Indicates if timer is in use */
82 hrtime_t epm_reconn_end; /* Reconnection end time */
83 struct etm_endpoint_map *epm_next;
84 } etm_epmap_t;
86 #define ETM_HDR_INVALID (ETM_HDR_TYPE_TOO_HIGH + 1)
87 #define ETM_HDR_BADVERSION (ETM_HDR_TYPE_TOO_HIGH + 2)
88 #define ETM_HDR_BADTYPE (ETM_HDR_TYPE_TOO_HIGH + 3)
89 #define ETM_EP_INST_MAX 4 /* Max chars in endpt instance */
90 #define ETM_CLIENT_XPRT_FLAGS FMD_XPRT_RDWR
91 #define ETM_SERVER_XPRT_FLAGS (FMD_XPRT_RDWR | FMD_XPRT_ACCEPT)
93 #define ALLOC_BUF(hdl, buf, size) \
94 buf = fmd_hdl_zalloc((hdl), (size), FMD_SLEEP);
96 #define FREE_BUF(hdl, buf, size) fmd_hdl_free((hdl), (buf), (size));
98 #define IS_CLIENT(mp) (((mp)->epm_xprtflags & FMD_XPRT_ACCEPT) ? 0 : 1)
100 #define INCRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \
101 (x)++; \
102 (void) pthread_mutex_unlock(&Etm_mod_lock); \
105 #define DECRSTAT(x) { (void) pthread_mutex_lock(&Etm_mod_lock); \
106 (x)--; \
107 (void) pthread_mutex_unlock(&Etm_mod_lock); \
110 #define ADDSTAT(x, y) { (void) pthread_mutex_lock(&Etm_mod_lock); \
111 (x) += (y); \
112 (void) pthread_mutex_unlock(&Etm_mod_lock); \
116 * Global variables
118 static pthread_mutex_t Etm_mod_lock = PTHREAD_MUTEX_INITIALIZER;
119 /* Protects globals */
120 static hrtime_t Reconn_interval; /* Time between reconnection attempts */
121 static hrtime_t Reconn_timeout; /* Time allowed for reconnection */
122 static hrtime_t Rw_timeout; /* Time allowed for I/O operation */
123 static int Etm_dump = 0; /* Enables hex dump for debug */
124 static int Etm_exit = 0; /* Flag for exit */
125 static etm_epmap_t *Epmap_head = NULL; /* Head of list of epmap structs */
127 /* Module statistics */
128 static struct etm_stats {
129 /* read counters */
130 fmd_stat_t read_ack;
131 fmd_stat_t read_bytes;
132 fmd_stat_t read_msg;
133 fmd_stat_t post_filter;
134 /* write counters */
135 fmd_stat_t write_ack;
136 fmd_stat_t write_bytes;
137 fmd_stat_t write_msg;
138 fmd_stat_t send_filter;
139 /* error counters */
140 fmd_stat_t error_protocol;
141 fmd_stat_t error_drop_read;
142 fmd_stat_t error_read;
143 fmd_stat_t error_read_badhdr;
144 fmd_stat_t error_write;
145 fmd_stat_t error_send_filter;
146 fmd_stat_t error_post_filter;
147 /* misc */
148 fmd_stat_t peer_count;
150 } Etm_stats = {
151 /* read counters */
152 { "read_ack", FMD_TYPE_UINT64, "ACKs read" },
153 { "read_bytes", FMD_TYPE_UINT64, "Bytes read" },
154 { "read_msg", FMD_TYPE_UINT64, "Messages read" },
155 { "post_filter", FMD_TYPE_UINT64, "Drops by post_filter" },
156 /* write counters */
157 { "write_ack", FMD_TYPE_UINT64, "ACKs sent" },
158 { "write_bytes", FMD_TYPE_UINT64, "Bytes sent" },
159 { "write_msg", FMD_TYPE_UINT64, "Messages sent" },
160 { "send_filter", FMD_TYPE_UINT64, "Drops by send_filter" },
161 /* ETM error counters */
162 { "error_protocol", FMD_TYPE_UINT64, "ETM protocol errors" },
163 { "error_drop_read", FMD_TYPE_UINT64, "Dropped read messages" },
164 { "error_read", FMD_TYPE_UINT64, "Read I/O errors" },
165 { "error_read_badhdr", FMD_TYPE_UINT64, "Bad headers read" },
166 { "error_write", FMD_TYPE_UINT64, "Write I/O errors" },
167 { "error_send_filter", FMD_TYPE_UINT64, "Send filter errors" },
168 { "error_post_filter", FMD_TYPE_UINT64, "Post filter errors" },
169 /* ETM Misc */
170 { "peer_count", FMD_TYPE_UINT64, "Number of peers initialized" },
174 * ETM Private functions
178 * Hex dump for debug.
180 static void
181 etm_hex_dump(fmd_hdl_t *hdl, void *buf, size_t buflen, int direction)
183 int i, j, k;
184 int16_t *c;
186 if (Etm_dump == 0)
187 return;
189 j = buflen / 16; /* Number of complete 8-column rows */
190 k = buflen % 16; /* Is there a last (non-8-column) row? */
192 if (direction)
193 fmd_hdl_debug(hdl, "--- WRITE Message Dump ---");
194 else
195 fmd_hdl_debug(hdl, "--- READ Message Dump ---");
197 fmd_hdl_debug(hdl, " Displaying %d bytes", buflen);
199 /* Dump the complete 8-column rows */
200 for (i = 0; i < j; i++) {
201 c = (int16_t *)buf + (i * 8);
202 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x %4x %4x", i,
203 *(c+0), *(c+1), *(c+2), *(c+3),
204 *(c+4), *(c+5), *(c+6), *(c+7));
207 /* Dump the last (incomplete) row */
208 c = (int16_t *)buf + (i * 8);
209 switch (k) {
210 case 4:
211 fmd_hdl_debug(hdl, "%3d: %4x %4x", i, *(c+0), *(c+1));
212 break;
213 case 8:
214 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x", i, *(c+0), *(c+1),
215 *(c+2), *(c+3));
216 break;
217 case 12:
218 fmd_hdl_debug(hdl, "%3d: %4x %4x %4x %4x %4x %4x", i, *(c+0),
219 *(c+1), *(c+2), *(c+3), *(c+4), *(c+5));
220 break;
223 fmd_hdl_debug(hdl, "--- End Dump ---");
227 * Provide the length of a message based on the data in the given ETM header.
229 static size_t
230 etm_get_msglen(void *buf)
232 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
234 return (ntohl(hp->hdr_msglen));
238 * Check the contents of the ETM header for errors.
239 * Return the header type (hdr_type).
241 static int
242 etm_check_hdr(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf)
244 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
246 if (bcmp(hp->hdr_delim, ETM_DELIM, ETM_DELIMLEN) != 0) {
247 fmd_hdl_debug(hdl, "Bad delimiter in ETM header from %s "
248 ": 0x%x\n", mp->epm_ep_str, hp->hdr_delim);
249 return (ETM_HDR_INVALID);
252 if ((hp->hdr_type == ETM_HDR_C_HELLO) ||
253 (hp->hdr_type == ETM_HDR_S_HELLO)) {
254 /* Until version is negotiated, other fields may be wrong */
255 return (hp->hdr_type);
258 if (hp->hdr_ver != mp->epm_ver) {
259 fmd_hdl_debug(hdl, "Bad version in ETM header from %s : 0x%x\n",
260 mp->epm_ep_str, hp->hdr_ver);
261 return (ETM_HDR_BADVERSION);
264 if ((hp->hdr_type == ETM_HDR_TYPE_TOO_LOW) ||
265 (hp->hdr_type >= ETM_HDR_TYPE_TOO_HIGH)) {
266 fmd_hdl_debug(hdl, "Bad type in ETM header from %s : 0x%x\n",
267 mp->epm_ep_str, hp->hdr_type);
268 return (ETM_HDR_BADTYPE);
271 return (hp->hdr_type);
275 * Create an ETM header of a given type in the given buffer.
276 * Return length of header.
278 static size_t
279 etm_create_hdr(void *buf, uint8_t ver, uint8_t type, uint32_t msglen)
281 etm_proto_hdr_t *hp = (etm_proto_hdr_t *)buf;
283 bcopy(ETM_DELIM, hp->hdr_delim, ETM_DELIMLEN);
284 hp->hdr_ver = ver;
285 hp->hdr_type = type;
286 hp->hdr_msglen = htonl(msglen);
288 return (ETM_HDRLEN);
292 * Convert message bytes to nvlist and post to fmd.
293 * Return zero for success, non-zero for failure.
295 * Note : nvl is free'd by fmd.
297 static int
298 etm_post_msg(fmd_hdl_t *hdl, etm_epmap_t *mp, void *buf, size_t buflen)
300 nvlist_t *nvl;
301 int rv;
303 if (nvlist_unpack((char *)buf, buflen, &nvl, 0)) {
304 fmd_hdl_error(hdl, "failed to unpack message");
305 return (1);
308 rv = etm_xport_post_filter(hdl, nvl, mp->epm_ep_str);
309 if (rv == ETM_XPORT_FILTER_DROP) {
310 fmd_hdl_debug(hdl, "post_filter dropped event");
311 INCRSTAT(Etm_stats.post_filter.fmds_value.ui64);
312 nvlist_free(nvl);
313 return (0);
314 } else if (rv == ETM_XPORT_FILTER_ERROR) {
315 fmd_hdl_debug(hdl, "post_filter error : %s", strerror(errno));
316 INCRSTAT(Etm_stats.error_post_filter.fmds_value.ui64);
317 /* Still post event */
320 (void) pthread_mutex_lock(&mp->epm_lock);
321 (void) pthread_mutex_lock(&Etm_mod_lock);
322 if (!Etm_exit) {
323 (void) pthread_mutex_unlock(&Etm_mod_lock);
324 if (mp->epm_qstat == Q_OPEN) {
325 fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
326 rv = 0;
327 } else if (mp->epm_qstat == Q_SUSPENDED) {
328 fmd_xprt_resume(hdl, mp->epm_xprthdl);
329 if (mp->epm_timer_in_use) {
330 fmd_timer_remove(hdl, mp->epm_timer_id);
331 mp->epm_timer_in_use = 0;
333 mp->epm_qstat = Q_OPEN;
334 fmd_hdl_debug(hdl, "queue resumed for %s",
335 mp->epm_ep_str);
336 fmd_xprt_post(hdl, mp->epm_xprthdl, nvl, 0);
337 rv = 0;
338 } else {
339 fmd_hdl_debug(hdl, "unable to post message, qstat = %d",
340 mp->epm_qstat);
341 nvlist_free(nvl);
342 /* Remote peer will attempt to resend event */
343 rv = 2;
345 } else {
346 (void) pthread_mutex_unlock(&Etm_mod_lock);
347 fmd_hdl_debug(hdl, "unable to post message, module exiting");
348 nvlist_free(nvl);
349 /* Remote peer will attempt to resend event */
350 rv = 3;
353 (void) pthread_mutex_unlock(&mp->epm_lock);
355 return (rv);
359 * Handle the startup handshake to the server. The client always initiates
360 * the startup handshake. In the following sequence, we are the client and
361 * the remote endpoint is the server.
363 * Client sends C_HELLO and transitions to Q_INIT_PENDING state.
364 * Server sends S_HELLO and transitions to Q_INIT_PENDING state.
365 * Client sends ACK and transitions to Q_OPEN state.
366 * Server receives ACK and transitions to Q_OPEN state.
368 * Return 0 for success, nonzero for failure.
370 static int
371 etm_handle_startup(fmd_hdl_t *hdl, etm_epmap_t *mp)
373 etm_proto_hdr_t *hp;
374 size_t hdrlen = ETM_HDRLEN;
375 int hdrstat;
376 char hbuf[ETM_HDRLEN];
378 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
379 return (1);
381 mp->epm_cstat = C_OPEN;
383 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_C_HELLO, 0);
385 if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
386 hdrlen)) != hdrlen) {
387 fmd_hdl_error(hdl, "Failed to write C_HELLO to %s",
388 mp->epm_ep_str);
389 return (2);
392 mp->epm_qstat = Q_INIT_PENDING;
394 if ((etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, hbuf,
395 hdrlen)) != hdrlen) {
396 fmd_hdl_error(hdl, "Failed to read S_HELLO from %s",
397 mp->epm_ep_str);
398 return (3);
401 hdrstat = etm_check_hdr(hdl, mp, hbuf);
403 if (hdrstat != ETM_HDR_S_HELLO) {
404 fmd_hdl_error(hdl, "Protocol error, did not receive S_HELLO "
405 "from %s", mp->epm_ep_str);
406 return (4);
410 * Get version from the server.
411 * Currently, only one version is supported.
413 hp = (etm_proto_hdr_t *)(void *)hbuf;
414 if (hp->hdr_ver != ETM_PROTO_V1) {
415 fmd_hdl_error(hdl, "Unable to use same version as %s : %d",
416 mp->epm_ep_str, hp->hdr_ver);
417 return (5);
419 mp->epm_ver = hp->hdr_ver;
421 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
423 if ((etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
424 hdrlen)) != hdrlen) {
425 fmd_hdl_error(hdl, "Failed to write ACK for S_HELLO to %s",
426 mp->epm_ep_str);
427 return (6);
431 * Call fmd_xprt_open and fmd_xprt_setspecific with
432 * Etm_mod_lock held to avoid race with etm_send thread.
434 (void) pthread_mutex_lock(&Etm_mod_lock);
435 if ((mp->epm_xprthdl = fmd_xprt_open(hdl, mp->epm_xprtflags,
436 mp->epm_ep_nvl, NULL)) == NULL) {
437 fmd_hdl_abort(hdl, "Failed to init xprthdl for %s",
438 mp->epm_ep_str);
440 fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
441 (void) pthread_mutex_unlock(&Etm_mod_lock);
443 mp->epm_qstat = Q_OPEN;
444 fmd_hdl_debug(hdl, "queue open for %s", mp->epm_ep_str);
446 return (0);
450 * Open a connection to the peer, send a SHUTDOWN message,
451 * and close the connection.
453 static void
454 etm_send_shutdown(fmd_hdl_t *hdl, etm_epmap_t *mp)
456 size_t hdrlen = ETM_HDRLEN;
457 char hbuf[ETM_HDRLEN];
459 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl)) == NULL)
460 return;
462 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_SHUTDOWN, 0);
464 (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf, hdrlen);
466 (void) etm_xport_close(hdl, mp->epm_oconn);
467 mp->epm_oconn = NULL;
471 * Alloc a nvlist and add a string for the endpoint.
472 * Return zero for success, non-zero for failure.
474 static int
475 etm_get_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
478 * Cannot use nvlist_xalloc(3NVPAIR) due to a recursive mutex situation
479 * in fmd when this nvlist_t is free'd.
481 (void) nvlist_alloc(&mp->epm_ep_nvl, NV_UNIQUE_NAME, 0);
483 if (nvlist_add_string(mp->epm_ep_nvl, "domain-id", mp->epm_ep_str)) {
484 fmd_hdl_error(hdl, "failed to add domain-id string to nvlist "
485 "for %s", mp->epm_ep_str);
486 nvlist_free(mp->epm_ep_nvl);
487 return (1);
490 return (0);
494 * Free the nvlist for the endpoint_id string.
496 /*ARGSUSED*/
497 static void
498 etm_free_ep_nvl(fmd_hdl_t *hdl, etm_epmap_t *mp)
500 nvlist_free(mp->epm_ep_nvl);
504 * Check for a duplicate endpoint/peer string.
506 /*ARGSUSED*/
507 static int
508 etm_check_dup_ep_str(fmd_hdl_t *hdl, char *epname)
510 etm_epmap_t *mp;
512 for (mp = Epmap_head; mp != NULL; mp = mp->epm_next)
513 if (strcmp(epname, mp->epm_ep_str) == 0)
514 return (1);
516 return (0);
520 * Attempt to re-open a connection with the remote endpoint.
522 static void
523 etm_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
525 if ((mp->epm_reconn_end > 0) && (mp->epm_cstat == C_UNINITIALIZED)) {
526 if (gethrtime() < mp->epm_reconn_end) {
527 if ((mp->epm_oconn = etm_xport_open(hdl,
528 mp->epm_tlhdl)) == NULL) {
529 fmd_hdl_debug(hdl, "reconnect failed for %s",
530 mp->epm_ep_str);
531 mp->epm_timer_id = fmd_timer_install(hdl, mp,
532 NULL, Reconn_interval);
533 mp->epm_timer_in_use = 1;
534 } else {
535 fmd_hdl_debug(hdl, "reconnect success for %s",
536 mp->epm_ep_str);
537 mp->epm_reconn_end = 0;
538 mp->epm_cstat = C_OPEN;
540 } else {
541 fmd_hdl_error(hdl, "Reconnect timed out for %s\n",
542 mp->epm_ep_str);
543 mp->epm_reconn_end = 0;
544 mp->epm_cstat = C_TIMED_OUT;
548 if (mp->epm_cstat == C_OPEN) {
549 fmd_xprt_resume(hdl, mp->epm_xprthdl);
550 mp->epm_qstat = Q_OPEN;
551 fmd_hdl_debug(hdl, "queue resumed for %s", mp->epm_ep_str);
556 * Suspend a given connection and setup for reconnection retries.
557 * Assume caller holds lock on epm_lock.
559 static void
560 etm_suspend_reconnect(fmd_hdl_t *hdl, etm_epmap_t *mp)
562 (void) pthread_mutex_lock(&Etm_mod_lock);
563 if (Etm_exit) {
564 (void) pthread_mutex_unlock(&Etm_mod_lock);
565 return;
567 (void) pthread_mutex_unlock(&Etm_mod_lock);
569 if (mp->epm_oconn != NULL) {
570 (void) etm_xport_close(hdl, mp->epm_oconn);
571 mp->epm_oconn = NULL;
574 mp->epm_reconn_end = gethrtime() + Reconn_timeout;
575 mp->epm_cstat = C_UNINITIALIZED;
577 if (mp->epm_xprthdl != NULL) {
578 fmd_xprt_suspend(hdl, mp->epm_xprthdl);
579 mp->epm_qstat = Q_SUSPENDED;
580 fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
582 if (mp->epm_timer_in_use == 0) {
583 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
584 Reconn_interval);
585 mp->epm_timer_in_use = 1;
591 * Reinitialize the connection. The old fmd_xprt_t handle must be
592 * removed/closed first.
593 * Assume caller holds lock on epm_lock.
595 static void
596 etm_reinit(fmd_hdl_t *hdl, etm_epmap_t *mp)
599 * To avoid a deadlock, wait for etm_send to finish before
600 * calling fmd_xprt_close()
602 while (mp->epm_txbusy)
603 (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
605 if (mp->epm_xprthdl != NULL) {
606 fmd_xprt_close(hdl, mp->epm_xprthdl);
607 fmd_hdl_debug(hdl, "queue closed for %s", mp->epm_ep_str);
608 mp->epm_xprthdl = NULL;
609 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
610 mp->epm_ep_nvl = NULL;
613 if (mp->epm_timer_in_use) {
614 fmd_timer_remove(hdl, mp->epm_timer_id);
615 mp->epm_timer_in_use = 0;
618 if (mp->epm_oconn != NULL) {
619 (void) etm_xport_close(hdl, mp->epm_oconn);
620 mp->epm_oconn = NULL;
623 mp->epm_cstat = C_UNINITIALIZED;
624 mp->epm_qstat = Q_UNINITIALIZED;
628 * Receive data from ETM transport layer.
629 * Note : This is not the fmdo_recv entry point.
632 static int
633 etm_recv(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_epmap_t *mp)
635 size_t buflen, hdrlen;
636 void *buf;
637 char hbuf[ETM_HDRLEN];
638 int hdrstat, rv;
640 hdrlen = ETM_HDRLEN;
642 if ((etm_xport_read(hdl, conn, Rw_timeout, hbuf, hdrlen)) != hdrlen) {
643 fmd_hdl_debug(hdl, "failed to read header from %s",
644 mp->epm_ep_str);
645 INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
646 return (EIO);
649 hdrstat = etm_check_hdr(hdl, mp, hbuf);
651 switch (hdrstat) {
652 case ETM_HDR_INVALID:
653 (void) pthread_mutex_lock(&mp->epm_lock);
654 if (mp->epm_cstat == C_OPEN)
655 mp->epm_cstat = C_CLOSED;
656 (void) pthread_mutex_unlock(&mp->epm_lock);
658 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
659 rv = ECANCELED;
660 break;
662 case ETM_HDR_BADTYPE:
663 case ETM_HDR_BADVERSION:
664 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_NAK, 0);
666 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
667 hdrlen)) != hdrlen) {
668 fmd_hdl_debug(hdl, "failed to write NAK to %s",
669 mp->epm_ep_str);
670 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
671 return (EIO);
674 (void) pthread_mutex_lock(&mp->epm_lock);
675 mp->epm_cstat = C_LIMBO;
676 (void) pthread_mutex_unlock(&mp->epm_lock);
678 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
679 rv = ENOTSUP;
680 break;
682 case ETM_HDR_C_HELLO:
683 /* Client is initiating a startup handshake */
684 (void) pthread_mutex_lock(&mp->epm_lock);
685 etm_reinit(hdl, mp);
686 mp->epm_qstat = Q_INIT_PENDING;
687 (void) pthread_mutex_unlock(&mp->epm_lock);
689 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_S_HELLO, 0);
691 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
692 hdrlen)) != hdrlen) {
693 fmd_hdl_debug(hdl, "failed to write S_HELLO to %s",
694 mp->epm_ep_str);
695 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
696 return (EIO);
699 rv = 0;
700 break;
702 case ETM_HDR_ACK:
703 (void) pthread_mutex_lock(&mp->epm_lock);
704 if (mp->epm_qstat == Q_INIT_PENDING) {
705 /* This is client's ACK from startup handshake */
706 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
707 if (mp->epm_ep_nvl == NULL)
708 (void) etm_get_ep_nvl(hdl, mp);
711 * Call fmd_xprt_open and fmd_xprt_setspecific with
712 * Etm_mod_lock held to avoid race with etm_send thread.
714 (void) pthread_mutex_lock(&Etm_mod_lock);
715 if ((mp->epm_xprthdl = fmd_xprt_open(hdl,
716 mp->epm_xprtflags, mp->epm_ep_nvl, NULL)) == NULL) {
717 fmd_hdl_abort(hdl, "Failed to init xprthdl "
718 "for %s", mp->epm_ep_str);
720 fmd_xprt_setspecific(hdl, mp->epm_xprthdl, mp);
721 (void) pthread_mutex_unlock(&Etm_mod_lock);
723 mp->epm_qstat = Q_OPEN;
724 (void) pthread_mutex_unlock(&mp->epm_lock);
725 fmd_hdl_debug(hdl, "queue open for %s",
726 mp->epm_ep_str);
727 } else {
728 (void) pthread_mutex_unlock(&mp->epm_lock);
729 fmd_hdl_debug(hdl, "protocol error, not expecting ACK "
730 "from %s\n", mp->epm_ep_str);
731 INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
734 rv = 0;
735 break;
737 case ETM_HDR_SHUTDOWN:
738 fmd_hdl_debug(hdl, "received shutdown from %s",
739 mp->epm_ep_str);
741 (void) pthread_mutex_lock(&mp->epm_lock);
743 etm_reinit(hdl, mp);
745 if (IS_CLIENT(mp)) {
747 * A server shutdown is considered to be temporary.
748 * Prepare for reconnection.
750 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
751 Reconn_interval);
753 mp->epm_timer_in_use = 1;
756 (void) pthread_mutex_unlock(&mp->epm_lock);
758 rv = ECANCELED;
759 break;
761 case ETM_HDR_MSG:
762 (void) pthread_mutex_lock(&mp->epm_lock);
763 if (mp->epm_qstat == Q_UNINITIALIZED) {
764 /* Peer (client) is unaware that we've restarted */
765 (void) pthread_mutex_unlock(&mp->epm_lock);
766 hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
767 ETM_HDR_S_RESTART, 0);
769 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
770 hdrlen)) != hdrlen) {
771 fmd_hdl_debug(hdl, "failed to write S_RESTART "
772 "to %s", mp->epm_ep_str);
773 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
774 return (EIO);
777 return (ECANCELED);
779 (void) pthread_mutex_unlock(&mp->epm_lock);
781 buflen = etm_get_msglen(hbuf);
782 ALLOC_BUF(hdl, buf, buflen);
784 if (etm_xport_read(hdl, conn, Rw_timeout, buf,
785 buflen) != buflen) {
786 fmd_hdl_debug(hdl, "failed to read message from %s",
787 mp->epm_ep_str);
788 FREE_BUF(hdl, buf, buflen);
789 INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
790 return (EIO);
793 INCRSTAT(Etm_stats.read_msg.fmds_value.ui64);
794 ADDSTAT(Etm_stats.read_bytes.fmds_value.ui64, buflen);
796 etm_hex_dump(hdl, buf, buflen, 0);
798 if (etm_post_msg(hdl, mp, buf, buflen)) {
799 INCRSTAT(Etm_stats.error_drop_read.fmds_value.ui64);
800 FREE_BUF(hdl, buf, buflen);
801 return (EIO);
804 FREE_BUF(hdl, buf, buflen);
806 hdrlen = etm_create_hdr(hbuf, mp->epm_ver, ETM_HDR_ACK, 0);
808 if ((etm_xport_write(hdl, conn, Rw_timeout, hbuf,
809 hdrlen)) != hdrlen) {
810 fmd_hdl_debug(hdl, "failed to write ACK to %s",
811 mp->epm_ep_str);
812 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
813 return (EIO);
816 INCRSTAT(Etm_stats.write_ack.fmds_value.ui64);
819 * If we got this far and the current state of the
820 * outbound/sending connection is TIMED_OUT or
821 * LIMBO, then we should reinitialize it.
823 (void) pthread_mutex_lock(&mp->epm_lock);
824 if (mp->epm_cstat == C_TIMED_OUT ||
825 mp->epm_cstat == C_LIMBO) {
826 if (mp->epm_oconn != NULL) {
827 (void) etm_xport_close(hdl, mp->epm_oconn);
828 mp->epm_oconn = NULL;
830 mp->epm_cstat = C_UNINITIALIZED;
831 fmd_xprt_resume(hdl, mp->epm_xprthdl);
832 if (mp->epm_timer_in_use) {
833 fmd_timer_remove(hdl, mp->epm_timer_id);
834 mp->epm_timer_in_use = 0;
836 mp->epm_qstat = Q_OPEN;
837 fmd_hdl_debug(hdl, "queue resumed for %s",
838 mp->epm_ep_str);
840 (void) pthread_mutex_unlock(&mp->epm_lock);
842 rv = 0;
843 break;
845 default:
846 fmd_hdl_debug(hdl, "protocol error, unexpected header "
847 "from %s : %d", mp->epm_ep_str, hdrstat);
848 INCRSTAT(Etm_stats.error_protocol.fmds_value.ui64);
849 rv = 0;
852 return (rv);
856 * ETM transport layer callback function.
857 * The transport layer calls this function to :
858 * (a) pass an incoming message (flag == ETM_CBFLAG_RECV)
859 * (b) tell us to reinitialize the connection (flag == ETM_CBFLAG_REINIT)
861 static int
862 etm_cb_func(fmd_hdl_t *hdl, etm_xport_conn_t conn, etm_cb_flag_t flag,
863 void *arg)
865 etm_epmap_t *mp = (etm_epmap_t *)arg;
866 int rv = 0;
868 (void) pthread_mutex_lock(&Etm_mod_lock);
869 if (Etm_exit) {
870 (void) pthread_mutex_unlock(&Etm_mod_lock);
871 return (ECANCELED);
873 (void) pthread_mutex_unlock(&Etm_mod_lock);
875 switch (flag) {
876 case ETM_CBFLAG_RECV:
877 rv = etm_recv(hdl, conn, mp);
878 break;
879 case ETM_CBFLAG_REINIT:
880 (void) pthread_mutex_lock(&mp->epm_lock);
881 etm_reinit(hdl, mp);
882 etm_send_shutdown(hdl, mp);
883 (void) pthread_mutex_unlock(&mp->epm_lock);
885 * Return ECANCELED so the transport layer will close the
886 * server connection. The transport layer is responsible for
887 * reestablishing this connection (should a connection request
888 * arrive from the peer).
890 rv = ECANCELED;
891 break;
892 default:
893 fmd_hdl_debug(hdl, "Unknown callback flag : 0x%x", flag);
894 rv = ENOTSUP;
897 return (rv);
901 * Allocate and initialize an etm_epmap_t struct for the given endpoint
902 * name string.
904 static void
905 etm_init_epmap(fmd_hdl_t *hdl, char *epname, int flags)
907 etm_epmap_t *newmap;
909 if (etm_check_dup_ep_str(hdl, epname)) {
910 fmd_hdl_debug(hdl, "skipping duplicate peer : %s", epname);
911 return;
914 newmap = fmd_hdl_zalloc(hdl, sizeof (etm_epmap_t), FMD_SLEEP);
915 newmap->epm_ep_str = fmd_hdl_strdup(hdl, epname, FMD_SLEEP);
916 newmap->epm_xprtflags = flags;
917 newmap->epm_cstat = C_UNINITIALIZED;
918 newmap->epm_qstat = Q_UNINITIALIZED;
919 newmap->epm_ver = ETM_PROTO_V1; /* Currently support one proto ver */
920 newmap->epm_txbusy = 0;
922 (void) pthread_mutex_init(&newmap->epm_lock, NULL);
923 (void) pthread_cond_init(&newmap->epm_tx_cv, NULL);
925 if (etm_get_ep_nvl(hdl, newmap)) {
926 fmd_hdl_strfree(hdl, newmap->epm_ep_str);
927 fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
928 return;
931 (void) pthread_mutex_lock(&newmap->epm_lock);
933 if ((newmap->epm_tlhdl = etm_xport_init(hdl, newmap->epm_ep_str,
934 etm_cb_func, newmap)) == NULL) {
935 fmd_hdl_debug(hdl, "failed to init tlhdl for %s\n",
936 newmap->epm_ep_str);
937 etm_free_ep_nvl(hdl, newmap);
938 (void) pthread_mutex_unlock(&newmap->epm_lock);
939 (void) pthread_mutex_destroy(&newmap->epm_lock);
940 fmd_hdl_strfree(hdl, newmap->epm_ep_str);
941 fmd_hdl_free(hdl, newmap, sizeof (etm_epmap_t));
942 return;
945 if (IS_CLIENT(newmap)) {
946 if (etm_handle_startup(hdl, newmap)) {
948 * For whatever reason, we could not complete the
949 * startup handshake with the server. Set the timer
950 * and try again.
952 if (newmap->epm_oconn != NULL) {
953 (void) etm_xport_close(hdl, newmap->epm_oconn);
954 newmap->epm_oconn = NULL;
956 newmap->epm_cstat = C_UNINITIALIZED;
957 newmap->epm_qstat = Q_UNINITIALIZED;
958 newmap->epm_timer_id = fmd_timer_install(hdl, newmap,
959 NULL, Reconn_interval);
960 newmap->epm_timer_in_use = 1;
962 } else {
964 * We may be restarting after a crash. If so, the client
965 * may be unaware of this.
967 etm_send_shutdown(hdl, newmap);
970 /* Add this transport instance handle to the list */
971 newmap->epm_next = Epmap_head;
972 Epmap_head = newmap;
974 (void) pthread_mutex_unlock(&newmap->epm_lock);
976 INCRSTAT(Etm_stats.peer_count.fmds_value.ui64);
980 * Parse the given property list string and call etm_init_epmap
981 * for each endpoint.
983 static void
984 etm_create_epmaps(fmd_hdl_t *hdl, char *eplist, int flags)
986 char *epstr, *ep, *prefix, *lasts, *numstr;
987 char epname[MAXPATHLEN];
988 size_t slen, nlen;
989 int beg, end, i;
991 if (eplist == NULL)
992 return;
994 * Create a copy of eplist for parsing.
995 * strtok/strtok_r(3C) will insert null chars to the string.
996 * Therefore, fmd_hdl_strdup/fmd_hdl_strfree cannot be used.
998 slen = strlen(eplist);
999 epstr = fmd_hdl_zalloc(hdl, slen + 1, FMD_SLEEP);
1000 (void) strcpy(epstr, eplist);
1003 * The following are supported for the "client_list" and
1004 * "server_list" properties :
1006 * A space-separated list of endpoints.
1007 * "dev:///dom0 dev:///dom1 dev:///dom2"
1009 * An array syntax for a range of instances.
1010 * "dev:///dom[0:2]"
1012 * A combination of both.
1013 * "dev:///dom0 dev:///dom[1:2]"
1015 ep = strtok_r(epstr, " ", &lasts);
1016 while (ep != NULL) {
1017 if (strchr(ep, '[') != NULL) {
1019 * This string is using array syntax.
1020 * Check the string for correct syntax.
1022 if ((strchr(ep, ':') == NULL) ||
1023 (strchr(ep, ']') == NULL)) {
1024 fmd_hdl_error(hdl, "Syntax error in property "
1025 "that includes : %s\n", ep);
1026 ep = strtok_r(NULL, " ", &lasts);
1027 continue;
1030 /* expand the array syntax */
1031 prefix = strtok(ep, "[");
1033 numstr = strtok(NULL, ":");
1034 if ((numstr == NULL) || (!isdigit(*numstr))) {
1035 fmd_hdl_error(hdl, "Syntax error in property "
1036 "that includes : %s[\n", prefix);
1037 ep = strtok_r(NULL, " ", &lasts);
1038 continue;
1040 beg = atoi(numstr);
1042 numstr = strtok(NULL, "]");
1043 if ((numstr == NULL) || (!isdigit(*numstr))) {
1044 fmd_hdl_error(hdl, "Syntax error in property "
1045 "that includes : %s[\n", prefix);
1046 ep = strtok_r(NULL, " ", &lasts);
1047 continue;
1049 end = atoi(numstr);
1051 nlen = strlen(prefix) + ETM_EP_INST_MAX;
1053 if (nlen > MAXPATHLEN) {
1054 fmd_hdl_error(hdl, "Endpoint prop string "
1055 "exceeds MAXPATHLEN\n");
1056 ep = strtok_r(NULL, " ", &lasts);
1057 continue;
1060 for (i = beg; i <= end; i++) {
1061 bzero(epname, MAXPATHLEN);
1062 (void) snprintf(epname, nlen, "%s%d",
1063 prefix, i);
1064 etm_init_epmap(hdl, epname, flags);
1066 } else {
1067 etm_init_epmap(hdl, ep, flags);
1070 ep = strtok_r(NULL, " ", &lasts);
1073 fmd_hdl_free(hdl, epstr, slen + 1);
1077 * Free the transport infrastructure for an endpoint.
1079 static void
1080 etm_free_epmap(fmd_hdl_t *hdl, etm_epmap_t *mp)
1082 size_t hdrlen;
1083 char hbuf[ETM_HDRLEN];
1085 (void) pthread_mutex_lock(&mp->epm_lock);
1088 * If an etm_send thread is in progress, wait for it to finish.
1089 * The etm_recv thread is managed by the transport layer and will
1090 * be destroyed with etm_xport_fini().
1092 while (mp->epm_txbusy)
1093 (void) pthread_cond_wait(&mp->epm_tx_cv, &mp->epm_lock);
1095 if (mp->epm_timer_in_use)
1096 fmd_timer_remove(hdl, mp->epm_timer_id);
1098 if (mp->epm_oconn != NULL) {
1099 hdrlen = etm_create_hdr(hbuf, mp->epm_ver,
1100 ETM_HDR_SHUTDOWN, 0);
1101 (void) etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, hbuf,
1102 hdrlen);
1103 (void) etm_xport_close(hdl, mp->epm_oconn);
1104 mp->epm_oconn = NULL;
1107 if (mp->epm_xprthdl != NULL) {
1108 fmd_xprt_close(hdl, mp->epm_xprthdl);
1109 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1110 mp->epm_ep_nvl = NULL;
1113 if (mp->epm_ep_nvl != NULL)
1114 etm_free_ep_nvl(hdl, mp);
1116 if (mp->epm_tlhdl != NULL)
1117 (void) etm_xport_fini(hdl, mp->epm_tlhdl);
1119 (void) pthread_mutex_unlock(&mp->epm_lock);
1120 (void) pthread_mutex_destroy(&mp->epm_lock);
1121 fmd_hdl_strfree(hdl, mp->epm_ep_str);
1122 fmd_hdl_free(hdl, mp, sizeof (etm_epmap_t));
1123 DECRSTAT(Etm_stats.peer_count.fmds_value.ui64);
1127 * FMD entry points
1131 * FMD fmdo_send entry point.
1132 * Send an event to the remote endpoint and receive an ACK.
1134 static int
1135 etm_send(fmd_hdl_t *hdl, fmd_xprt_t *xprthdl, fmd_event_t *ep, nvlist_t *nvl)
1137 etm_epmap_t *mp;
1138 nvlist_t *msgnvl;
1139 int hdrstat, rv, cnt = 0;
1140 char *buf, *nvbuf, *class;
1141 size_t nvsize, buflen, hdrlen;
1142 struct timespec tms;
1144 (void) pthread_mutex_lock(&Etm_mod_lock);
1145 if (Etm_exit) {
1146 (void) pthread_mutex_unlock(&Etm_mod_lock);
1147 return (FMD_SEND_RETRY);
1149 (void) pthread_mutex_unlock(&Etm_mod_lock);
1151 mp = fmd_xprt_getspecific(hdl, xprthdl);
1153 for (;;) {
1154 if (pthread_mutex_trylock(&mp->epm_lock) == 0) {
1155 break;
1156 } else {
1158 * Another thread may be (1) trying to close this
1159 * fmd_xprt_t, or (2) posting an event to it.
1160 * If (1), don't want to spend too much time here.
1161 * If (2), allow it to finish and release epm_lock.
1163 if (cnt++ < 10) {
1164 tms.tv_sec = 0;
1165 tms.tv_nsec = (cnt * 10000);
1166 (void) nanosleep(&tms, NULL);
1168 } else {
1169 return (FMD_SEND_RETRY);
1174 mp->epm_txbusy++;
1176 if (mp->epm_qstat == Q_UNINITIALIZED) {
1177 mp->epm_txbusy--;
1178 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1179 (void) pthread_mutex_unlock(&mp->epm_lock);
1180 return (FMD_SEND_FAILED);
1183 if (mp->epm_cstat == C_CLOSED) {
1184 etm_suspend_reconnect(hdl, mp);
1185 mp->epm_txbusy--;
1186 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1187 (void) pthread_mutex_unlock(&mp->epm_lock);
1188 return (FMD_SEND_RETRY);
1191 if (mp->epm_cstat == C_LIMBO) {
1192 if (mp->epm_oconn != NULL) {
1193 (void) etm_xport_close(hdl, mp->epm_oconn);
1194 mp->epm_oconn = NULL;
1197 fmd_xprt_suspend(hdl, xprthdl);
1198 mp->epm_qstat = Q_SUSPENDED;
1199 mp->epm_txbusy--;
1200 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1201 (void) pthread_mutex_unlock(&mp->epm_lock);
1202 fmd_hdl_debug(hdl, "queue suspended for %s", mp->epm_ep_str);
1203 return (FMD_SEND_RETRY);
1206 if (mp->epm_oconn == NULL) {
1207 if ((mp->epm_oconn = etm_xport_open(hdl, mp->epm_tlhdl))
1208 == NULL) {
1209 etm_suspend_reconnect(hdl, mp);
1210 mp->epm_txbusy--;
1211 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1212 (void) pthread_mutex_unlock(&mp->epm_lock);
1213 return (FMD_SEND_RETRY);
1214 } else {
1215 mp->epm_cstat = C_OPEN;
1219 if (nvlist_lookup_string(nvl, FM_CLASS, &class) != 0)
1220 fmd_hdl_abort(hdl, "No class string in nvlist");
1222 msgnvl = fmd_xprt_translate(hdl, xprthdl, ep);
1223 if (msgnvl == NULL) {
1224 mp->epm_txbusy--;
1225 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1226 (void) pthread_mutex_unlock(&mp->epm_lock);
1227 fmd_hdl_error(hdl, "Failed to translate event %p\n",
1228 (void *) ep);
1229 return (FMD_SEND_FAILED);
1232 rv = etm_xport_send_filter(hdl, msgnvl, mp->epm_ep_str);
1233 if (rv == ETM_XPORT_FILTER_DROP) {
1234 mp->epm_txbusy--;
1235 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1236 (void) pthread_mutex_unlock(&mp->epm_lock);
1237 fmd_hdl_debug(hdl, "send_filter dropped event");
1238 nvlist_free(msgnvl);
1239 INCRSTAT(Etm_stats.send_filter.fmds_value.ui64);
1240 return (FMD_SEND_SUCCESS);
1241 } else if (rv == ETM_XPORT_FILTER_ERROR) {
1242 fmd_hdl_debug(hdl, "send_filter error : %s", strerror(errno));
1243 INCRSTAT(Etm_stats.error_send_filter.fmds_value.ui64);
1244 /* Still send event */
1247 (void) pthread_mutex_unlock(&mp->epm_lock);
1249 (void) nvlist_size(msgnvl, &nvsize, NV_ENCODE_XDR);
1251 hdrlen = ETM_HDRLEN;
1252 buflen = nvsize + hdrlen;
1254 ALLOC_BUF(hdl, buf, buflen);
1256 nvbuf = buf + hdrlen;
1258 (void) etm_create_hdr(buf, mp->epm_ver, ETM_HDR_MSG, nvsize);
1260 if (rv = nvlist_pack(msgnvl, &nvbuf, &nvsize, NV_ENCODE_XDR, 0)) {
1261 (void) pthread_mutex_lock(&mp->epm_lock);
1262 mp->epm_txbusy--;
1263 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1264 (void) pthread_mutex_unlock(&mp->epm_lock);
1265 fmd_hdl_error(hdl, "Failed to pack event : %s\n", strerror(rv));
1266 nvlist_free(msgnvl);
1267 FREE_BUF(hdl, buf, buflen);
1268 return (FMD_SEND_FAILED);
1271 nvlist_free(msgnvl);
1273 if (etm_xport_write(hdl, mp->epm_oconn, Rw_timeout, buf,
1274 buflen) != buflen) {
1275 fmd_hdl_debug(hdl, "failed to send message to %s",
1276 mp->epm_ep_str);
1277 (void) pthread_mutex_lock(&mp->epm_lock);
1278 etm_suspend_reconnect(hdl, mp);
1279 mp->epm_txbusy--;
1280 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1281 (void) pthread_mutex_unlock(&mp->epm_lock);
1282 FREE_BUF(hdl, buf, buflen);
1283 INCRSTAT(Etm_stats.error_write.fmds_value.ui64);
1284 return (FMD_SEND_RETRY);
1287 INCRSTAT(Etm_stats.write_msg.fmds_value.ui64);
1288 ADDSTAT(Etm_stats.write_bytes.fmds_value.ui64, nvsize);
1290 etm_hex_dump(hdl, nvbuf, nvsize, 1);
1292 if (etm_xport_read(hdl, mp->epm_oconn, Rw_timeout, buf,
1293 hdrlen) != hdrlen) {
1294 fmd_hdl_debug(hdl, "failed to read ACK from %s",
1295 mp->epm_ep_str);
1296 (void) pthread_mutex_lock(&mp->epm_lock);
1297 etm_suspend_reconnect(hdl, mp);
1298 mp->epm_txbusy--;
1299 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1300 (void) pthread_mutex_unlock(&mp->epm_lock);
1301 FREE_BUF(hdl, buf, buflen);
1302 INCRSTAT(Etm_stats.error_read.fmds_value.ui64);
1303 return (FMD_SEND_RETRY);
1306 hdrstat = etm_check_hdr(hdl, mp, buf);
1307 FREE_BUF(hdl, buf, buflen);
1309 if (hdrstat == ETM_HDR_ACK) {
1310 INCRSTAT(Etm_stats.read_ack.fmds_value.ui64);
1311 } else {
1312 (void) pthread_mutex_lock(&mp->epm_lock);
1314 (void) etm_xport_close(hdl, mp->epm_oconn);
1315 mp->epm_oconn = NULL;
1317 if (hdrstat == ETM_HDR_NAK) {
1318 /* Peer received a bad value in the header */
1319 if (mp->epm_xprthdl != NULL) {
1320 mp->epm_cstat = C_LIMBO;
1321 fmd_xprt_suspend(hdl, xprthdl);
1322 mp->epm_qstat = Q_SUSPENDED;
1323 fmd_hdl_debug(hdl, "received NAK, queue "
1324 "suspended for %s", mp->epm_ep_str);
1327 rv = FMD_SEND_RETRY;
1329 } else if (hdrstat == ETM_HDR_S_RESTART) {
1330 /* Server has restarted */
1331 mp->epm_cstat = C_CLOSED;
1332 mp->epm_qstat = Q_UNINITIALIZED;
1333 fmd_hdl_debug(hdl, "server %s restarted",
1334 mp->epm_ep_str);
1336 * Cannot call fmd_xprt_close here, so we'll do it
1337 * on the timeout thread.
1339 if (mp->epm_timer_in_use == 0) {
1340 mp->epm_timer_id = fmd_timer_install(
1341 hdl, mp, NULL, 0);
1342 mp->epm_timer_in_use = 1;
1346 * fault.* or list.* events will be replayed if a
1347 * transport is opened with the same auth.
1348 * Other events will be discarded.
1350 rv = FMD_SEND_FAILED;
1352 } else {
1353 mp->epm_cstat = C_CLOSED;
1354 fmd_hdl_debug(hdl, "bad ACK from %s", mp->epm_ep_str);
1356 rv = FMD_SEND_RETRY;
1359 mp->epm_txbusy--;
1361 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1362 (void) pthread_mutex_unlock(&mp->epm_lock);
1364 INCRSTAT(Etm_stats.error_read_badhdr.fmds_value.ui64);
1366 return (rv);
1369 (void) pthread_mutex_lock(&mp->epm_lock);
1370 mp->epm_txbusy--;
1371 (void) pthread_cond_broadcast(&mp->epm_tx_cv);
1372 (void) pthread_mutex_unlock(&mp->epm_lock);
1374 return (FMD_SEND_SUCCESS);
1378 * FMD fmdo_timeout entry point..
1380 /*ARGSUSED*/
1381 static void
1382 etm_timeout(fmd_hdl_t *hdl, id_t id, void *data)
1384 etm_epmap_t *mp = (etm_epmap_t *)data;
1386 (void) pthread_mutex_lock(&mp->epm_lock);
1388 mp->epm_timer_in_use = 0;
1390 if (mp->epm_qstat == Q_UNINITIALIZED) {
1391 /* Server has shutdown and we (client) need to reconnect */
1392 if (mp->epm_xprthdl != NULL) {
1393 fmd_xprt_close(hdl, mp->epm_xprthdl);
1394 fmd_hdl_debug(hdl, "queue closed for %s",
1395 mp->epm_ep_str);
1396 mp->epm_xprthdl = NULL;
1397 /* mp->epm_ep_nvl is free'd in fmd_xprt_close */
1398 mp->epm_ep_nvl = NULL;
1401 if (mp->epm_ep_nvl == NULL)
1402 (void) etm_get_ep_nvl(hdl, mp);
1404 if (etm_handle_startup(hdl, mp)) {
1405 if (mp->epm_oconn != NULL) {
1406 (void) etm_xport_close(hdl, mp->epm_oconn);
1407 mp->epm_oconn = NULL;
1409 mp->epm_cstat = C_UNINITIALIZED;
1410 mp->epm_qstat = Q_UNINITIALIZED;
1411 mp->epm_timer_id = fmd_timer_install(hdl, mp, NULL,
1412 Reconn_interval);
1413 mp->epm_timer_in_use = 1;
1415 } else {
1416 etm_reconnect(hdl, mp);
1419 (void) pthread_mutex_unlock(&mp->epm_lock);
1423 * FMD Module declarations
1425 static const fmd_hdl_ops_t etm_ops = {
1426 NULL, /* fmdo_recv */
1427 etm_timeout, /* fmdo_timeout */
1428 NULL, /* fmdo_close */
1429 NULL, /* fmdo_stats */
1430 NULL, /* fmdo_gc */
1431 etm_send, /* fmdo_send */
1434 static const fmd_prop_t etm_props[] = {
1435 { "client_list", FMD_TYPE_STRING, NULL },
1436 { "server_list", FMD_TYPE_STRING, NULL },
1437 { "reconnect_interval", FMD_TYPE_UINT64, "10000000000" },
1438 { "reconnect_timeout", FMD_TYPE_UINT64, "300000000000" },
1439 { "rw_timeout", FMD_TYPE_UINT64, "2000000000" },
1440 { "filter_path", FMD_TYPE_STRING, NULL },
1441 { NULL, 0, NULL }
1444 static const fmd_hdl_info_t etm_info = {
1445 "Event Transport Module", "2.0", &etm_ops, etm_props
1449 * Initialize the transport for use by ETM.
1451 void
1452 _fmd_init(fmd_hdl_t *hdl)
1454 char *propstr;
1456 if (fmd_hdl_register(hdl, FMD_API_VERSION, &etm_info) != 0) {
1457 return; /* invalid data in configuration file */
1460 /* Create global stats */
1461 (void) fmd_stat_create(hdl, FMD_STAT_NOALLOC,
1462 sizeof (Etm_stats) / sizeof (fmd_stat_t), (fmd_stat_t *)&Etm_stats);
1464 /* Get module properties */
1465 Reconn_timeout = fmd_prop_get_int64(hdl, "reconnect_timeout");
1466 Reconn_interval = fmd_prop_get_int64(hdl, "reconnect_interval");
1467 Rw_timeout = fmd_prop_get_int64(hdl, "rw_timeout");
1469 propstr = fmd_prop_get_string(hdl, "client_list");
1470 etm_create_epmaps(hdl, propstr, ETM_SERVER_XPRT_FLAGS);
1471 fmd_prop_free_string(hdl, propstr);
1473 propstr = fmd_prop_get_string(hdl, "server_list");
1474 etm_create_epmaps(hdl, propstr, ETM_CLIENT_XPRT_FLAGS);
1475 fmd_prop_free_string(hdl, propstr);
1477 if (Etm_stats.peer_count.fmds_value.ui64 == 0) {
1478 fmd_hdl_debug(hdl, "Failed to init any endpoint\n");
1479 fmd_hdl_unregister(hdl);
1480 return;
1485 * Teardown the transport
1487 void
1488 _fmd_fini(fmd_hdl_t *hdl)
1490 etm_epmap_t *mp, *next;
1492 (void) pthread_mutex_lock(&Etm_mod_lock);
1493 Etm_exit = 1;
1494 (void) pthread_mutex_unlock(&Etm_mod_lock);
1496 mp = Epmap_head;
1498 while (mp) {
1499 next = mp->epm_next;
1500 etm_free_epmap(hdl, mp);
1501 mp = next;
1504 fmd_hdl_unregister(hdl);