1 /********************************************************************
3 * THIS FILE IS PART OF THE OggVorbis SOFTWARE CODEC SOURCE CODE. *
4 * USE, DISTRIBUTION AND REPRODUCTION OF THIS SOURCE IS GOVERNED BY *
5 * THE GNU PUBLIC LICENSE 2, WHICH IS INCLUDED WITH THIS SOURCE. *
6 * PLEASE READ THESE TERMS BEFORE DISTRIBUTING. *
8 * THE Ogg123 SOURCE CODE IS (C) COPYRIGHT 2000-2001 *
9 * by Stan Seibert <volsung@xiph.org> AND OTHER CONTRIBUTORS *
10 * http://www.xiph.org/ *
12 ********************************************************************
14 last mod: $Id: buffer.c,v 1.22 2003/09/01 20:15:19 volsung Exp $
16 ********************************************************************/
22 #include <sys/types.h>
/* Minimum of two, three or four values.  These are function-like macros:
   an argument can be evaluated more than once, so only pass expressions
   without side effects. */
36 #define MIN(x,y) ( (x) < (y) ? (x) : (y) )
37 #define MIN3(x,y,z) MIN(x,MIN(y,z))
38 #define MIN4(w,x,y,z) MIN( MIN(w,x), MIN(y,z) )
/* Trace helpers: print the process id (getpid()) followed by the message to
   debugfile.  x must be a string literal because it is concatenated with the
   "%d: " prefix and the "\n" suffix; y and z are extra printf arguments. */
42 #define DEBUG(x) { fprintf (debugfile, "%d: " x "\n", getpid()); }
43 #define DEBUG1(x, y) { fprintf (debugfile, "%d: " x "\n", getpid(), y); }
44 #define DEBUG2(x, y, z) { fprintf (debugfile, "%d: " x "\n", getpid(), y, z); }
/* Variant of DEBUG2 that expands to nothing, i.e. the trace is compiled out. */
48 #define DEBUG2(x, y, z)
51 /* Macros that support debugging of threading structures */
/* Each wrapper first traces the stringified argument name(s) through
   DEBUG1/DEBUG2 and then calls the matching pthread function on the address
   of the argument, so pass the object itself (e.g. buf->mutex), not a
   pointer to it. */
53 #define LOCK_MUTEX(mutex) { DEBUG1("Locking mutex %s.", #mutex); pthread_mutex_lock (&(mutex)); }
54 #define UNLOCK_MUTEX(mutex) { DEBUG1("Unlocking mutex %s", #mutex); pthread_mutex_unlock(&(mutex)); }
55 #define COND_WAIT(cond, mutex) { DEBUG2("Unlocking %s and waiting on %s", #mutex, #cond); pthread_cond_wait(&(cond), &(mutex)); }
56 #define COND_SIGNAL(cond) { DEBUG1("Signalling %s", #cond); pthread_cond_signal(&(cond)); }
/* Defined in another translation unit; the code below tests
   sig_request.cancel next to buf->cancel_flag. */
58 extern signal_request_t sig_request
; /* Need access to global cancel flag */
61 /* -------------------- Private Functions ------------------ */
/* Reset the state fields of buf.  Prebuffering is enabled exactly when a
   positive prebuffer_size is configured, and position_end restarts at 0. */
63 void buffer_init_vars (buf_t
*buf
)
65 /* Initialize buffer flags */
66 buf
->prebuffering
= buf
->prebuffer_size
> 0;
75 buf
->position_end
= 0;
/* Signal setup for the playback thread: adds SIGINT, SIGTSTP and SIGCONT to
   the signal set and blocks that set for the calling thread.  A failing
   pthread_sigmask() is only reported through DEBUG. */
78 void buffer_thread_init (buf_t
*buf
)
82 DEBUG("Enter buffer_thread_init");
84 /* Block signals to this thread */
86 sigaddset(&set
, SIGINT
);
87 sigaddset(&set
, SIGTSTP
);
88 sigaddset(&set
, SIGCONT
);
89 if (pthread_sigmask(SIG_BLOCK
, &set
, NULL
) != 0)
90 DEBUG("pthread_sigmask failed");
94 void buffer_thread_cleanup (void *arg
)
96 buf_t
*buf
= (buf_t
*)arg
;
98 DEBUG("Enter buffer_thread_cleanup");
102 void buffer_mutex_unlock (void *arg
)
104 buf_t
*buf
= (buf_t
*)arg
;
106 UNLOCK_MUTEX(buf
->mutex
);
/* Allocate an action node that will call action_func with action_arg.
   The node starts at position 0; the buffer_*_action_* functions assign the
   real position afterwards.  An allocation failure is reported on stderr. */
110 action_t
*malloc_action (action_func_t action_func
, void *action_arg
)
114 action
= malloc(sizeof(action_t
));
116 if (action
== NULL
) {
117 fprintf(stderr
, _("Error: Out of memory in malloc_action().\n"));
121 action
->position
= 0;
122 action
->action_func
= action_func
;
123 action
->arg
= action_arg
;
130 /* insert = 1: Make this action the first action associated with this position
131 insert = 0: Make this action the last action associated with this position
135 void in_order_add_action (action_t
**action_list
, action_t
*action
, int insert
)
137 insert
= insert
> 0 ? 1 : 0; /* Clamp in case caller messed up */
139 while (*action_list
!= NULL
&&
140 (*action_list
)->position
<= (action
->position
+ insert
))
141 action_list
= &((*action_list
)->next
);
143 action
->next
= *action_list
;
144 *action_list
= action
;
/* Run, in list order, every action at the head of *action_list whose
   position is <= position.  Each callback is invoked as
   action_func(buf, arg) and the node is then unlinked from the list. */
148 void execute_actions (buf_t
*buf
, action_t
**action_list
, ogg_int64_t position
)
152 while (*action_list
!= NULL
&& (*action_list
)->position
<= position
) {
153 action
= *action_list
;
154 action
->action_func(buf
, action
->arg
);
156 *action_list
= (*action_list
)->next
;
162 void free_action (action_t
*action
)
169 int compute_dequeue_size (buf_t
*buf
, int request_size
)
171 ogg_int64_t next_action_pos
;
174 For simplicity, the number of bytes played must satisfy the following
176 1. Do not extract more bytes than are stored in the buffer.
177 2. Do not extract more bytes than the requested number of bytes.
178 3. Do not run off the end of the buffer.
179 4. Do not go past the next action.
182 if (buf
->actions
!= NULL
) {
184 next_action_pos
= buf
->actions
->position
;
186 return MIN4((ogg_int64_t
)buf
->curfill
, (ogg_int64_t
)request_size
,
187 (ogg_int64_t
)(buf
->size
- buf
->start
),
188 next_action_pos
- buf
->position
);
190 return MIN3(buf
->curfill
, (long)request_size
, buf
->size
- buf
->start
);
/* Playback thread entry point; arg is the buf_t to drain.
   Until the buffer has reached end of stream with nothing left (eos set and
   curfill == 0) each pass blocks on playback_cond if the buffer is
   prebuffering or holds less than one audio chunk before eos, runs the
   actions due at the current position, asks compute_dequeue_size() how much
   may be played, hands that many bytes to write_func without holding the
   mutex, and finally updates curfill/position/start and signals write_cond.
   buffer_thread_cleanup is registered as cleanup handler and is executed by
   pthread_cleanup_pop(1).
   NOTE(review): cancel_flag and sig_request.cancel are tested outside the
   mutex; confirm a cancel request cannot be missed just before COND_WAIT. */
195 void *buffer_thread_func (void *arg
)
197 buf_t
*buf
= (buf_t
*) arg
;
200 DEBUG("Enter buffer_thread_func");
202 buffer_thread_init(buf
);
204 pthread_cleanup_push(buffer_thread_cleanup
, buf
);
206 DEBUG("Start main play loop");
208 /* This test is safe since curfill will never decrease and eos will
210 while ( !(buf
->eos
&& buf
->curfill
== 0)) {
212 if (buf
->cancel_flag
|| sig_request
.cancel
)
215 DEBUG("Check for something to play");
216 /* Block until we can play something */
217 LOCK_MUTEX (buf
->mutex
);
218 if (buf
->prebuffering
||
220 (buf
->curfill
< buf
->audio_chunk_size
&& !buf
->eos
)) {
222 DEBUG("Waiting for more data to play.");
223 COND_WAIT(buf
->playback_cond
, buf
->mutex
);
226 DEBUG("Ready to play");
228 UNLOCK_MUTEX(buf
->mutex
);
230 if (buf
->cancel_flag
|| sig_request
.cancel
)
233 /* Don't need to lock buffer while running actions since position
234 won't change. We clear out any actions before we compute the
235 dequeue size so we don't consider actions that need to
237 execute_actions(buf
, &buf
->actions
, buf
->position
);
239 LOCK_MUTEX(buf
->mutex
);
241 /* Need to be locked while we check things. */
242 write_amount
= compute_dequeue_size(buf
, buf
->audio_chunk_size
);
244 UNLOCK_MUTEX(buf
->mutex
);
246 /* No need to lock mutex here because the other thread will
247 NEVER reduce the number of bytes stored in the buffer */
248 DEBUG1("Sending %d bytes to the audio device", write_amount
);
249 write_amount
= buf
->write_func(buf
->buffer
+ buf
->start
, write_amount
,
250 /* Only set EOS if this is the last chunk */
251 write_amount
== buf
->curfill
? buf
->eos
: 0,
254 LOCK_MUTEX(buf
->mutex
);
256 buf
->curfill
-= write_amount
;
257 buf
->position
+= write_amount
;
258 buf
->start
= (buf
->start
+ write_amount
) % buf
->size
;
259 DEBUG1("Updated buffer fill, curfill = %ld", buf
->curfill
);
261 /* If we've essentially emptied the buffer and prebuffering is enabled,
262 we need to do another prebuffering session */
263 if (!buf
->eos
&& (buf
->curfill
< buf
->audio_chunk_size
))
264 buf
->prebuffering
= buf
->prebuffer_size
> 0;
266 /* Signal a waiting decoder thread that they can put more audio into the
268 DEBUG("Signal decoder thread that buffer space is available");
269 COND_SIGNAL(buf
->write_cond
);
271 UNLOCK_MUTEX(buf
->mutex
);
274 pthread_cleanup_pop(1);
275 DEBUG("exiting buffer_thread_func");
/* Copy size bytes from data into the ring buffer, waiting on write_cond
   whenever the buffer has no free space.  The loop ends early once
   abort_write is set.  After each pass the playback thread is signalled
   unless the buffer is still prebuffering or playback is paused.
   buffer_mutex_unlock is registered as cancellation cleanup handler for the
   duration of the call. */
281 void submit_data_chunk (buf_t
*buf
, char *data
, size_t size
)
283 long buf_write_pos
; /* offset of first available write location */
286 DEBUG1("Enter submit_data_chunk, size %d", size
);
288 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
290 /* Put the data into the buffer as space is made available */
291 while (size
> 0 && !buf
->abort_write
) {
293 /* Section 1: Write a chunk of data */
294 DEBUG("Obtaining lock on buffer");
295 LOCK_MUTEX(buf
->mutex
);
296 if (buf
->size
- buf
->curfill
> 0) {
298 /* Figure how much we can write into the buffer. Requirements:
299 1. Don't write more data than we have.
300 2. Don't write more data than we have room for.
301 3. Don't write past the end of the buffer. */
302 buf_write_pos
= (buf
->start
+ buf
->curfill
) % buf
->size
;
303 write_size
= MIN3(size
, buf
->size
- buf
->curfill
,
304 buf
->size
- buf_write_pos
);
306 memcpy(buf
->buffer
+ buf_write_pos
, data
, write_size
);
307 buf
->curfill
+= write_size
;
310 buf
->position_end
+= write_size
;
311 DEBUG1("writing chunk into buffer, curfill = %ld", buf
->curfill
);
315 if (buf
->cancel_flag
|| sig_request
.cancel
) {
316 UNLOCK_MUTEX(buf
->mutex
);
319 /* No room for more data, wait until there is */
320 DEBUG("No room for data in buffer. Waiting.");
321 COND_WAIT(buf
->write_cond
, buf
->mutex
);
323 /* Section 2: signal if we are not prebuffering, done
324 prebuffering, or paused */
325 if (buf
->prebuffering
&& (buf
->prebuffer_size
<= buf
->curfill
)) {
/* NOTE(review): no ';' after this DEBUG call; it only compiles because
   DEBUG expands to a braced block or to nothing. */
327 DEBUG("prebuffering done")
328 buf
->prebuffering
= 0; /* done prebuffering */
331 if (!buf
->prebuffering
&& !buf
->paused
) {
333 DEBUG("Signalling playback thread that more data is available.");
334 COND_SIGNAL(buf
->playback_cond
);
336 DEBUG("Not signalling playback thread since prebuffering or paused.");
338 UNLOCK_MUTEX(buf
->mutex
);
341 pthread_cleanup_pop(0);
343 DEBUG("Exit submit_data_chunk");
/* Allocate an uninitialised buffer_stats_t.  An allocation failure is
   reported on stderr. */
347 buffer_stats_t
*malloc_buffer_stats ()
349 buffer_stats_t
*new_stats
;
351 new_stats
= malloc(sizeof(buffer_stats_t
));
353 if (new_stats
== NULL
) {
354 fprintf(stderr
, _("Error: Could not allocate memory in malloc_buffer_stats()\n"));
362 /* ------------------ Begin public interface ------------------ */
364 /* --- Buffer allocation --- */
/* Allocate and initialise a buffer.
   size              capacity of the data area in bytes; the allocation is
                     sizeof(buf_t) + size - 1 bytes (presumably buf_t ends
                     in a one-byte array -- confirm against buffer.h)
   prebuffer         fill level that ends a prebuffering phase
   write_func, arg   output callback and the opaque pointer stored as
                     write_arg
   audio_chunk_size  number of bytes requested from the buffer per playback
                     pass; 0 or a value larger than size is replaced by
                     size / 2
   The header is zeroed, the mutex and both condition variables are created
   and buffer_init_vars() sets the run-time flags. */
366 buf_t
*buffer_create (long size
, long prebuffer
,
367 buffer_write_func_t write_func
, void *arg
,
368 int audio_chunk_size
)
370 buf_t
*buf
= malloc (sizeof(buf_t
) + sizeof (char) * (size
- 1));
/* NOTE(review): the fopen() result is passed to setvbuf() unchecked. */
378 debugfile
= fopen ("/tmp/bufferdebug", "w");
379 setvbuf (debugfile
, NULL
, _IONBF
, 0);
382 /* Initialize the buffer structure. */
383 DEBUG1("buffer_create, size = %ld", size
);
385 memset (buf
, 0, sizeof(*buf
));
387 buf
->write_func
= write_func
;
388 buf
->write_arg
= arg
;
390 /* Setup pthread variables */
391 pthread_mutex_init(&buf
->mutex
, NULL
);
392 pthread_cond_init(&buf
->write_cond
, NULL
);
393 pthread_cond_init(&buf
->playback_cond
, NULL
);
395 /* Correct for impossible prebuffer and chunk sizes */
396 if (audio_chunk_size
> size
|| audio_chunk_size
== 0)
397 audio_chunk_size
= size
/ 2;
/* NOTE(review): halving an oversized prebuffer does not guarantee that it
   fits (prebuffer > 2 * size stays > size); size / 2 looks intended.  A
   prebuffer larger than size can never be reached by curfill. */
399 if (prebuffer
> size
)
400 prebuffer
= prebuffer
/ 2;
402 buf
->audio_chunk_size
= audio_chunk_size
;
404 buf
->prebuffer_size
= prebuffer
;
409 /* Initialize flags */
410 buffer_init_vars(buf
);
/* Bring buf back to its initial state: the mutex and both condition
   variables are destroyed and created again, every queued action is
   unlinked from buf->actions, and buffer_init_vars() resets the flags.
   NOTE(review): destroying the mutex requires that no other thread is using
   the buffer at this point -- confirm with the callers. */
416 void buffer_reset (buf_t
*buf
)
420 /* Cleanup pthread variables */
421 pthread_mutex_destroy(&buf
->mutex
);
422 pthread_cond_destroy(&buf
->write_cond
);
423 pthread_cond_destroy(&buf
->playback_cond
);
425 /* Reinit pthread variables */
426 pthread_mutex_init(&buf
->mutex
, NULL
);
427 pthread_cond_init(&buf
->write_cond
, NULL
);
428 pthread_cond_init(&buf
->playback_cond
, NULL
);
430 /* Clear old actions */
431 while (buf
->actions
!= NULL
) {
432 action
= buf
->actions
;
433 buf
->actions
= buf
->actions
->next
;
437 buffer_init_vars(buf
);
/* Tear down the synchronisation objects of buf: the mutex is destroyed,
   then each condition variable is signalled once and destroyed.
   NOTE(review): the mutex goes away before the condition variables are
   signalled; confirm no thread can still be blocked on them here. */
441 void buffer_destroy (buf_t
*buf
)
443 DEBUG("buffer_destroy");
445 /* Cleanup pthread variables */
446 pthread_mutex_destroy(&buf
->mutex
);
447 COND_SIGNAL(buf
->write_cond
);
448 pthread_cond_destroy(&buf
->write_cond
);
449 COND_SIGNAL(buf
->playback_cond
);
450 pthread_cond_destroy(&buf
->playback_cond
);
456 /* --- Buffer thread control --- */
458 int buffer_thread_start (buf_t
*buf
)
460 DEBUG("Starting new thread.");
462 return pthread_create(&buf
->thread
, NULL
, buffer_thread_func
, buf
);
466 /* WARNING: DO NOT call buffer_submit_data after you pause the
467 playback thread, or you run the risk of deadlocking. Call
468 buffer_thread_unpause first. */
/* Pause playback.  The state change is made with buf->mutex held and with
   buffer_mutex_unlock registered as cancellation cleanup handler. */
469 void buffer_thread_pause (buf_t
*buf
)
471 DEBUG("Pausing playback thread");
473 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
475 LOCK_MUTEX(buf
->mutex
);
477 UNLOCK_MUTEX(buf
->mutex
);
479 pthread_cleanup_pop(0);
/* Resume playback: with buf->mutex held, playback_cond is signalled so that
   a playback thread blocked in COND_WAIT wakes up.  buffer_mutex_unlock is
   registered as cancellation cleanup handler while the mutex is held. */
483 void buffer_thread_unpause (buf_t
*buf
)
485 DEBUG("Unpausing playback thread");
487 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
489 LOCK_MUTEX(buf
->mutex
);
491 COND_SIGNAL(buf
->playback_cond
);
492 UNLOCK_MUTEX(buf
->mutex
);
494 pthread_cleanup_pop(0);
498 void buffer_thread_kill (buf_t
*buf
)
500 DEBUG("Attempting to kill playback thread.");
502 /* Flag the cancellation */
503 buf
->cancel_flag
= 1;
505 /* Signal the playback condition to wake stuff up */
506 COND_SIGNAL(buf
->playback_cond
);
508 pthread_join(buf
->thread
, NULL
);
510 buffer_thread_cleanup(buf
);
512 DEBUG("Playback thread killed.");
516 /* --- Data buffering functions --- */
518 void buffer_submit_data (buf_t
*buf
, char *data
, long nbytes
) {
519 submit_data_chunk (buf
, data
, nbytes
);
/* Pull interface: copy up to nbytes bytes of buffered audio into data.
   Waits on playback_cond while the buffer is empty or still prebuffering
   (before eos), stops when the buffer is empty at eos, and tests
   abort_write before and after waiting.  Each pass copies the amount chosen
   by compute_dequeue_size(), runs the actions that are due and signals
   write_cond.  Returns the number of bytes copied (orig_size - nbytes).
   NOTE(review): nbytes is a long but compute_dequeue_size() takes an int;
   confirm callers never request more than INT_MAX bytes. */
522 size_t buffer_get_data (buf_t
*buf
, char *data
, long nbytes
)
529 DEBUG("Enter buffer_get_data");
531 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
533 LOCK_MUTEX(buf
->mutex
);
535 /* Put the data into the buffer as space is made available */
538 if (buf
->abort_write
)
541 DEBUG("Obtaining lock on buffer");
542 /* Block until we can read something */
543 if (buf
->curfill
== 0 && buf
->eos
)
544 break; /* No more data to read */
546 if (buf
->curfill
== 0 || (buf
->prebuffering
&& !buf
->eos
)) {
547 DEBUG("Waiting for more data to copy.");
548 COND_WAIT(buf
->playback_cond
, buf
->mutex
);
551 if (buf
->abort_write
)
554 /* Note: Even if curfill is still 0, nothing bad will happen here */
556 /* For simplicity, the number of bytes played must satisfy
557 the following three requirements:
559 1. Do not copy more bytes than are stored in the buffer.
560 2. Do not copy more bytes than the requested data size.
561 3. Do not run off the end of the buffer. */
562 write_amount
= compute_dequeue_size(buf
, nbytes
);
564 UNLOCK_MUTEX(buf
->mutex
);
565 execute_actions(buf
, &buf
->actions
, buf
->position
);
567 /* No need to lock mutex here because the other thread will
568 NEVER reduce the number of bytes stored in the buffer */
569 DEBUG1("Copying %d bytes from the buffer", write_amount
);
570 memcpy(data
, buf
->buffer
+ buf
->start
, write_amount
);
571 LOCK_MUTEX(buf
->mutex
);
573 buf
->curfill
-= write_amount
;
574 data
+= write_amount
;
575 nbytes
-= write_amount
;
576 buf
->start
= (buf
->start
+ write_amount
) % buf
->size
;
577 DEBUG1("Updated buffer fill, curfill = %ld", buf
->curfill
);
579 /* Signal a waiting decoder thread that they can put more
580 audio into the buffer */
581 DEBUG("Signal decoder thread that buffer space is available");
582 COND_SIGNAL(buf
->write_cond
);
585 UNLOCK_MUTEX(buf
->mutex
);
587 pthread_cleanup_pop(0);
589 pthread_testcancel();
591 DEBUG("Exit buffer_get_data");
593 return orig_size
- nbytes
;
/* With buf->mutex held, switches prebuffering off and signals playback_cond
   so that a consumer waiting for data wakes up.  buffer_mutex_unlock is
   registered as cancellation cleanup handler while the mutex is held. */
596 void buffer_mark_eos (buf_t
*buf
)
598 DEBUG("buffer_mark_eos");
600 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
602 LOCK_MUTEX(buf
->mutex
);
604 buf
->prebuffering
= 0;
605 COND_SIGNAL(buf
->playback_cond
);
606 UNLOCK_MUTEX(buf
->mutex
);
608 pthread_cleanup_pop(0);
611 void buffer_abort_write (buf_t
*buf
)
613 DEBUG("buffer_abort_write");
615 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
617 LOCK_MUTEX(buf
->mutex
);
618 buf
->abort_write
= 1;
619 COND_SIGNAL(buf
->write_cond
);
620 COND_SIGNAL(buf
->playback_cond
);
621 UNLOCK_MUTEX(buf
->mutex
);
623 pthread_cleanup_pop(0);
627 /* --- Action buffering functions --- */
629 void buffer_action_now (buf_t
*buf
, action_func_t action_func
,
634 action
= malloc_action(action_func
, action_arg
);
636 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
638 LOCK_MUTEX(buf
->mutex
);
640 action
->position
= buf
->position
;
642 /* Insert this action right at the front */
643 action
->next
= buf
->actions
;
644 buf
->actions
= action
;
646 UNLOCK_MUTEX(buf
->mutex
);
648 pthread_cleanup_pop(0);
652 void buffer_insert_action_at_end (buf_t
*buf
, action_func_t action_func
,
657 action
= malloc_action(action_func
, action_arg
);
659 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
661 LOCK_MUTEX(buf
->mutex
);
663 /* Stick after the last item in the buffer */
664 action
->position
= buf
->position_end
;
666 in_order_add_action(&buf
->actions
, action
, INSERT
);
668 UNLOCK_MUTEX(buf
->mutex
);
670 pthread_cleanup_pop(0);
674 void buffer_append_action_at_end (buf_t
*buf
, action_func_t action_func
,
679 action
= malloc_action(action_func
, action_arg
);
681 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
683 LOCK_MUTEX(buf
->mutex
);
685 /* Stick after the last item in the buffer */
686 action
->position
= buf
->position_end
;
688 in_order_add_action(&buf
->actions
, action
, APPEND
);
690 UNLOCK_MUTEX(buf
->mutex
);
692 pthread_cleanup_pop(0);
696 void buffer_insert_action_at (buf_t
*buf
, action_func_t action_func
,
697 void *action_arg
, ogg_int64_t position
)
701 action
= malloc_action(action_func
, action_arg
);
703 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
705 LOCK_MUTEX(buf
->mutex
);
707 action
->position
= position
;
709 in_order_add_action(&buf
->actions
, action
, INSERT
);
711 UNLOCK_MUTEX(buf
->mutex
);
713 pthread_cleanup_pop(0);
717 void buffer_append_action_at (buf_t
*buf
, action_func_t action_func
,
718 void *action_arg
, ogg_int64_t position
)
722 action
= malloc_action(action_func
, action_arg
);
724 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
726 LOCK_MUTEX(buf
->mutex
);
728 action
->position
= position
;
730 in_order_add_action(&buf
->actions
, action
, APPEND
);
732 UNLOCK_MUTEX(buf
->mutex
);
734 pthread_cleanup_pop(0);
738 /* --- Buffer status functions --- */
740 void buffer_wait_for_empty (buf_t
*buf
)
744 DEBUG("Enter buffer_wait_for_empty");
746 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
748 LOCK_MUTEX(buf
->mutex
);
751 if (buf
->curfill
> 0) {
752 DEBUG1("Buffer curfill = %ld, going back to sleep.", buf
->curfill
);
753 COND_WAIT(buf
->write_cond
, buf
->mutex
);
757 UNLOCK_MUTEX(buf
->mutex
);
759 pthread_cleanup_pop(0);
761 DEBUG("Exit buffer_wait_for_empty");
765 long buffer_full (buf_t
*buf
)
771 buffer_stats_t
*buffer_statistics (buf_t
*buf
)
773 buffer_stats_t
*stats
;
775 pthread_cleanup_push(buffer_mutex_unlock
, buf
);
777 LOCK_MUTEX(buf
->mutex
);
779 stats
= malloc_buffer_stats();
781 stats
->size
= buf
->size
;
782 stats
->fill
= (double) buf
->curfill
/ (double) buf
->size
* 100.0;
783 stats
->prebuffer_fill
= (double) buf
->prebuffer_size
/ (double) buf
->size
;
784 stats
->prebuffering
= buf
->prebuffering
;
785 stats
->paused
= buf
->paused
;
786 stats
->eos
= buf
->eos
;
788 UNLOCK_MUTEX(buf
->mutex
);
790 pthread_cleanup_pop(0);