4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
26 #include <fmd_alloc.h>
27 #include <fmd_eventq.h>
28 #include <fmd_module.h>
29 #include <fmd_dispq.h>
/*
 * Create and initialize an event queue.  The visible code allocates an
 * fmd_eventq_t via fmd_zalloc() with FMD_SLEEP (presumably zero-filled, going
 * by the allocator name), initializes eq_lock and eq_cv with default
 * attributes, saves the caller-supplied stats_lock pointer in eq_stats_lock,
 * and stores the value returned by fmd_dispq_getgid() in eq_sgid.
 *
 * NOTE(review): the embedded source line numbers skip here (34, 37, 39,
 * 42-44, 46, 48 onward), so the return type, the braces, any use of the mp,
 * stats and limit parameters, and the return statement are not visible in
 * this extract.  Confirm against the complete file.
 */
35 fmd_eventq_create(fmd_module_t
*mp
, fmd_eventqstat_t
*stats
,
36 pthread_mutex_t
*stats_lock
, uint_t limit
)
38 fmd_eventq_t
*eq
= fmd_zalloc(sizeof (fmd_eventq_t
), FMD_SLEEP
);
/* Set up the queue mutex and condition variable with default attributes. */
40 (void) pthread_mutex_init(&eq
->eq_lock
, NULL
);
41 (void) pthread_cond_init(&eq
->eq_cv
, NULL
);
/* The statistics lock is owned by the caller; only the pointer is kept. */
45 eq
->eq_stats_lock
= stats_lock
;
/*
 * Record the id handed out by the global dispatch queue for this event
 * queue; fmd_eventq_destroy() later passes it to fmd_dispq_delgid().
 */
47 eq
->eq_sgid
= fmd_dispq_getgid(fmd
.d_disp
, eq
);
/*
 * Destroy an event queue.  Every element still linked on eq_list is unlinked,
 * its event reference is released with fmd_event_rele(), and the element is
 * freed.  The queue then returns eq_sgid through fmd_dispq_delgid() and frees
 * the fmd_eventq_t itself.
 *
 * NOTE(review): no locking of eq_lock and no pthread_mutex_destroy() or
 * pthread_cond_destroy() call is visible here; presumably the caller
 * guarantees exclusive access.  The return type and braces are also missing
 * from this extract.  Confirm against the complete file.
 */
53 fmd_eventq_destroy(fmd_eventq_t
*eq
)
55 fmd_eventqelem_t
*eqe
;
/* Drain the list from the head until fmd_list_next() reports it empty. */
57 while ((eqe
= fmd_list_next(&eq
->eq_list
)) != NULL
) {
58 fmd_list_delete(&eq
->eq_list
, eqe
);
59 fmd_event_rele(eqe
->eqe_event
);
60 fmd_free(eqe
, sizeof (fmd_eventqelem_t
));
/* Give back the id obtained from fmd_dispq_getgid() at creation time. */
63 fmd_dispq_delgid(fmd
.d_disp
, eq
->eq_sgid
);
64 fmd_free(eq
, sizeof (fmd_eventq_t
));
/*
 * Discard a queue element that will not be delivered.  The eqs_dropped
 * counter is incremented while holding eq_stats_lock, then the element's
 * event reference is released and the element is freed.
 *
 * This function does not touch eq_list or eq_lock: the element passed in
 * must already be off the list (fmd_eventq_drop_topo() unlinks it first).
 *
 * NOTE(review): the return type, linkage and braces are not visible in this
 * extract.  Confirm against the complete file.
 */
68 fmd_eventq_drop(fmd_eventq_t
*eq
, fmd_eventqelem_t
*eqe
)
/* eq_stats_lock is a pointer to a caller-owned mutex, hence no address-of. */
70 (void) pthread_mutex_lock(eq
->eq_stats_lock
);
71 eq
->eq_stats
->eqs_dropped
.fmds_value
.ui64
++;
72 (void) pthread_mutex_unlock(eq
->eq_stats_lock
);
/* Release the event before freeing the element that points at it. */
74 fmd_event_rele(eqe
->eqe_event
);
75 fmd_free(eqe
, sizeof (fmd_eventqelem_t
));
/*
 * Remove redundant FMD_EVT_TOPO events from the queue.  The visible code
 * takes eq_lock, starts from fmd_list_prev(&eq->eq_list) (presumably the
 * tail) and steps with fmd_list_prev(), classifying each element by
 * FMD_EVENT_TYPE(): a topo element is either unlinked and handed to
 * fmd_eventq_drop() or causes got_fm_events to be reset, a protocol element
 * sets got_fm_events, and anything else is stepped over.
 *
 * NOTE(review): the loop construct, the test that chooses between the two
 * topo branches, the assignment to tmp and the matching else/closing braces
 * are not visible in this extract (embedded lines 92, 94, 95, 98, 100, 103,
 * 107 and 109 are missing).  Confirm against the complete file.
 */
79 fmd_eventq_drop_topo(fmd_eventq_t
*eq
)
81 fmd_eventqelem_t
*eqe
, *tmp
;
/* Tracks whether a protocol event has been seen since the last kept topo. */
82 boolean_t got_fm_events
= B_FALSE
;
85 * Here we iterate through the per-module event queue in order to remove
86 * redundant FMD_EVT_TOPO events. The trick is to not drop a given
87 * topo event if there are any FM protocol events in the queue after
88 * it, as those events need to be processed with the correct topology.
90 (void) pthread_mutex_lock(&eq
->eq_lock
);
91 eqe
= fmd_list_prev(&eq
->eq_list
);
93 if (FMD_EVENT_TYPE(eqe
->eqe_event
) == FMD_EVT_TOPO
) {
/*
 * Advance the cursor before unlinking so the walk can continue after the
 * element referred to by tmp has been freed by fmd_eventq_drop().
 */
96 eqe
= fmd_list_prev(eqe
);
97 fmd_list_delete(&eq
->eq_list
, tmp
);
99 fmd_eventq_drop(eq
, tmp
);
/* Topo event kept: protocol events seen so far are now covered by it. */
101 got_fm_events
= B_FALSE
;
102 eqe
= fmd_list_prev(eqe
);
104 } else if (FMD_EVENT_TYPE(eqe
->eqe_event
) == FMD_EVT_PROTOCOL
) {
105 got_fm_events
= B_TRUE
;
106 eqe
= fmd_list_prev(eqe
);
/* Any other event type is simply skipped. */
108 eqe
= fmd_list_prev(eqe
);
110 (void) pthread_mutex_unlock(&eq
->eq_lock
);
/*
 * Update statistics when an event is dispatched and placed on a module's event
 * queue.  This is essentially the same code as kstat_waitq_enter(9F).
 */
/*
 * Update the wait-queue statistics for an event being placed on the queue.
 * Under eq_stats_lock the visible code computes the time elapsed since
 * eqs_wlastupdate, stores the new timestamp there, post-increments eqs_wcnt,
 * accumulates delta * wcnt into eqs_wlentime and delta into eqs_wtime, and
 * increments eqs_dispatched.
 *
 * NOTE(review): the declarations of new, delta and wcnt, the statement that
 * assigns new, and any condition guarding the two accumulations are not
 * visible in this extract (embedded lines 121-123, 125-126, 130-131 and
 * 134-135 are missing).  Confirm against the complete file.
 */
118 fmd_eventqstat_dispatch(fmd_eventq_t
*eq
)
120 fmd_eventqstat_t
*eqs
= eq
->eq_stats
;
124 (void) pthread_mutex_lock(eq
->eq_stats_lock
);
/* Time elapsed since the wait-queue statistics were last updated. */
127 delta
= new - eqs
->eqs_wlastupdate
.fmds_value
.ui64
;
128 eqs
->eqs_wlastupdate
.fmds_value
.ui64
= new;
/* Post-increment: wcnt receives the count from before this event. */
129 wcnt
= eqs
->eqs_wcnt
.fmds_value
.ui32
++;
/* Charge the elapsed interval against the previous queue length. */
132 eqs
->eqs_wlentime
.fmds_value
.ui64
+= delta
* wcnt
;
133 eqs
->eqs_wtime
.fmds_value
.ui64
+= delta
;
136 eqs
->eqs_dispatched
.fmds_value
.ui64
++;
137 (void) pthread_mutex_unlock(eq
->eq_stats_lock
);
/*
 * Insert event ep at the head of the queue.  The visible code checks for a
 * zero eq_limit, allocates a queue element, and then under eq_lock accepts
 * the event when eq_size is below eq_limit or the event is not of type
 * FMD_EVT_PROTOCOL.  Accepted events update the dispatch statistics (unless
 * they are FMD_EVT_CTL) and are prepended to eq_list.  Waiters on eq_cv are
 * woken before the lock is released, and fmd_eventq_drop() is called last.
 *
 * NOTE(review): the body of the eq_limit == 0 branch, the initialization of
 * the new element, the declaration of ok, any eq_size update and the
 * condition guarding the final fmd_eventq_drop() call (presumably the
 * rejected case) are not visible in this extract.  Confirm against the
 * complete file.
 */
141 fmd_eventq_insert_at_head(fmd_eventq_t
*eq
, fmd_event_t
*ep
)
143 uint_t evt
= FMD_EVENT_TYPE(ep
);
144 fmd_eventqelem_t
*eqe
;
148 * If this event queue is acting as /dev/null, bounce the reference
149 * count to free an unreferenced event and just return immediately.
151 if (eq
->eq_limit
== 0) {
157 eqe
= fmd_alloc(sizeof (fmd_eventqelem_t
), FMD_SLEEP
);
161 (void) pthread_mutex_lock(&eq
->eq_lock
);
/*
 * Assignment binds loosest, so ok is the whole disjunction: only protocol
 * events are subject to the queue limit.
 */
163 if ((ok
= eq
->eq_size
< eq
->eq_limit
|| evt
!= FMD_EVT_PROTOCOL
) != 0) {
/* Control events are excluded from the dispatch statistics. */
164 if (evt
!= FMD_EVT_CTL
)
165 fmd_eventqstat_dispatch(eq
);
167 fmd_list_prepend(&eq
->eq_list
, eqe
);
/* Wake any thread waiting on eq_cv, then release the queue lock. */
171 (void) pthread_cond_broadcast(&eq
->eq_cv
);
172 (void) pthread_mutex_unlock(&eq
->eq_lock
);
/* Runs after eq_lock is released; fmd_eventq_drop() takes eq_stats_lock. */
175 fmd_eventq_drop(eq
, eqe
);
/*
 * Insert event ep into the queue according to its event time.  The visible
 * code checks for a zero eq_limit, allocates a queue element, and under
 * eq_lock walks eq_list backwards with fmd_list_prev() until it finds an
 * element whose fmd_event_hrtime() is not later than that of ep.  The event
 * is accepted when eq_size is below eq_limit or it is not FMD_EVT_PROTOCOL.
 * Accepted events update the dispatch statistics (unless FMD_EVT_CTL) and
 * are linked with fmd_list_prepend() or fmd_list_insert_after().  Waiters on
 * eq_cv are woken before the lock is released, and fmd_eventq_drop() is
 * called last.
 *
 * NOTE(review): the body of the eq_limit == 0 branch, the initialization of
 * the new element, the declaration of ok, the test that selects between
 * prepend and insert-after (presumably whether oqe is NULL), any eq_size
 * update and the condition guarding the final fmd_eventq_drop() call are not
 * visible in this extract.  Confirm against the complete file.
 */
179 fmd_eventq_insert_at_time(fmd_eventq_t
*eq
, fmd_event_t
*ep
)
181 uint_t evt
= FMD_EVENT_TYPE(ep
);
182 hrtime_t hrt
= fmd_event_hrtime(ep
);
183 fmd_eventqelem_t
*eqe
, *oqe
;
187 * If this event queue is acting as /dev/null, bounce the reference
188 * count to free an unreferenced event and just return immediately.
190 if (eq
->eq_limit
== 0) {
196 eqe
= fmd_alloc(sizeof (fmd_eventqelem_t
), FMD_SLEEP
);
200 (void) pthread_mutex_lock(&eq
->eq_lock
);
203 * fmd makes no guarantees that events will be delivered in time order
204 * because its transport can make no such guarantees. Instead we make
205 * a looser guarantee that an enqueued event will be dequeued before
206 * any newer *pending* events according to event time. This permits us
207 * to state, for example, that a timer expiry event will be delivered
208 * prior to any enqueued event whose time is after the timer expired.
209 * We use a simple insertion sort for this task, as queue lengths are
210 * typically short and events do *tend* to be received chronologically.
/* Scan backwards; oqe ends up NULL when no element satisfies the test. */
212 for (oqe
= fmd_list_prev(&eq
->eq_list
); oqe
; oqe
= fmd_list_prev(oqe
)) {
213 if (hrt
>= fmd_event_hrtime(oqe
->eqe_event
))
214 break; /* 'ep' is newer than the event in 'oqe' */
/*
 * Assignment binds loosest, so ok is the whole disjunction: only protocol
 * events are subject to the queue limit.
 */
217 if ((ok
= eq
->eq_size
< eq
->eq_limit
|| evt
!= FMD_EVT_PROTOCOL
) != 0) {
/* Control events are excluded from the dispatch statistics. */
218 if (evt
!= FMD_EVT_CTL
)
219 fmd_eventqstat_dispatch(eq
);
222 fmd_list_prepend(&eq
->eq_list
, eqe
);
224 fmd_list_insert_after(&eq
->eq_list
, oqe
, eqe
);
/* Wake any thread waiting on eq_cv, then release the queue lock. */
228 (void) pthread_cond_broadcast(&eq
->eq_cv
);
229 (void) pthread_mutex_unlock(&eq
->eq_lock
);
/* Runs after eq_lock is released; fmd_eventq_drop() takes eq_stats_lock. */
232 fmd_eventq_drop(eq
, eqe
);
/*
 * Dequeue the next event for the consumer.  Under eq_lock the visible code
 * sleeps on eq_cv until the queue is aborted, or until it is non-empty and
 * not suspended.  On abort it unlocks; otherwise it unlinks the first
 * element of eq_list, unlocks, and frees the element.  Control events get
 * special handling (described in the original text below).  For other events
 * the wait-queue and dequeue statistics are updated under eq_stats_lock.
 *
 * NOTE(review): the return type and return statements, the declarations of
 * new, delta, wcnt and ep, the assignment of ep from the element, any
 * eq_size update, the assignment to new, and the body of the control-event
 * branch (including how the function goes back to waiting) are not visible
 * in this extract.  Confirm against the complete file.
 */
236 fmd_eventq_delete(fmd_eventq_t
*eq
)
238 fmd_eventqstat_t
*eqs
= eq
->eq_stats
;
242 fmd_eventqelem_t
*eqe
;
245 (void) pthread_mutex_lock(&eq
->eq_lock
);
/*
 * Wait while the queue is not aborted and is either empty or suspended.
 * pthread_cond_wait() releases eq_lock while blocked and reacquires it
 * before returning, so the condition is re-tested with the lock held.
 */
247 while (!(eq
->eq_flags
& FMD_EVENTQ_ABORT
) &&
248 (eq
->eq_size
== 0 || (eq
->eq_flags
& FMD_EVENTQ_SUSPEND
)))
249 (void) pthread_cond_wait(&eq
->eq_cv
, &eq
->eq_lock
);
/* Abort takes priority over any queued events. */
251 if (eq
->eq_flags
& FMD_EVENTQ_ABORT
) {
252 (void) pthread_mutex_unlock(&eq
->eq_lock
);
/* Take the element at the head of the list. */
256 eqe
= fmd_list_next(&eq
->eq_list
);
257 fmd_list_delete(&eq
->eq_list
, eqe
);
260 (void) pthread_mutex_unlock(&eq
->eq_lock
);
/* The element wrapper is no longer needed once it is off the list. */
263 fmd_free(eqe
, sizeof (fmd_eventqelem_t
));
266 * If we dequeued a control event, release it and go back to sleep.
267 * fmd_event_rele() on the event will block as described in fmd_ctl.c.
268 * This effectively renders control events invisible to our callers
269 * as well as to statistics and observability tools (e.g. fmstat(1M)).
271 if (FMD_EVENT_TYPE(ep
) == FMD_EVT_CTL
) {
277 * Before returning, update our statistics. This code is essentially
278 * kstat_waitq_to_runq(9F), except simplified because our queues are
279 * always consumed by a single thread (i.e. runq len == 1).
281 (void) pthread_mutex_lock(eq
->eq_stats_lock
);
/* Time elapsed since the wait-queue statistics were last updated. */
284 delta
= new - eqs
->eqs_wlastupdate
.fmds_value
.ui64
;
/* Both the wait-side and the dispatch-side timestamps restart from new. */
286 eqs
->eqs_wlastupdate
.fmds_value
.ui64
= new;
287 eqs
->eqs_dlastupdate
.fmds_value
.ui64
= new;
/* Post-decrement: wcnt receives the count that included this event. */
289 ASSERT(eqs
->eqs_wcnt
.fmds_value
.ui32
!= 0);
290 wcnt
= eqs
->eqs_wcnt
.fmds_value
.ui32
--;
292 eqs
->eqs_wlentime
.fmds_value
.ui64
+= delta
* wcnt
;
293 eqs
->eqs_wtime
.fmds_value
.ui64
+= delta
;
/* Protocol events are counted separately in addition to the total. */
295 if (FMD_EVENT_TYPE(ep
) == FMD_EVT_PROTOCOL
)
296 eqs
->eqs_prdequeued
.fmds_value
.ui64
++;
298 eqs
->eqs_dequeued
.fmds_value
.ui64
++;
299 (void) pthread_mutex_unlock(eq
->eq_stats_lock
);
/*
 * Update statistics when an event is done being processed by the eventq's
 * consumer thread.  This is essentially kstat_runq_exit(9F) simplified for
 * our principle that a single thread consumes the queue (i.e. runq len == 1).
 */
/*
 * Account for the time spent processing the event most recently dequeued.
 * Under eq_stats_lock the visible code computes the time elapsed since
 * eqs_dlastupdate (which fmd_eventq_delete() sets when it hands out an
 * event), stores the new timestamp there, and adds the elapsed time to
 * eqs_dtime.
 *
 * NOTE(review): the return type, the declarations of new and delta, and the
 * statement that assigns new are not visible in this extract (embedded lines
 * 309, 313-314 and 316-317 are missing).  Confirm against the complete file.
 */
310 fmd_eventq_done(fmd_eventq_t
*eq
)
312 fmd_eventqstat_t
*eqs
= eq
->eq_stats
;
315 (void) pthread_mutex_lock(eq
->eq_stats_lock
);
/* Time elapsed since the dispatch-side timestamp was last set. */
318 delta
= new - eqs
->eqs_dlastupdate
.fmds_value
.ui64
;
320 eqs
->eqs_dlastupdate
.fmds_value
.ui64
= new;
321 eqs
->eqs_dtime
.fmds_value
.ui64
+= delta
;
323 (void) pthread_mutex_unlock(eq
->eq_stats_lock
);
/*
 * Remove every queued event that matches (type, data).  Under eq_lock the
 * list is walked from the head; each element for which fmd_event_match()
 * returns non-zero is unlinked, its event reference is released, and the
 * element is freed.
 *
 * NOTE(review): embedded line 338, between the unlink and the release, is
 * missing from this extract, and no eq_size adjustment is visible here.  The
 * return type and braces are also not visible.  Confirm against the complete
 * file.
 */
327 fmd_eventq_cancel(fmd_eventq_t
*eq
, uint_t type
, void *data
)
329 fmd_eventqelem_t
*eqe
, *nqe
;
331 (void) pthread_mutex_lock(&eq
->eq_lock
);
333 for (eqe
= fmd_list_next(&eq
->eq_list
); eqe
!= NULL
; eqe
= nqe
) {
/* Fetch the successor first: eqe may be freed below. */
334 nqe
= fmd_list_next(eqe
);
336 if (fmd_event_match(eqe
->eqe_event
, type
, data
)) {
337 fmd_list_delete(&eq
->eq_list
, eqe
);
339 fmd_event_rele(eqe
->eqe_event
);
340 fmd_free(eqe
, sizeof (fmd_eventqelem_t
));
344 (void) pthread_mutex_unlock(&eq
->eq_lock
);
/*
 * Mark the queue as suspended by setting FMD_EVENTQ_SUSPEND in eq_flags under
 * eq_lock.  fmd_eventq_delete() keeps waiting while this flag is set.  No
 * wakeup is issued here.
 *
 * NOTE(review): the return type and braces are not visible in this extract.
 */
348 fmd_eventq_suspend(fmd_eventq_t
*eq
)
350 (void) pthread_mutex_lock(&eq
->eq_lock
);
351 eq
->eq_flags
|= FMD_EVENTQ_SUSPEND
;
352 (void) pthread_mutex_unlock(&eq
->eq_lock
);
/*
 * Clear FMD_EVENTQ_SUSPEND in eq_flags under eq_lock and broadcast on eq_cv
 * so that a thread blocked in fmd_eventq_delete() re-evaluates its wait
 * condition.
 *
 * NOTE(review): the return type and braces are not visible in this extract.
 */
356 fmd_eventq_resume(fmd_eventq_t
*eq
)
358 (void) pthread_mutex_lock(&eq
->eq_lock
);
359 eq
->eq_flags
&= ~FMD_EVENTQ_SUSPEND
;
/* Broadcast while still holding the lock, then release it. */
360 (void) pthread_cond_broadcast(&eq
->eq_cv
);
361 (void) pthread_mutex_unlock(&eq
->eq_lock
);
/*
 * Abort the queue.  Under eq_lock every element on eq_list is unlinked, its
 * event reference is released and the element is freed.  FMD_EVENTQ_ABORT is
 * then set in eq_flags and eq_cv is broadcast so that a thread blocked in
 * fmd_eventq_delete() wakes up and sees the abort flag.
 *
 * NOTE(review): no eq_size reset and no update of the drop statistics is
 * visible for the discarded elements.  The return type and braces are also
 * not visible in this extract.  Confirm against the complete file.
 */
365 fmd_eventq_abort(fmd_eventq_t
*eq
)
367 fmd_eventqelem_t
*eqe
;
369 (void) pthread_mutex_lock(&eq
->eq_lock
);
/* Drain the list from the head until fmd_list_next() reports it empty. */
371 while ((eqe
= fmd_list_next(&eq
->eq_list
)) != NULL
) {
372 fmd_list_delete(&eq
->eq_list
, eqe
);
373 fmd_event_rele(eqe
->eqe_event
);
374 fmd_free(eqe
, sizeof (fmd_eventqelem_t
));
/* Set the flag before the broadcast so woken waiters observe it. */
377 eq
->eq_flags
|= FMD_EVENTQ_ABORT
;
378 (void) pthread_cond_broadcast(&eq
->eq_cv
);
379 (void) pthread_mutex_unlock(&eq
->eq_lock
);