default: esd is obsolete, hence don't load it anymore by default
[pulseaudio-mirror.git] / src / pulsecore / asyncmsgq.c
blob408416c918045e3a525a29cba982816a692c906f
1 /***
2 This file is part of PulseAudio.
4 Copyright 2006 Lennart Poettering
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as
8 published by the Free Software Foundation; either version 2.1 of the
9 License, or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19 USA.
20 ***/
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
26 #include <unistd.h>
27 #include <errno.h>
29 #include <pulse/xmalloc.h>
31 #include <pulsecore/macro.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/semaphore.h>
34 #include <pulsecore/macro.h>
35 #include <pulsecore/mutex.h>
36 #include <pulsecore/flist.h>
38 #include "asyncmsgq.h"
/* Process-wide free lists that recycle message items and semaphores
 * across all queues, so the message fast path normally avoids heap
 * allocation. Entries that don't fit are released with the given
 * destructor. */
PA_STATIC_FLIST_DECLARE(asyncmsgq, 0, pa_xfree);
PA_STATIC_FLIST_DECLARE(semaphores, 0, (void(*)(void*)) pa_semaphore_free);
/* One queued message. For asynchronous posts the item is taken from
 * the "asyncmsgq" free list (or the heap) and owns references to its
 * object/memchunk; for synchronous sends the item lives on the
 * sender's stack, borrows its resources and carries a semaphore. */
struct asyncmsgq_item {
    int code;                /* message code passed to the object's process_msg() */
    pa_msgobject *object;    /* recipient; reffed for posts, borrowed for sends */
    void *userdata;          /* opaque payload */
    pa_free_cb_t free_cb;    /* destructor for userdata (async posts only), may be NULL */
    int64_t offset;
    pa_memchunk memchunk;    /* reffed for posts, borrowed for sends */
    pa_semaphore *semaphore; /* non-NULL only for synchronous sends */
    int ret;                 /* dispatch result handed back to a synchronous sender */
};
/* Reference-counted message queue built on top of a pa_asyncq.
 * Reading is single-consumer; writing is made multi-writer safe
 * by the mutex below. */
struct pa_asyncmsgq {
    PA_REFCNT_DECLARE;
    pa_asyncq *asyncq;
    pa_mutex *mutex; /* only for the writer side */
    struct asyncmsgq_item *current; /* item handed out by _get(), pending _done() */
};
62 pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
63 pa_asyncmsgq *a;
65 a = pa_xnew(pa_asyncmsgq, 1);
67 PA_REFCNT_INIT(a);
68 pa_assert_se(a->asyncq = pa_asyncq_new(size));
69 pa_assert_se(a->mutex = pa_mutex_new(FALSE, TRUE));
70 a->current = NULL;
72 return a;
75 static void asyncmsgq_free(pa_asyncmsgq *a) {
76 struct asyncmsgq_item *i;
77 pa_assert(a);
79 while ((i = pa_asyncq_pop(a->asyncq, FALSE))) {
81 pa_assert(!i->semaphore);
83 if (i->object)
84 pa_msgobject_unref(i->object);
86 if (i->memchunk.memblock)
87 pa_memblock_unref(i->memchunk.memblock);
89 if (i->free_cb)
90 i->free_cb(i->userdata);
92 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), i) < 0)
93 pa_xfree(i);
96 pa_asyncq_free(a->asyncq, NULL);
97 pa_mutex_free(a->mutex);
98 pa_xfree(a);
101 pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
102 pa_assert(PA_REFCNT_VALUE(q) > 0);
104 PA_REFCNT_INC(q);
105 return q;
108 void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
109 pa_assert(PA_REFCNT_VALUE(q) > 0);
111 if (PA_REFCNT_DEC(q) <= 0)
112 asyncmsgq_free(q);
115 void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
116 struct asyncmsgq_item *i;
117 pa_assert(PA_REFCNT_VALUE(a) > 0);
119 if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
120 i = pa_xnew(struct asyncmsgq_item, 1);
122 i->code = code;
123 i->object = object ? pa_msgobject_ref(object) : NULL;
124 i->userdata = (void*) userdata;
125 i->free_cb = free_cb;
126 i->offset = offset;
127 if (chunk) {
128 pa_assert(chunk->memblock);
129 i->memchunk = *chunk;
130 pa_memblock_ref(i->memchunk.memblock);
131 } else
132 pa_memchunk_reset(&i->memchunk);
133 i->semaphore = NULL;
135 /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
136 pa_mutex_lock(a->mutex);
137 pa_asyncq_post(a->asyncq, i);
138 pa_mutex_unlock(a->mutex);
/* Send a message synchronously: block until the reader has dispatched
 * it and called pa_asyncmsgq_done(), then return the dispatch result.
 * The item lives on our stack and merely borrows 'object', 'userdata'
 * and 'chunk' -- no references are taken, since the caller outlives
 * the call by construction. */
int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
    struct asyncmsgq_item i;
    pa_assert(PA_REFCNT_VALUE(a) > 0);

    i.code = code;
    i.object = object;
    i.userdata = (void*) userdata;
    i.free_cb = NULL;    /* no destructor: we keep ownership of userdata */
    i.ret = -1;          /* filled in by pa_asyncmsgq_done() on the reader side */
    i.offset = offset;
    if (chunk) {
        pa_assert(chunk->memblock);
        i.memchunk = *chunk;
    } else
        pa_memchunk_reset(&i.memchunk);

    /* Recycle a semaphore from the free list if possible */
    if (!(i.semaphore = pa_flist_pop(PA_STATIC_FLIST_GET(semaphores))))
        i.semaphore = pa_semaphore_new(0);

    pa_assert_se(i.semaphore);

    /* This mutex makes the queue multiple-writer safe. This lock is only used on the writing side */
    pa_mutex_lock(a->mutex);
    pa_assert_se(pa_asyncq_push(a->asyncq, &i, TRUE) == 0);
    pa_mutex_unlock(a->mutex);

    /* Block until the reader posts the semaphore in pa_asyncmsgq_done() */
    pa_semaphore_wait(i.semaphore);

    if (pa_flist_push(PA_STATIC_FLIST_GET(semaphores), i.semaphore) < 0)
        pa_semaphore_free(i.semaphore);

    return i.ret;
}
175 int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, pa_bool_t wait_op) {
176 pa_assert(PA_REFCNT_VALUE(a) > 0);
177 pa_assert(!a->current);
179 if (!(a->current = pa_asyncq_pop(a->asyncq, wait_op))) {
180 /* pa_log("failure"); */
181 return -1;
184 /* pa_log("success"); */
186 if (code)
187 *code = a->current->code;
188 if (userdata)
189 *userdata = a->current->userdata;
190 if (offset)
191 *offset = a->current->offset;
192 if (object) {
193 if ((*object = a->current->object))
194 pa_msgobject_assert_ref(*object);
196 if (chunk)
197 *chunk = a->current->memchunk;
199 /* pa_log_debug("Get q=%p object=%p (%s) code=%i data=%p chunk.length=%lu", */
200 /* (void*) a, */
201 /* (void*) a->current->object, */
202 /* a->current->object ? a->current->object->parent.type_name : NULL, */
203 /* a->current->code, */
204 /* (void*) a->current->userdata, */
205 /* (unsigned long) a->current->memchunk.length); */
207 return 0;
210 void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
211 pa_assert(PA_REFCNT_VALUE(a) > 0);
212 pa_assert(a);
213 pa_assert(a->current);
215 if (a->current->semaphore) {
216 a->current->ret = ret;
217 pa_semaphore_post(a->current->semaphore);
218 } else {
220 if (a->current->free_cb)
221 a->current->free_cb(a->current->userdata);
223 if (a->current->object)
224 pa_msgobject_unref(a->current->object);
226 if (a->current->memchunk.memblock)
227 pa_memblock_unref(a->current->memchunk.memblock);
229 if (pa_flist_push(PA_STATIC_FLIST_GET(asyncmsgq), a->current) < 0)
230 pa_xfree(a->current);
233 a->current = NULL;
236 int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
237 int c;
238 pa_assert(PA_REFCNT_VALUE(a) > 0);
240 pa_asyncmsgq_ref(a);
242 do {
243 pa_msgobject *o;
244 void *data;
245 int64_t offset;
246 pa_memchunk chunk;
247 int ret;
249 if (pa_asyncmsgq_get(a, &o, &c, &data, &offset, &chunk, TRUE) < 0)
250 return -1;
252 ret = pa_asyncmsgq_dispatch(o, c, data, offset, &chunk);
253 pa_asyncmsgq_done(a, ret);
255 } while (c != code);
257 pa_asyncmsgq_unref(a);
259 return 0;
262 int pa_asyncmsgq_process_one(pa_asyncmsgq *a) {
263 pa_msgobject *object;
264 int code;
265 void *data;
266 pa_memchunk chunk;
267 int64_t offset;
268 int ret;
270 pa_assert(PA_REFCNT_VALUE(a) > 0);
272 if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0)
273 return 0;
275 pa_asyncmsgq_ref(a);
276 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
277 pa_asyncmsgq_done(a, ret);
278 pa_asyncmsgq_unref(a);
280 return 1;
283 int pa_asyncmsgq_read_fd(pa_asyncmsgq *a) {
284 pa_assert(PA_REFCNT_VALUE(a) > 0);
286 return pa_asyncq_read_fd(a->asyncq);
289 int pa_asyncmsgq_read_before_poll(pa_asyncmsgq *a) {
290 pa_assert(PA_REFCNT_VALUE(a) > 0);
292 return pa_asyncq_read_before_poll(a->asyncq);
295 void pa_asyncmsgq_read_after_poll(pa_asyncmsgq *a) {
296 pa_assert(PA_REFCNT_VALUE(a) > 0);
298 pa_asyncq_read_after_poll(a->asyncq);
301 int pa_asyncmsgq_write_fd(pa_asyncmsgq *a) {
302 pa_assert(PA_REFCNT_VALUE(a) > 0);
304 return pa_asyncq_write_fd(a->asyncq);
307 void pa_asyncmsgq_write_before_poll(pa_asyncmsgq *a) {
308 pa_assert(PA_REFCNT_VALUE(a) > 0);
310 pa_asyncq_write_before_poll(a->asyncq);
313 void pa_asyncmsgq_write_after_poll(pa_asyncmsgq *a) {
314 pa_assert(PA_REFCNT_VALUE(a) > 0);
316 pa_asyncq_write_after_poll(a->asyncq);
319 int pa_asyncmsgq_dispatch(pa_msgobject *object, int code, void *userdata, int64_t offset, pa_memchunk *memchunk) {
321 if (object)
322 return object->process_msg(object, code, userdata, offset, pa_memchunk_isset(memchunk) ? memchunk : NULL);
324 return 0;
327 void pa_asyncmsgq_flush(pa_asyncmsgq *a, pa_bool_t run) {
328 pa_assert(PA_REFCNT_VALUE(a) > 0);
330 for (;;) {
331 pa_msgobject *object;
332 int code;
333 void *data;
334 int64_t offset;
335 pa_memchunk chunk;
336 int ret;
338 if (pa_asyncmsgq_get(a, &object, &code, &data, &offset, &chunk, FALSE) < 0)
339 return;
341 if (!run) {
342 pa_asyncmsgq_done(a, -1);
343 continue;
346 pa_asyncmsgq_ref(a);
347 ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
348 pa_asyncmsgq_done(a, ret);
349 pa_asyncmsgq_unref(a);
353 pa_bool_t pa_asyncmsgq_dispatching(pa_asyncmsgq *a) {
354 pa_assert(PA_REFCNT_VALUE(a) > 0);
356 return !!a->current;