2 This file is part of PulseAudio.
4 Copyright 2004-2006 Lennart Poettering
5 Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
7 PulseAudio is free software; you can redistribute it and/or modify
8 it under the terms of the GNU Lesser General Public License as
9 published by the Free Software Foundation; either version 2.1 of the
10 License, or (at your option) any later version.
12 PulseAudio is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public
18 License along with PulseAudio; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
31 #ifdef HAVE_NETINET_IN_H
32 #include <netinet/in.h>
35 #include <pulse/xmalloc.h>
37 #include <pulsecore/socket.h>
38 #include <pulsecore/queue.h>
39 #include <pulsecore/log.h>
40 #include <pulsecore/core-scache.h>
41 #include <pulsecore/creds.h>
42 #include <pulsecore/refcnt.h>
43 #include <pulsecore/flist.h>
44 #include <pulsecore/macro.h>
48 /* We piggyback information if audio data blocks are stored in SHM on the seek mode */
49 #define PA_FLAG_SHMDATA 0x80000000LU
50 #define PA_FLAG_SHMRELEASE 0x40000000LU
51 #define PA_FLAG_SHMREVOKE 0xC0000000LU
52 #define PA_FLAG_SHMMASK 0xFF000000LU
53 #define PA_FLAG_SEEKMASK 0x000000FFLU
55 /* The sequence descriptor header consists of 5 32bit integers: */
57 PA_PSTREAM_DESCRIPTOR_LENGTH
,
58 PA_PSTREAM_DESCRIPTOR_CHANNEL
,
59 PA_PSTREAM_DESCRIPTOR_OFFSET_HI
,
60 PA_PSTREAM_DESCRIPTOR_OFFSET_LO
,
61 PA_PSTREAM_DESCRIPTOR_FLAGS
,
62 PA_PSTREAM_DESCRIPTOR_MAX
65 /* If we have an SHM block, this info follows the descriptor */
67 PA_PSTREAM_SHM_BLOCKID
,
70 PA_PSTREAM_SHM_LENGTH
,
74 typedef uint32_t pa_pstream_descriptor
[PA_PSTREAM_DESCRIPTOR_MAX
];
76 #define PA_PSTREAM_DESCRIPTOR_SIZE (PA_PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
77 #define FRAME_SIZE_MAX_ALLOW PA_SCACHE_ENTRY_SIZE_MAX /* allow uploading a single sample in one frame at max */
79 PA_STATIC_FLIST_DECLARE(items
, 0, pa_xfree
);
83 PA_PSTREAM_ITEM_PACKET
,
84 PA_PSTREAM_ITEM_MEMBLOCK
,
85 PA_PSTREAM_ITEM_SHMRELEASE
,
86 PA_PSTREAM_ITEM_SHMREVOKE
100 pa_seek_mode_t seek_mode
;
102 /* release/revoke info */
109 pa_mainloop_api
*mainloop
;
110 pa_defer_event
*defer_event
;
113 pa_queue
*send_queue
;
118 pa_pstream_descriptor descriptor
;
119 struct item_info
* current
;
120 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
123 pa_memchunk memchunk
;
127 pa_pstream_descriptor descriptor
;
128 pa_memblock
*memblock
;
130 uint32_t shm_info
[PA_PSTREAM_SHM_MAX
];
136 pa_memimport
*import
;
137 pa_memexport
*export
;
139 pa_pstream_packet_cb_t recieve_packet_callback
;
140 void *recieve_packet_callback_userdata
;
142 pa_pstream_memblock_cb_t recieve_memblock_callback
;
143 void *recieve_memblock_callback_userdata
;
145 pa_pstream_notify_cb_t drain_callback
;
146 void *drain_callback_userdata
;
148 pa_pstream_notify_cb_t die_callback
;
149 void *die_callback_userdata
;
151 pa_pstream_block_id_cb_t revoke_callback
;
152 void *revoke_callback_userdata
;
154 pa_pstream_block_id_cb_t release_callback
;
155 void *release_callback_userdata
;
160 pa_creds read_creds
, write_creds
;
161 pa_bool_t read_creds_valid
, send_creds_now
;
165 static int do_write(pa_pstream
*p
);
166 static int do_read(pa_pstream
*p
);
168 static void do_something(pa_pstream
*p
) {
170 pa_assert(PA_REFCNT_VALUE(p
) > 0);
174 p
->mainloop
->defer_enable(p
->defer_event
, 0);
176 if (!p
->dead
&& pa_iochannel_is_readable(p
->io
)) {
179 } else if (!p
->dead
&& pa_iochannel_is_hungup(p
->io
))
182 if (!p
->dead
&& pa_iochannel_is_writable(p
->io
)) {
193 p
->die_callback(p
, p
->die_callback_userdata
);
195 pa_pstream_unlink(p
);
199 static void io_callback(pa_iochannel
*io
, void *userdata
) {
200 pa_pstream
*p
= userdata
;
203 pa_assert(PA_REFCNT_VALUE(p
) > 0);
204 pa_assert(p
->io
== io
);
209 static void defer_callback(pa_mainloop_api
*m
, pa_defer_event
*e
, void*userdata
) {
210 pa_pstream
*p
= userdata
;
213 pa_assert(PA_REFCNT_VALUE(p
) > 0);
214 pa_assert(p
->defer_event
== e
);
215 pa_assert(p
->mainloop
== m
);
220 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
);
222 pa_pstream
*pa_pstream_new(pa_mainloop_api
*m
, pa_iochannel
*io
, pa_mempool
*pool
) {
229 p
= pa_xnew(pa_pstream
, 1);
232 pa_iochannel_set_callback(io
, io_callback
, p
);
236 p
->defer_event
= m
->defer_new(m
, defer_callback
, p
);
237 m
->defer_enable(p
->defer_event
, 0);
239 p
->send_queue
= pa_queue_new();
241 p
->write
.current
= NULL
;
243 pa_memchunk_reset(&p
->write
.memchunk
);
244 p
->read
.memblock
= NULL
;
245 p
->read
.packet
= NULL
;
248 p
->recieve_packet_callback
= NULL
;
249 p
->recieve_packet_callback_userdata
= NULL
;
250 p
->recieve_memblock_callback
= NULL
;
251 p
->recieve_memblock_callback_userdata
= NULL
;
252 p
->drain_callback
= NULL
;
253 p
->drain_callback_userdata
= NULL
;
254 p
->die_callback
= NULL
;
255 p
->die_callback_userdata
= NULL
;
256 p
->revoke_callback
= NULL
;
257 p
->revoke_callback_userdata
= NULL
;
258 p
->release_callback
= NULL
;
259 p
->release_callback_userdata
= NULL
;
266 /* We do importing unconditionally */
267 p
->import
= pa_memimport_new(p
->mempool
, memimport_release_cb
, p
);
269 pa_iochannel_socket_set_rcvbuf(io
, pa_mempool_block_size_max(p
->mempool
));
270 pa_iochannel_socket_set_sndbuf(io
, pa_mempool_block_size_max(p
->mempool
));
273 p
->send_creds_now
= FALSE
;
274 p
->read_creds_valid
= FALSE
;
279 static void item_free(void *item
, void *q
) {
280 struct item_info
*i
= item
;
283 if (i
->type
== PA_PSTREAM_ITEM_MEMBLOCK
) {
284 pa_assert(i
->chunk
.memblock
);
285 pa_memblock_unref(i
->chunk
.memblock
);
286 } else if (i
->type
== PA_PSTREAM_ITEM_PACKET
) {
287 pa_assert(i
->packet
);
288 pa_packet_unref(i
->packet
);
291 if (pa_flist_push(PA_STATIC_FLIST_GET(items
), i
) < 0)
295 static void pstream_free(pa_pstream
*p
) {
298 pa_pstream_unlink(p
);
300 pa_queue_free(p
->send_queue
, item_free
, NULL
);
302 if (p
->write
.current
)
303 item_free(p
->write
.current
, NULL
);
305 if (p
->write
.memchunk
.memblock
)
306 pa_memblock_unref(p
->write
.memchunk
.memblock
);
308 if (p
->read
.memblock
)
309 pa_memblock_unref(p
->read
.memblock
);
312 pa_packet_unref(p
->read
.packet
);
317 void pa_pstream_send_packet(pa_pstream
*p
, pa_packet
*packet
, const pa_creds
*creds
) {
321 pa_assert(PA_REFCNT_VALUE(p
) > 0);
327 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
328 i
= pa_xnew(struct item_info
, 1);
330 i
->type
= PA_PSTREAM_ITEM_PACKET
;
331 i
->packet
= pa_packet_ref(packet
);
334 if ((i
->with_creds
= !!creds
))
338 pa_queue_push(p
->send_queue
, i
);
340 p
->mainloop
->defer_enable(p
->defer_event
, 1);
343 void pa_pstream_send_memblock(pa_pstream
*p
, uint32_t channel
, int64_t offset
, pa_seek_mode_t seek_mode
, const pa_memchunk
*chunk
) {
348 pa_assert(PA_REFCNT_VALUE(p
) > 0);
349 pa_assert(channel
!= (uint32_t) -1);
356 length
= chunk
->length
;
358 bsm
= pa_mempool_block_size_max(p
->mempool
);
364 if (!(i
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
365 i
= pa_xnew(struct item_info
, 1);
366 i
->type
= PA_PSTREAM_ITEM_MEMBLOCK
;
368 n
= PA_MIN(length
, bsm
);
369 i
->chunk
.index
= chunk
->index
+ idx
;
371 i
->chunk
.memblock
= pa_memblock_ref(chunk
->memblock
);
373 i
->channel
= channel
;
375 i
->seek_mode
= seek_mode
;
377 i
->with_creds
= FALSE
;
380 pa_queue_push(p
->send_queue
, i
);
386 p
->mainloop
->defer_enable(p
->defer_event
, 1);
389 void pa_pstream_send_release(pa_pstream
*p
, uint32_t block_id
) {
390 struct item_info
*item
;
392 pa_assert(PA_REFCNT_VALUE(p
) > 0);
397 /* pa_log("Releasing block %u", block_id); */
399 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
400 item
= pa_xnew(struct item_info
, 1);
401 item
->type
= PA_PSTREAM_ITEM_SHMRELEASE
;
402 item
->block_id
= block_id
;
404 item
->with_creds
= FALSE
;
407 pa_queue_push(p
->send_queue
, item
);
408 p
->mainloop
->defer_enable(p
->defer_event
, 1);
411 /* might be called from thread context */
412 static void memimport_release_cb(pa_memimport
*i
, uint32_t block_id
, void *userdata
) {
413 pa_pstream
*p
= userdata
;
416 pa_assert(PA_REFCNT_VALUE(p
) > 0);
421 if (p
->release_callback
)
422 p
->release_callback(p
, block_id
, p
->release_callback_userdata
);
424 pa_pstream_send_release(p
, block_id
);
427 void pa_pstream_send_revoke(pa_pstream
*p
, uint32_t block_id
) {
428 struct item_info
*item
;
430 pa_assert(PA_REFCNT_VALUE(p
) > 0);
434 /* pa_log("Revoking block %u", block_id); */
436 if (!(item
= pa_flist_pop(PA_STATIC_FLIST_GET(items
))))
437 item
= pa_xnew(struct item_info
, 1);
438 item
->type
= PA_PSTREAM_ITEM_SHMREVOKE
;
439 item
->block_id
= block_id
;
441 item
->with_creds
= FALSE
;
444 pa_queue_push(p
->send_queue
, item
);
445 p
->mainloop
->defer_enable(p
->defer_event
, 1);
448 /* might be called from thread context */
449 static void memexport_revoke_cb(pa_memexport
*e
, uint32_t block_id
, void *userdata
) {
450 pa_pstream
*p
= userdata
;
453 pa_assert(PA_REFCNT_VALUE(p
) > 0);
455 if (p
->revoke_callback
)
456 p
->revoke_callback(p
, block_id
, p
->revoke_callback_userdata
);
458 pa_pstream_send_revoke(p
, block_id
);
461 static void prepare_next_write_item(pa_pstream
*p
) {
463 pa_assert(PA_REFCNT_VALUE(p
) > 0);
465 p
->write
.current
= pa_queue_pop(p
->send_queue
);
467 if (!p
->write
.current
)
471 p
->write
.data
= NULL
;
472 pa_memchunk_reset(&p
->write
.memchunk
);
474 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = 0;
475 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl((uint32_t) -1);
476 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = 0;
477 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
478 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = 0;
480 if (p
->write
.current
->type
== PA_PSTREAM_ITEM_PACKET
) {
482 pa_assert(p
->write
.current
->packet
);
483 p
->write
.data
= p
->write
.current
->packet
->data
;
484 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->packet
->length
);
486 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMRELEASE
) {
488 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMRELEASE
);
489 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
491 } else if (p
->write
.current
->type
== PA_PSTREAM_ITEM_SHMREVOKE
) {
493 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(PA_FLAG_SHMREVOKE
);
494 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl(p
->write
.current
->block_id
);
498 pa_bool_t send_payload
= TRUE
;
500 pa_assert(p
->write
.current
->type
== PA_PSTREAM_ITEM_MEMBLOCK
);
501 pa_assert(p
->write
.current
->chunk
.memblock
);
503 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
] = htonl(p
->write
.current
->channel
);
504 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] = htonl((uint32_t) (((uint64_t) p
->write
.current
->offset
) >> 32));
505 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = htonl((uint32_t) ((uint64_t) p
->write
.current
->offset
));
507 flags
= (uint32_t) (p
->write
.current
->seek_mode
& PA_FLAG_SEEKMASK
);
510 uint32_t block_id
, shm_id
;
511 size_t offset
, length
;
513 pa_assert(p
->export
);
515 if (pa_memexport_put(p
->export
,
516 p
->write
.current
->chunk
.memblock
,
522 flags
|= PA_FLAG_SHMDATA
;
523 send_payload
= FALSE
;
525 p
->write
.shm_info
[PA_PSTREAM_SHM_BLOCKID
] = htonl(block_id
);
526 p
->write
.shm_info
[PA_PSTREAM_SHM_SHMID
] = htonl(shm_id
);
527 p
->write
.shm_info
[PA_PSTREAM_SHM_INDEX
] = htonl((uint32_t) (offset
+ p
->write
.current
->chunk
.index
));
528 p
->write
.shm_info
[PA_PSTREAM_SHM_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
530 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl(sizeof(p
->write
.shm_info
));
531 p
->write
.data
= p
->write
.shm_info
;
534 /* pa_log_warn("Failed to export memory block."); */
538 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
] = htonl((uint32_t) p
->write
.current
->chunk
.length
);
539 p
->write
.memchunk
= p
->write
.current
->chunk
;
540 pa_memblock_ref(p
->write
.memchunk
.memblock
);
541 p
->write
.data
= NULL
;
544 p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] = htonl(flags
);
548 if ((p
->send_creds_now
= p
->write
.current
->with_creds
))
549 p
->write_creds
= p
->write
.current
->creds
;
553 static int do_write(pa_pstream
*p
) {
557 pa_memblock
*release_memblock
= NULL
;
560 pa_assert(PA_REFCNT_VALUE(p
) > 0);
562 if (!p
->write
.current
)
563 prepare_next_write_item(p
);
565 if (!p
->write
.current
)
568 if (p
->write
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
569 d
= (uint8_t*) p
->write
.descriptor
+ p
->write
.index
;
570 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->write
.index
;
572 pa_assert(p
->write
.data
|| p
->write
.memchunk
.memblock
);
577 d
= (uint8_t*) pa_memblock_acquire(p
->write
.memchunk
.memblock
) + p
->write
.memchunk
.index
;
578 release_memblock
= p
->write
.memchunk
.memblock
;
581 d
= (uint8_t*) d
+ p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
582 l
= ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->write
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
588 if (p
->send_creds_now
) {
590 if ((r
= pa_iochannel_write_with_creds(p
->io
, d
, l
, &p
->write_creds
)) < 0)
593 p
->send_creds_now
= FALSE
;
597 if ((r
= pa_iochannel_write(p
->io
, d
, l
)) < 0)
600 if (release_memblock
)
601 pa_memblock_release(release_memblock
);
603 p
->write
.index
+= (size_t) r
;
605 if (p
->write
.index
>= PA_PSTREAM_DESCRIPTOR_SIZE
+ ntohl(p
->write
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
])) {
606 pa_assert(p
->write
.current
);
607 item_free(p
->write
.current
, NULL
);
608 p
->write
.current
= NULL
;
610 if (p
->write
.memchunk
.memblock
)
611 pa_memblock_unref(p
->write
.memchunk
.memblock
);
613 pa_memchunk_reset(&p
->write
.memchunk
);
615 if (p
->drain_callback
&& !pa_pstream_is_pending(p
))
616 p
->drain_callback(p
, p
->drain_callback_userdata
);
623 if (release_memblock
)
624 pa_memblock_release(release_memblock
);
629 static int do_read(pa_pstream
*p
) {
633 pa_memblock
*release_memblock
= NULL
;
635 pa_assert(PA_REFCNT_VALUE(p
) > 0);
637 if (p
->read
.index
< PA_PSTREAM_DESCRIPTOR_SIZE
) {
638 d
= (uint8_t*) p
->read
.descriptor
+ p
->read
.index
;
639 l
= PA_PSTREAM_DESCRIPTOR_SIZE
- p
->read
.index
;
641 pa_assert(p
->read
.data
|| p
->read
.memblock
);
646 d
= pa_memblock_acquire(p
->read
.memblock
);
647 release_memblock
= p
->read
.memblock
;
650 d
= (uint8_t*) d
+ p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
;
651 l
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) - (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
);
658 if ((r
= pa_iochannel_read_with_creds(p
->io
, d
, l
, &p
->read_creds
, &b
)) <= 0)
661 p
->read_creds_valid
= p
->read_creds_valid
|| b
;
664 if ((r
= pa_iochannel_read(p
->io
, d
, l
)) <= 0)
668 if (release_memblock
)
669 pa_memblock_release(release_memblock
);
671 p
->read
.index
+= (size_t) r
;
673 if (p
->read
.index
== PA_PSTREAM_DESCRIPTOR_SIZE
) {
674 uint32_t flags
, length
, channel
;
675 /* Reading of frame descriptor complete */
677 flags
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]);
679 if (!p
->use_shm
&& (flags
& PA_FLAG_SHMMASK
) != 0) {
680 pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
684 if (flags
== PA_FLAG_SHMRELEASE
) {
686 /* This is a SHM memblock release frame with no payload */
688 /* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
690 pa_assert(p
->export
);
691 pa_memexport_process_release(p
->export
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
695 } else if (flags
== PA_FLAG_SHMREVOKE
) {
697 /* This is a SHM memblock revoke frame with no payload */
699 /* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
701 pa_assert(p
->import
);
702 pa_memimport_process_revoke(p
->import
, ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
]));
707 length
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]);
709 if (length
> FRAME_SIZE_MAX_ALLOW
|| length
<= 0) {
710 pa_log_warn("Received invalid frame size: %lu", (unsigned long) length
);
714 pa_assert(!p
->read
.packet
&& !p
->read
.memblock
);
716 channel
= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]);
718 if (channel
== (uint32_t) -1) {
721 pa_log_warn("Received packet frame with invalid flags value.");
725 /* Frame is a packet frame */
726 p
->read
.packet
= pa_packet_new(length
);
727 p
->read
.data
= p
->read
.packet
->data
;
731 if ((flags
& PA_FLAG_SEEKMASK
) > PA_SEEK_RELATIVE_END
) {
732 pa_log_warn("Received memblock frame with invalid seek mode.");
736 if ((flags
& PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
) {
738 if (length
!= sizeof(p
->read
.shm_info
)) {
739 pa_log_warn("Received SHM memblock frame with Invalid frame length.");
743 /* Frame is a memblock frame referencing an SHM memblock */
744 p
->read
.data
= p
->read
.shm_info
;
746 } else if ((flags
& PA_FLAG_SHMMASK
) == 0) {
748 /* Frame is a memblock frame */
750 p
->read
.memblock
= pa_memblock_new(p
->mempool
, length
);
754 pa_log_warn("Received memblock frame with invalid flags value.");
759 } else if (p
->read
.index
> PA_PSTREAM_DESCRIPTOR_SIZE
) {
760 /* Frame payload available */
762 if (p
->read
.memblock
&& p
->recieve_memblock_callback
) {
764 /* Is this memblock data? Than pass it to the user */
765 l
= (p
->read
.index
- (size_t) r
) < PA_PSTREAM_DESCRIPTOR_SIZE
? (size_t) (p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
) : (size_t) r
;
770 chunk
.memblock
= p
->read
.memblock
;
771 chunk
.index
= p
->read
.index
- PA_PSTREAM_DESCRIPTOR_SIZE
- l
;
774 if (p
->recieve_memblock_callback
) {
778 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
779 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
781 p
->recieve_memblock_callback(
783 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
785 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
787 p
->recieve_memblock_callback_userdata
);
790 /* Drop seek info for following callbacks */
791 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
] =
792 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
] =
793 p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
] = 0;
798 if (p
->read
.index
>= ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_LENGTH
]) + PA_PSTREAM_DESCRIPTOR_SIZE
) {
800 if (p
->read
.memblock
) {
802 /* This was a memblock frame. We can unref the memblock now */
803 pa_memblock_unref(p
->read
.memblock
);
805 } else if (p
->read
.packet
) {
807 if (p
->recieve_packet_callback
)
809 p
->recieve_packet_callback(p
, p
->read
.packet
, p
->read_creds_valid
? &p
->read_creds
: NULL
, p
->recieve_packet_callback_userdata
);
811 p
->recieve_packet_callback(p
, p
->read
.packet
, NULL
, p
->recieve_packet_callback_userdata
);
814 pa_packet_unref(p
->read
.packet
);
818 pa_assert((ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SHMMASK
) == PA_FLAG_SHMDATA
);
820 pa_assert(p
->import
);
822 if (!(b
= pa_memimport_get(p
->import
,
823 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_BLOCKID
]),
824 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_SHMID
]),
825 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_INDEX
]),
826 ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
])))) {
828 if (pa_log_ratelimit(PA_LOG_DEBUG
))
829 pa_log_debug("Failed to import memory block.");
832 if (p
->recieve_memblock_callback
) {
838 chunk
.length
= b
? pa_memblock_get_length(b
) : ntohl(p
->read
.shm_info
[PA_PSTREAM_SHM_LENGTH
]);
841 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_HI
])) << 32) |
842 (((uint64_t) ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_OFFSET_LO
]))));
844 p
->recieve_memblock_callback(
846 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_CHANNEL
]),
848 ntohl(p
->read
.descriptor
[PA_PSTREAM_DESCRIPTOR_FLAGS
]) & PA_FLAG_SEEKMASK
,
850 p
->recieve_memblock_callback_userdata
);
854 pa_memblock_unref(b
);
864 p
->read
.memblock
= NULL
;
865 p
->read
.packet
= NULL
;
870 p
->read_creds_valid
= FALSE
;
876 if (release_memblock
)
877 pa_memblock_release(release_memblock
);
882 void pa_pstream_set_die_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
884 pa_assert(PA_REFCNT_VALUE(p
) > 0);
886 p
->die_callback
= cb
;
887 p
->die_callback_userdata
= userdata
;
890 void pa_pstream_set_drain_callback(pa_pstream
*p
, pa_pstream_notify_cb_t cb
, void *userdata
) {
892 pa_assert(PA_REFCNT_VALUE(p
) > 0);
894 p
->drain_callback
= cb
;
895 p
->drain_callback_userdata
= userdata
;
898 void pa_pstream_set_recieve_packet_callback(pa_pstream
*p
, pa_pstream_packet_cb_t cb
, void *userdata
) {
900 pa_assert(PA_REFCNT_VALUE(p
) > 0);
902 p
->recieve_packet_callback
= cb
;
903 p
->recieve_packet_callback_userdata
= userdata
;
906 void pa_pstream_set_recieve_memblock_callback(pa_pstream
*p
, pa_pstream_memblock_cb_t cb
, void *userdata
) {
908 pa_assert(PA_REFCNT_VALUE(p
) > 0);
910 p
->recieve_memblock_callback
= cb
;
911 p
->recieve_memblock_callback_userdata
= userdata
;
914 void pa_pstream_set_release_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
916 pa_assert(PA_REFCNT_VALUE(p
) > 0);
918 p
->release_callback
= cb
;
919 p
->release_callback_userdata
= userdata
;
922 void pa_pstream_set_revoke_callback(pa_pstream
*p
, pa_pstream_block_id_cb_t cb
, void *userdata
) {
924 pa_assert(PA_REFCNT_VALUE(p
) > 0);
926 p
->release_callback
= cb
;
927 p
->release_callback_userdata
= userdata
;
930 pa_bool_t
pa_pstream_is_pending(pa_pstream
*p
) {
934 pa_assert(PA_REFCNT_VALUE(p
) > 0);
939 b
= p
->write
.current
|| !pa_queue_isempty(p
->send_queue
);
944 void pa_pstream_unref(pa_pstream
*p
) {
946 pa_assert(PA_REFCNT_VALUE(p
) > 0);
948 if (PA_REFCNT_DEC(p
) <= 0)
952 pa_pstream
* pa_pstream_ref(pa_pstream
*p
) {
954 pa_assert(PA_REFCNT_VALUE(p
) > 0);
960 void pa_pstream_unlink(pa_pstream
*p
) {
969 pa_memimport_free(p
->import
);
974 pa_memexport_free(p
->export
);
979 pa_iochannel_free(p
->io
);
983 if (p
->defer_event
) {
984 p
->mainloop
->defer_free(p
->defer_event
);
985 p
->defer_event
= NULL
;
988 p
->die_callback
= NULL
;
989 p
->drain_callback
= NULL
;
990 p
->recieve_packet_callback
= NULL
;
991 p
->recieve_memblock_callback
= NULL
;
994 void pa_pstream_enable_shm(pa_pstream
*p
, pa_bool_t enable
) {
996 pa_assert(PA_REFCNT_VALUE(p
) > 0);
1003 p
->export
= pa_memexport_new(p
->mempool
, memexport_revoke_cb
, p
);
1008 pa_memexport_free(p
->export
);
1014 pa_bool_t
pa_pstream_get_shm(pa_pstream
*p
) {
1016 pa_assert(PA_REFCNT_VALUE(p
) > 0);