4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #pragma ident "%Z%%M% %I% %E% SMI"
30 * A synchronized FIFO queue for inter-thread producer-consumer semantics.
31 * This queue will handle multiple writers and readers simultaneously.
33 * The following operations are provided:
34 * slp_new_queue: create a new queue
35 * slp_enqueue: place a message at the end of the queue
36 * slp_enqueue_at_head: place a message at the start of the queue
37 * slp_dequeue: remove and return the next message on the queue
38 * (waits indefinitely)
39 * slp_dequeue_timed: remove and return the next message on the queue
40 * (waits only for a specified time)
41 * slp_flush_queue: flushes and frees all messages on a queue
42 * slp_destroy_queue: frees an empty queue.
51 #include <slp-internal.h>
53 /* Private implementation details */
56 struct queue_entry
*next
;
58 typedef struct queue_entry slp_queue_entry_t
;
61 slp_queue_entry_t
*head
;
62 slp_queue_entry_t
*tail
;
69 * Creates, initializes, and returns a new queue.
70 * If an initialization error occurred, returns NULL and sets err to
71 * the appropriate SLP error code.
72 * queues can operate in one of two modes: timed-wait, and infinite
73 * wait. The timeout parameter specifies which of these modes should
74 * be enabled for the new queue.
76 slp_queue_t
*slp_new_queue(SLPError
*err
) {
83 /* initialize new mutex and semaphore */
84 if ((lock
= calloc(1, sizeof (*lock
))) == NULL
) {
85 *err
= SLP_MEMORY_ALLOC_FAILED
;
86 slp_err(LOG_CRIT
, 0, "slp_new_queue", "out of memory");
90 /* initialize condition vars */
91 if (!(wait
= calloc(1, sizeof (*wait
)))) {
92 *err
= SLP_MEMORY_ALLOC_FAILED
;
93 slp_err(LOG_CRIT
, 0, "slp_new_queue", "out of memory");
96 (void) cond_init(wait
, NULL
, NULL
);
98 /* create the queue */
99 if ((q
= malloc(sizeof (*q
))) == NULL
) {
100 *err
= SLP_MEMORY_ALLOC_FAILED
;
101 slp_err(LOG_CRIT
, 0, "slp_new_queue", "out of memory");
114 * Adds msg to the tail of queue q.
115 * Returns an SLP error code: SLP_OK for no error, or SLP_MEMORY_ALLOC_FAILED
116 * if it couldn't allocate memory.
118 SLPError
slp_enqueue(slp_queue_t
*qa
, void *msg
) {
119 slp_queue_entry_t
*qe
;
120 struct queue
*q
= qa
;
122 if ((qe
= malloc(sizeof (*qe
))) == NULL
) {
123 slp_err(LOG_CRIT
, 0, "slp_enqueue", "out of memory");
124 return (SLP_MEMORY_ALLOC_FAILED
);
127 (void) mutex_lock(q
->lock
);
130 if (q
->head
!= NULL
) { /* queue is not empty */
133 } else { /* queue is empty */
134 q
->head
= q
->tail
= qe
;
137 (void) mutex_unlock(q
->lock
);
138 (void) cond_signal(q
->wait
);
144 * Inserts a message at the head of the queue. This is useful for inserting
145 * things like cancel messages.
147 SLPError
slp_enqueue_at_head(slp_queue_t
*qa
, void *msg
) {
148 slp_queue_entry_t
*qe
;
149 struct queue
*q
= qa
;
151 if ((qe
= malloc(sizeof (*qe
))) == NULL
) {
152 slp_err(LOG_CRIT
, 0, "slp_enqueue", "out of memory");
153 return (SLP_MEMORY_ALLOC_FAILED
);
156 (void) mutex_lock(q
->lock
);
162 (void) mutex_unlock(q
->lock
);
163 (void) cond_signal(q
->wait
);
169 * The core functionality for dequeue.
171 static void *dequeue_nolock(struct queue
*q
) {
173 slp_queue_entry_t
*qe
= q
->head
;
176 return (NULL
); /* shouldn't get here */
178 if (!qe
->next
) /* last one in queue */
179 q
->head
= q
->tail
= NULL
;
188 * Returns the first message waiting or arriving in the queue, or if no
189 * message is available after waiting the amount of time specified in
190 * 'to', returns NULL, and sets 'etimed' to true. If an error occurred,
191 * returns NULL and sets 'etimed' to false.
193 void *slp_dequeue_timed(slp_queue_t
*qa
, timestruc_t
*to
, SLPBoolean
*etimed
) {
196 struct queue
*q
= qa
;
201 (void) mutex_lock(q
->lock
);
203 /* something's in the q, so no need to wait */
208 while (q
->count
== 0) {
210 err
= cond_timedwait(q
->wait
, q
->lock
, to
);
212 err
= cond_wait(q
->wait
, q
->lock
);
215 (void) mutex_unlock(q
->lock
);
222 ans
= dequeue_nolock(q
);
223 (void) mutex_unlock(q
->lock
);
228 * Removes the first message from the queue and returns it.
229 * Returns NULL only on internal error.
231 void *slp_dequeue(slp_queue_t
*qa
) {
232 return (slp_dequeue_timed(qa
, NULL
, NULL
));
236 * Flushes the queue, using the caller-specified free function to
237 * free each message in the queue.
239 void slp_flush_queue(slp_queue_t
*qa
, void (*free_f
)(void *)) {
240 slp_queue_entry_t
*p
, *pn
;
241 struct queue
*q
= qa
;
243 for (p
= q
->head
; p
; p
= pn
) {
251 * The queue must be empty before it can be destroyed; slp_flush_queue
252 * can be used to empty a queue.
254 void slp_destroy_queue(slp_queue_t
*qa
) {
255 struct queue
*q
= qa
;
257 (void) mutex_destroy(q
->lock
);
258 (void) cond_destroy(q
->wait
);