2 This file is part of PulseAudio.
4 Copyright 2006 Lennart Poettering
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
29 #include <pulse/xmalloc.h>
31 #include <pulsecore/macro.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/semaphore.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/mutex.h>
36 #include <pulsecore/flist.h>
38 #include "asyncmsgq.h"
40 PA_STATIC_FLIST_DECLARE(asyncmsgq
, 0, pa_xfree
);
41 PA_STATIC_FLIST_DECLARE(semaphores
, 0, (void(*)(void*)) pa_semaphore_free
);
43 struct asyncmsgq_item
{
50 pa_semaphore
*semaphore
;
57 pa_mutex
*mutex
; /* only for the writer side */
59 struct asyncmsgq_item
*current
;
62 pa_asyncmsgq
*pa_asyncmsgq_new(unsigned size
) {
65 a
= pa_xnew(pa_asyncmsgq
, 1);
68 pa_assert_se(a
->asyncq
= pa_asyncq_new(size
));
69 pa_assert_se(a
->mutex
= pa_mutex_new(FALSE
, TRUE
));
75 static void asyncmsgq_free(pa_asyncmsgq
*a
) {
76 struct asyncmsgq_item
*i
;
79 while ((i
= pa_asyncq_pop(a
->asyncq
, FALSE
))) {
81 pa_assert(!i
->semaphore
);
84 pa_msgobject_unref(i
->object
);
86 if (i
->memchunk
.memblock
)
87 pa_memblock_unref(i
->memchunk
.memblock
);
90 i
->free_cb(i
->userdata
);
92 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq
), i
) < 0)
96 pa_asyncq_free(a
->asyncq
, NULL
);
97 pa_mutex_free(a
->mutex
);
101 pa_asyncmsgq
* pa_asyncmsgq_ref(pa_asyncmsgq
*q
) {
102 pa_assert(PA_REFCNT_VALUE(q
) > 0);
108 void pa_asyncmsgq_unref(pa_asyncmsgq
* q
) {
109 pa_assert(PA_REFCNT_VALUE(q
) > 0);
111 if (PA_REFCNT_DEC(q
) <= 0)
115 void pa_asyncmsgq_post(pa_asyncmsgq
*a
, pa_msgobject
*object
, int code
, const void *userdata
, int64_t offset
, const pa_memchunk
*chunk
, pa_free_cb_t free_cb
) {
116 struct asyncmsgq_item
*i
;
117 pa_assert(PA_REFCNT_VALUE(a
) > 0);
119 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq
))))
120 i
= pa_xnew(struct asyncmsgq_item
, 1);
123 i
->object
= object
? pa_msgobject_ref(object
) : NULL
;
124 i
->userdata
= (void*) userdata
;
125 i
->free_cb
= free_cb
;
128 pa_assert(chunk
->memblock
);
129 i
->memchunk
= *chunk
;
130 pa_memblock_ref(i
->memchunk
.memblock
);
132 pa_memchunk_reset(&i
->memchunk
);
135 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
136 pa_mutex_lock(a
->mutex
);
137 pa_asyncq_post(a
->asyncq
, i
);
138 pa_mutex_unlock(a
->mutex
);
141 int pa_asyncmsgq_send(pa_asyncmsgq
*a
, pa_msgobject
*object
, int code
, const void *userdata
, int64_t offset
, const pa_memchunk
*chunk
) {
142 struct asyncmsgq_item i
;
143 pa_assert(PA_REFCNT_VALUE(a
) > 0);
147 i
.userdata
= (void*) userdata
;
152 pa_assert(chunk
->memblock
);
155 pa_memchunk_reset(&i
.memchunk
);
157 if (!(i
.semaphore
= pa_flist_pop(PA_STATIC_FLIST_GET(semaphores
))))
158 i
.semaphore
= pa_semaphore_new(0);
160 pa_assert_se(i
.semaphore
);
162 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
163 pa_mutex_lock(a
->mutex
);
164 pa_assert_se(pa_asyncq_push(a
->asyncq
, &i
, TRUE
) == 0);
165 pa_mutex_unlock(a
->mutex
);
167 pa_semaphore_wait(i
.semaphore
);
169 if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores
), i
.semaphore
) < 0)
170 pa_semaphore_free(i
.semaphore
);
175 int pa_asyncmsgq_get(pa_asyncmsgq
*a
, pa_msgobject
**object
, int *code
, void **userdata
, int64_t *offset
, pa_memchunk
*chunk
, pa_bool_t wait_op
) {
176 pa_assert(PA_REFCNT_VALUE(a
) > 0);
177 pa_assert(!a
->current
);
179 if (!(a
->current
= pa_asyncq_pop(a
->asyncq
, wait_op
))) {
180 /* pa_log("failure"); */
184 /* pa_log("success"); */
187 *code
= a
->current
->code
;
189 *userdata
= a
->current
->userdata
;
191 *offset
= a
->current
->offset
;
193 if ((*object
= a
->current
->object
))
194 pa_msgobject_assert_ref(*object
);
197 *chunk
= a
->current
->memchunk
;
199 /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
201 /* (void*) a->current->object, */
202 /* a->current->object ? a->current->object->parent.type_name : NULL, */
203 /* a->current->code, */
204 /* (void*) a->current->userdata, */
205 /* (unsigned long) a->current->memchunk.length); */
210 void pa_asyncmsgq_done(pa_asyncmsgq
*a
, int ret
) {
211 pa_assert(PA_REFCNT_VALUE(a
) > 0);
213 pa_assert(a
->current
);
215 if (a
->current
->semaphore
) {
216 a
->current
->ret
= ret
;
217 pa_semaphore_post(a
->current
->semaphore
);
220 if (a
->current
->free_cb
)
221 a
->current
->free_cb(a
->current
->userdata
);
223 if (a
->current
->object
)
224 pa_msgobject_unref(a
->current
->object
);
226 if (a
->current
->memchunk
.memblock
)
227 pa_memblock_unref(a
->current
->memchunk
.memblock
);
229 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq
), a
->current
) < 0)
230 pa_xfree(a
->current
);
236 int pa_asyncmsgq_wait_for(pa_asyncmsgq
*a
, int code
) {
238 pa_assert(PA_REFCNT_VALUE(a
) > 0);
249 if (pa_asyncmsgq_get(a
, &o
, &c
, &data
, &offset
, &chunk
, TRUE
) < 0)
252 ret
= pa_asyncmsgq_dispatch(o
, c
, data
, offset
, &chunk
);
253 pa_asyncmsgq_done(a
, ret
);
257 pa_asyncmsgq_unref(a
);
262 int pa_asyncmsgq_process_one(pa_asyncmsgq
*a
) {
263 pa_msgobject
*object
;
270 pa_assert(PA_REFCNT_VALUE(a
) > 0);
272 if (pa_asyncmsgq_get(a
, &object
, &code
, &data
, &offset
, &chunk
, FALSE
) < 0)
276 ret
= pa_asyncmsgq_dispatch(object
, code
, data
, offset
, &chunk
);
277 pa_asyncmsgq_done(a
, ret
);
278 pa_asyncmsgq_unref(a
);
283 int pa_asyncmsgq_read_fd(pa_asyncmsgq
*a
) {
284 pa_assert(PA_REFCNT_VALUE(a
) > 0);
286 return pa_asyncq_read_fd(a
->asyncq
);
289 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq
*a
) {
290 pa_assert(PA_REFCNT_VALUE(a
) > 0);
292 return pa_asyncq_read_before_poll(a
->asyncq
);
295 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq
*a
) {
296 pa_assert(PA_REFCNT_VALUE(a
) > 0);
298 pa_asyncq_read_after_poll(a
->asyncq
);
301 int pa_asyncmsgq_write_fd(pa_asyncmsgq
*a
) {
302 pa_assert(PA_REFCNT_VALUE(a
) > 0);
304 return pa_asyncq_write_fd(a
->asyncq
);
307 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq
*a
) {
308 pa_assert(PA_REFCNT_VALUE(a
) > 0);
310 pa_asyncq_write_before_poll(a
->asyncq
);
313 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq
*a
) {
314 pa_assert(PA_REFCNT_VALUE(a
) > 0);
316 pa_asyncq_write_after_poll(a
->asyncq
);
319 int pa_asyncmsgq_dispatch(pa_msgobject
*object
, int code
, void *userdata
, int64_t offset
, pa_memchunk
*memchunk
) {
322 return object
->process_msg(object
, code
, userdata
, offset
, pa_memchunk_isset(memchunk
) ? memchunk
: NULL
);
327 void pa_asyncmsgq_flush(pa_asyncmsgq
*a
, pa_bool_t run
) {
328 pa_assert(PA_REFCNT_VALUE(a
) > 0);
331 pa_msgobject
*object
;
338 if (pa_asyncmsgq_get(a
, &object
, &code
, &data
, &offset
, &chunk
, FALSE
) < 0)
342 pa_asyncmsgq_done(a
, -1);
347 ret
= pa_asyncmsgq_dispatch(object
, code
, data
, offset
, &chunk
);
348 pa_asyncmsgq_done(a
, ret
);
349 pa_asyncmsgq_unref(a
);
353 pa_bool_t
pa_asyncmsgq_dispatching(pa_asyncmsgq
*a
) {
354 pa_assert(PA_REFCNT_VALUE(a
) > 0);