/***
  This file is part of PulseAudio.

  Copyright 2004-2006 Lennart Poettering
  Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB

  PulseAudio is free software; you can redistribute it and/or modify
  it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation; either version 2.1 of the
  License, or (at your option) any later version.

  PulseAudio is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with PulseAudio; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
***/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif

#include <pulse/xmalloc.h>

#include <pulsecore/socket.h>
#include <pulsecore/queue.h>
#include <pulsecore/log.h>
#include <pulsecore/core-scache.h>
#include <pulsecore/creds.h>
#include <pulsecore/refcnt.h>
#include <pulsecore/flist.h>
#include <pulsecore/macro.h>

#include "pstream.h"

/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
#define PA_FLAG_SHMDATA    0x80000000LU
#define PA_FLAG_SHMRELEASE 0x40000000LU
#define PA_FLAG_SHMREVOKE  0xC0000000LU
#define PA_FLAG_SHMMASK    0xFF000000LU
#define PA_FLAG_SEEKMASK   0x000000FFLU
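
/* How these bits end up in the FLAGS descriptor field (see
 * prepare_next_write_item() below): a memblock frame carries
 * (seek_mode & PA_FLAG_SEEKMASK), OR'ed with PA_FLAG_SHMDATA if the block
 * could be exported via shared memory; SHM release/revoke frames carry just
 * PA_FLAG_SHMRELEASE resp. PA_FLAG_SHMREVOKE, with the block id stored in
 * the OFFSET_HI field and no payload. */
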
/* The sequence descriptor header consists of 5 32bit integers: */
enum {
    PA_PSTREAM_DESCRIPTOR_LENGTH,
    PA_PSTREAM_DESCRIPTOR_CHANNEL,
    PA_PSTREAM_DESCRIPTOR_OFFSET_HI,
    PA_PSTREAM_DESCRIPTOR_OFFSET_LO,
    PA_PSTREAM_DESCRIPTOR_FLAGS,
    PA_PSTREAM_DESCRIPTOR_MAX
};
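
/* Wire layout sketch: every frame starts with these five uint32_t values in
 * network byte order, followed by LENGTH bytes of payload. A plain packet
 * frame, for example, is written as
 *
 *     LENGTH    = htonl(packet->length)
 *     CHANNEL   = htonl((uint32_t) -1)
 *     OFFSET_HI = 0
 *     OFFSET_LO = 0
 *     FLAGS     = 0
 *
 * followed by the packet data (see prepare_next_write_item()). */
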
/* If we have an SHM block, this info follows the descriptor */
enum {
    PA_PSTREAM_SHM_BLOCKID,
    PA_PSTREAM_SHM_SHMID,
    PA_PSTREAM_SHM_INDEX,
    PA_PSTREAM_SHM_LENGTH,
    PA_PSTREAM_SHM_MAX
};
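
/* That is, when PA_FLAG_SHMDATA is set the frame payload is not the audio
 * data itself but these four uint32_t values, which tell the receiving side
 * where to find the block inside the shared memory pool (imported via
 * pa_memimport_get() in do_read()). */
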
typedef uint32_t pa_pstream_descriptor[PA_PSTREAM_DESCRIPTOR_MAX];

#define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
#define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */

PA_STATIC_FLIST_DECLARE(items, 0, pa_xfree);

struct item_info {
    enum {
        PA_PSTREAM_ITEM_PACKET,
        PA_PSTREAM_ITEM_MEMBLOCK,
        PA_PSTREAM_ITEM_SHMRELEASE,
        PA_PSTREAM_ITEM_SHMREVOKE
    } type;

    /* packet info */
    pa_packet *packet;
#ifdef HAVE_CREDS
    pa_bool_t with_creds;
    pa_creds creds;
#endif

    /* memblock info */
    pa_memchunk chunk;
    uint32_t channel;
    int64_t offset;
    pa_seek_mode_t seek_mode;

    /* release/revoke info */
    uint32_t block_id;
};

struct pa_pstream {
    PA_REFCNT_DECLARE;

    pa_mainloop_api *mainloop;
    pa_defer_event *defer_event;
    pa_iochannel *io;

    pa_queue *send_queue;

    pa_bool_t dead;

    struct {
        pa_pstream_descriptor descriptor;
        struct item_info* current;
        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
        void *data;
        size_t index;
        pa_memchunk memchunk;
    } write;

    struct {
        pa_pstream_descriptor descriptor;
        pa_memblock *memblock;
        pa_packet *packet;
        uint32_t shm_info[PA_PSTREAM_SHM_MAX];
        void *data;
        size_t index;
    } read;

    pa_bool_t use_shm;
    pa_memimport *import;
    pa_memexport *export;

    pa_pstream_packet_cb_t recieve_packet_callback;
    void *recieve_packet_callback_userdata;

    pa_pstream_memblock_cb_t recieve_memblock_callback;
    void *recieve_memblock_callback_userdata;

    pa_pstream_notify_cb_t drain_callback;
    void *drain_callback_userdata;

    pa_pstream_notify_cb_t die_callback;
    void *die_callback_userdata;

    pa_pstream_block_id_cb_t revoke_callback;
    void *revoke_callback_userdata;

    pa_pstream_block_id_cb_t release_callback;
    void *release_callback_userdata;

    pa_mempool *mempool;

#ifdef HAVE_CREDS
    pa_creds read_creds, write_creds;
    pa_bool_t read_creds_valid, send_creds_now;
#endif
};
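
/* Note on the write/read halves above: "index" counts how many bytes of the
 * current frame have been transferred so far, descriptor first, then payload.
 * do_write() and do_read() are driven repeatedly from the I/O and defer
 * callbacks until a whole frame has been passed on. */
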
static int do_write(pa_pstream *p);
static int do_read(pa_pstream *p);

static void do_something(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    pa_pstream_ref(p);

    p->mainloop->defer_enable(p->defer_event, 0);

    if (!p->dead && pa_iochannel_is_readable(p->io)) {
        if (do_read(p) < 0)
            goto fail;
    } else if (!p->dead && pa_iochannel_is_hungup(p->io))
        goto fail;

    if (!p->dead && pa_iochannel_is_writable(p->io)) {
        if (do_write(p) < 0)
            goto fail;
    }

    pa_pstream_unref(p);
    return;

fail:

    if (p->die_callback)
        p->die_callback(p, p->die_callback_userdata);

    pa_pstream_unlink(p);
    pa_pstream_unref(p);
}

static void io_callback(pa_iochannel*io, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->io == io);

    do_something(p);
}

static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(p->defer_event == e);
    pa_assert(p->mainloop == m);

    do_something(p);
}

static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata);

pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *pool) {
    pa_pstream *p;

    pa_assert(m);
    pa_assert(io);
    pa_assert(pool);

    p = pa_xnew(pa_pstream, 1);
    PA_REFCNT_INIT(p);
    p->io = io;
    pa_iochannel_set_callback(io, io_callback, p);
    p->dead = FALSE;

    p->mainloop = m;
    p->defer_event = m->defer_new(m, defer_callback, p);
    m->defer_enable(p->defer_event, 0);

    p->send_queue = pa_queue_new();

    p->write.current = NULL;
    p->write.index = 0;
    pa_memchunk_reset(&p->write.memchunk);
    p->read.memblock = NULL;
    p->read.packet = NULL;
    p->read.index = 0;

    p->recieve_packet_callback = NULL;
    p->recieve_packet_callback_userdata = NULL;
    p->recieve_memblock_callback = NULL;
    p->recieve_memblock_callback_userdata = NULL;
    p->drain_callback = NULL;
    p->drain_callback_userdata = NULL;
    p->die_callback = NULL;
    p->die_callback_userdata = NULL;
    p->revoke_callback = NULL;
    p->revoke_callback_userdata = NULL;
    p->release_callback = NULL;
    p->release_callback_userdata = NULL;

    p->mempool = pool;

    p->use_shm = FALSE;
    p->export = NULL;

    /* We do importing unconditionally */
    p->import = pa_memimport_new(p->mempool, memimport_release_cb, p);

    pa_iochannel_socket_set_rcvbuf(io, pa_mempool_block_size_max(p->mempool));
    pa_iochannel_socket_set_sndbuf(io, pa_mempool_block_size_max(p->mempool));

#ifdef HAVE_CREDS
    p->send_creds_now = FALSE;
    p->read_creds_valid = FALSE;
#endif

    return p;
}
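
/* Typical usage sketch (names are illustrative, not from this file): create
 * the stream over an existing mainloop, iochannel and mempool, hook up the
 * callbacks, and optionally enable SHM transfer:
 *
 *     pa_pstream *ps = pa_pstream_new(mainloop, io, pool);
 *     pa_pstream_set_recieve_packet_callback(ps, on_packet, userdata);
 *     pa_pstream_set_recieve_memblock_callback(ps, on_memblock, userdata);
 *     pa_pstream_enable_shm(ps, TRUE);
 *     ...
 *     pa_pstream_unlink(ps);
 *     pa_pstream_unref(ps);
 */
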
static void item_free(void *item, void *q) {
    struct item_info *i = item;
    pa_assert(i);

    if (i->type == PA_PSTREAM_ITEM_MEMBLOCK) {
        pa_assert(i->chunk.memblock);
        pa_memblock_unref(i->chunk.memblock);
    } else if (i->type == PA_PSTREAM_ITEM_PACKET) {
        pa_assert(i->packet);
        pa_packet_unref(i->packet);
    }

    if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
        pa_xfree(i);
}

static void pstream_free(pa_pstream *p) {
    pa_assert(p);

    pa_pstream_unlink(p);

    pa_queue_free(p->send_queue, item_free, NULL);

    if (p->write.current)
        item_free(p->write.current, NULL);

    if (p->write.memchunk.memblock)
        pa_memblock_unref(p->write.memchunk.memblock);

    if (p->read.memblock)
        pa_memblock_unref(p->read.memblock);

    if (p->read.packet)
        pa_packet_unref(p->read.packet);

    pa_xfree(p);
}

void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *creds) {
    struct item_info *i;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(packet);

    if (p->dead)
        return;

    if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        i = pa_xnew(struct item_info, 1);

    i->type = PA_PSTREAM_ITEM_PACKET;
    i->packet = pa_packet_ref(packet);

#ifdef HAVE_CREDS
    if ((i->with_creds = !!creds))
        i->creds = *creds;
#endif

    pa_queue_push(p->send_queue, i);

    p->mainloop->defer_enable(p->defer_event, 1);
}

void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
    size_t length, idx;
    size_t bsm;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);
    pa_assert(channel != (uint32_t) -1);
    pa_assert(chunk);

    if (p->dead)
        return;

    idx = 0;
    length = chunk->length;

    bsm = pa_mempool_block_size_max(p->mempool);

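    /* Split the chunk into queue items of at most bsm bytes each, so that no
     * single frame exceeds one memory pool block (the receiving do_read()
     * allocates one pa_memblock per frame). */
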
    while (length > 0) {
        struct item_info *i;
        size_t n;

        if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
            i = pa_xnew(struct item_info, 1);
        i->type = PA_PSTREAM_ITEM_MEMBLOCK;

        n = PA_MIN(length, bsm);
        i->chunk.index = chunk->index + idx;
        i->chunk.length = n;
        i->chunk.memblock = pa_memblock_ref(chunk->memblock);

        i->channel = channel;
        i->offset = offset;
        i->seek_mode = seek_mode;
#ifdef HAVE_CREDS
        i->with_creds = FALSE;
#endif

        pa_queue_push(p->send_queue, i);

        idx += n;
        length -= n;
    }

    p->mainloop->defer_enable(p->defer_event, 1);
}

void pa_pstream_send_release(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

/*     pa_log("Releasing block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMRELEASE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_creds = FALSE;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}
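
/* Release frames flow from the importing side back to the exporting side:
 * when the local memimport drops a block (memimport_release_cb() below), a
 * SHMRELEASE frame is queued here, and the peer feeds the block id into
 * pa_memexport_process_release() in do_read(). */
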
/* might be called from thread context */
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

    if (p->release_callback)
        p->release_callback(p, block_id, p->release_callback_userdata);
    else
        pa_pstream_send_release(p, block_id);
}

void pa_pstream_send_revoke(pa_pstream *p, uint32_t block_id) {
    struct item_info *item;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        return;

/*     pa_log("Revoking block %u", block_id); */

    if (!(item = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
        item = pa_xnew(struct item_info, 1);
    item->type = PA_PSTREAM_ITEM_SHMREVOKE;
    item->block_id = block_id;
#ifdef HAVE_CREDS
    item->with_creds = FALSE;
#endif

    pa_queue_push(p->send_queue, item);
    p->mainloop->defer_enable(p->defer_event, 1);
}
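
/* Revoke frames go the other way round: the exporting side uses them to take
 * an exported block back (memexport_revoke_cb() below), and the importing
 * peer hands the block id to pa_memimport_process_revoke() in do_read(). */
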
/* might be called from thread context */
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
    pa_pstream *p = userdata;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->revoke_callback)
        p->revoke_callback(p, block_id, p->revoke_callback_userdata);
    else
        pa_pstream_send_revoke(p, block_id);
}

static void prepare_next_write_item(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->write.current = pa_queue_pop(p->send_queue);

    if (!p->write.current)
        return;

    p->write.index = 0;
    p->write.data = NULL;
    pa_memchunk_reset(&p->write.memchunk);

    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
    p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = 0;

    if (p->write.current->type == PA_PSTREAM_ITEM_PACKET) {

        pa_assert(p->write.current->packet);
        p->write.data = p->write.current->packet->data;
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->packet->length);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMRELEASE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMRELEASE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else if (p->write.current->type == PA_PSTREAM_ITEM_SHMREVOKE) {

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(PA_FLAG_SHMREVOKE);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl(p->write.current->block_id);

    } else {
        uint32_t flags;
        pa_bool_t send_payload = TRUE;

        pa_assert(p->write.current->type == PA_PSTREAM_ITEM_MEMBLOCK);
        pa_assert(p->write.current->chunk.memblock);

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl(p->write.current->channel);
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] = htonl((uint32_t) (((uint64_t) p->write.current->offset) >> 32));
        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = htonl((uint32_t) ((uint64_t) p->write.current->offset));

        flags = (uint32_t) (p->write.current->seek_mode & PA_FLAG_SEEKMASK);

        if (p->use_shm) {
            uint32_t block_id, shm_id;
            size_t offset, length;

            pa_assert(p->export);

            if (pa_memexport_put(p->export,
                                 p->write.current->chunk.memblock,
                                 &block_id,
                                 &shm_id,
                                 &offset,
                                 &length) >= 0) {

                flags |= PA_FLAG_SHMDATA;
                send_payload = FALSE;

                p->write.shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
                p->write.shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
                p->write.shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
                p->write.shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);

                p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(sizeof(p->write.shm_info));
                p->write.data = p->write.shm_info;
            }
/*             else */
/*                 pa_log_warn("Failed to export memory block."); */
        }

        if (send_payload) {
            p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
            p->write.memchunk = p->write.current->chunk;
            pa_memblock_ref(p->write.memchunk.memblock);
            p->write.data = NULL;
        }

        p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
    }

#ifdef HAVE_CREDS
    if ((p->send_creds_now = p->write.current->with_creds))
        p->write_creds = p->write.current->creds;
#endif
}

static int do_write(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (!p->write.current)
        prepare_next_write_item(p);

    if (!p->write.current)
        return 0;

    if (p->write.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) p->write.descriptor + p->write.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
    } else {
        pa_assert(p->write.data || p->write.memchunk.memblock);

        if (p->write.data)
            d = p->write.data;
        else {
            d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
            release_memblock = p->write.memchunk.memblock;
        }

        d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

    pa_assert(l > 0);

#ifdef HAVE_CREDS
    if (p->send_creds_now) {

        if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
            goto fail;

        p->send_creds_now = FALSE;
    } else
#endif

    if ((r = pa_iochannel_write(p->io, d, l)) < 0)
        goto fail;

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->write.index += (size_t) r;

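    /* The frame is complete once the write index covers the descriptor plus
     * the announced payload length; only then is the item freed and the next
     * one picked from the queue. A short write simply leaves write.index in
     * the middle of the frame and we continue on the next writability event. */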
    if (p->write.index >= PA_PSTREAM_DESCRIPTOR_SIZE + ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH])) {
        pa_assert(p->write.current);
        item_free(p->write.current, NULL);
        p->write.current = NULL;

        if (p->write.memchunk.memblock)
            pa_memblock_unref(p->write.memchunk.memblock);

        pa_memchunk_reset(&p->write.memchunk);

        if (p->drain_callback && !pa_pstream_is_pending(p))
            p->drain_callback(p, p->drain_callback_userdata);
    }

    return 0;

fail:

    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}

static int do_read(pa_pstream *p) {
    void *d;
    size_t l;
    ssize_t r;
    pa_memblock *release_memblock = NULL;
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
        d = (uint8_t*) p->read.descriptor + p->read.index;
        l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
    } else {
        pa_assert(p->read.data || p->read.memblock);

        if (p->read.data)
            d = p->read.data;
        else {
            d = pa_memblock_acquire(p->read.memblock);
            release_memblock = p->read.memblock;
        }

        d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
        l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
    }

#ifdef HAVE_CREDS
    {
        pa_bool_t b = 0;

        if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
            goto fail;

        p->read_creds_valid = p->read_creds_valid || b;
    }
#else
    if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
        goto fail;
#endif

    if (release_memblock)
        pa_memblock_release(release_memblock);

    p->read.index += (size_t) r;

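    /* From here on three cases are distinguished: the descriptor has just
     * been completed (validate it and set up packet/memblock/SHM-info
     * storage), more payload has arrived (hand memblock data to the user
     * incrementally), or the frame is complete (dispatch it and reset the
     * read state via frame_done). */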
    if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
        uint32_t flags, length, channel;
        /* Reading of frame descriptor complete */

        flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);

        if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
            pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
            return -1;
        }

        if (flags == PA_FLAG_SHMRELEASE) {

            /* This is a SHM memblock release frame with no payload */

/*             pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->export);
            pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;

        } else if (flags == PA_FLAG_SHMREVOKE) {

            /* This is a SHM memblock revoke frame with no payload */

/*             pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */

            pa_assert(p->import);
            pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));

            goto frame_done;
        }

        length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);

        if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
            pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
            return -1;
        }

        pa_assert(!p->read.packet && !p->read.memblock);

        channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);

        if (channel == (uint32_t) -1) {

            if (flags != 0) {
                pa_log_warn("Received packet frame with invalid flags value.");
                return -1;
            }

            /* Frame is a packet frame */
            p->read.packet = pa_packet_new(length);
            p->read.data = p->read.packet->data;

        } else {

            if ((flags & PA_FLAG_SEEKMASK) > PA_SEEK_RELATIVE_END) {
                pa_log_warn("Received memblock frame with invalid seek mode.");
                return -1;
            }

            if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {

                if (length != sizeof(p->read.shm_info)) {
                    pa_log_warn("Received SHM memblock frame with invalid frame length.");
                    return -1;
                }

                /* Frame is a memblock frame referencing an SHM memblock */
                p->read.data = p->read.shm_info;

            } else if ((flags & PA_FLAG_SHMMASK) == 0) {

                /* Frame is a memblock frame */

                p->read.memblock = pa_memblock_new(p->mempool, length);
                p->read.data = NULL;
            } else {

                pa_log_warn("Received memblock frame with invalid flags value.");
                return -1;
            }
        }

    } else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
        /* Frame payload available */

        if (p->read.memblock && p->recieve_memblock_callback) {

            /* Is this memblock data? Then pass it to the user */

            l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;

            if (l > 0) {
                pa_memchunk chunk;

                chunk.memblock = p->read.memblock;
                chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
                chunk.length = l;

                if (p->recieve_memblock_callback) {
                    int64_t offset;

                    offset = (int64_t) (
                        (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                        (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                    p->recieve_memblock_callback(
                        p,
                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                        offset,
                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                        &chunk,
                        p->recieve_memblock_callback_userdata);
                }

                /* Drop seek info for following callbacks */
                p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
                    p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
                    p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
            }
        }

        /* Frame complete */
        if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {

            if (p->read.memblock) {

                /* This was a memblock frame. We can unref the memblock now */
                pa_memblock_unref(p->read.memblock);

            } else if (p->read.packet) {

                if (p->recieve_packet_callback)
#ifdef HAVE_CREDS
                    p->recieve_packet_callback(p, p->read.packet, p->read_creds_valid ? &p->read_creds : NULL, p->recieve_packet_callback_userdata);
#else
                    p->recieve_packet_callback(p, p->read.packet, NULL, p->recieve_packet_callback_userdata);
#endif

                pa_packet_unref(p->read.packet);
            } else {
                pa_memblock *b;

                pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);

                pa_assert(p->import);

                if (!(b = pa_memimport_get(p->import,
                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
                                           ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {

                    if (pa_log_ratelimit(PA_LOG_DEBUG))
                        pa_log_debug("Failed to import memory block.");
                }

                if (p->recieve_memblock_callback) {
                    int64_t offset;
                    pa_memchunk chunk;

                    chunk.memblock = b;
                    chunk.index = 0;
                    chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);

                    offset = (int64_t) (
                        (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
                        (((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));

                    p->recieve_memblock_callback(
                        p,
                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
                        offset,
                        ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
                        &chunk,
                        p->recieve_memblock_callback_userdata);
                }

                if (b)
                    pa_memblock_unref(b);
            }

            goto frame_done;
        }
    }

    return 0;

frame_done:
    p->read.memblock = NULL;
    p->read.packet = NULL;
    p->read.index = 0;
    p->read.data = NULL;

#ifdef HAVE_CREDS
    p->read_creds_valid = FALSE;
#endif

    return 0;

fail:
    if (release_memblock)
        pa_memblock_release(release_memblock);

    return -1;
}

void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->die_callback = cb;
    p->die_callback_userdata = userdata;
}

void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->drain_callback = cb;
    p->drain_callback_userdata = userdata;
}

void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->recieve_packet_callback = cb;
    p->recieve_packet_callback_userdata = userdata;
}

void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->recieve_memblock_callback = cb;
    p->recieve_memblock_callback_userdata = userdata;
}

void pa_pstream_set_release_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->release_callback = cb;
    p->release_callback_userdata = userdata;
}

void pa_pstream_set_revoke_callback(pa_pstream *p, pa_pstream_block_id_cb_t cb, void *userdata) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    /* This setter previously assigned the release callback fields by
     * copy-paste mistake; store the revoke callback here instead. */
    p->revoke_callback = cb;
    p->revoke_callback_userdata = userdata;
}

pa_bool_t pa_pstream_is_pending(pa_pstream *p) {
    pa_bool_t b;

    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (p->dead)
        b = FALSE;
    else
        b = p->write.current || !pa_queue_isempty(p->send_queue);

    return b;
}

void pa_pstream_unref(pa_pstream*p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    if (PA_REFCNT_DEC(p) <= 0)
        pstream_free(p);
}

pa_pstream* pa_pstream_ref(pa_pstream*p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    PA_REFCNT_INC(p);
    return p;
}

void pa_pstream_unlink(pa_pstream *p) {
    pa_assert(p);

    if (p->dead)
        return;

    p->dead = TRUE;

    if (p->import) {
        pa_memimport_free(p->import);
        p->import = NULL;
    }

    if (p->export) {
        pa_memexport_free(p->export);
        p->export = NULL;
    }

    if (p->io) {
        pa_iochannel_free(p->io);
        p->io = NULL;
    }

    if (p->defer_event) {
        p->mainloop->defer_free(p->defer_event);
        p->defer_event = NULL;
    }

    p->die_callback = NULL;
    p->drain_callback = NULL;
    p->recieve_packet_callback = NULL;
    p->recieve_memblock_callback = NULL;
}

void pa_pstream_enable_shm(pa_pstream *p, pa_bool_t enable) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    p->use_shm = enable;

    if (enable) {

        if (!p->export)
            p->export = pa_memexport_new(p->mempool, memexport_revoke_cb, p);

    } else {

        if (p->export) {
            pa_memexport_free(p->export);
            p->export = NULL;
        }
    }
}

pa_bool_t pa_pstream_get_shm(pa_pstream *p) {
    pa_assert(p);
    pa_assert(PA_REFCNT_VALUE(p) > 0);

    return p->use_shm;
}