Make prefetch-piped samples stop correctly.
[calfbox.git] / streamplay.c
blobb8273134a861908de8f90017e276cc801b73b65d
/*
    Calf Box, an open source musical instrument.
    Copyright (C) 2010-2013 Krzysztof Foltman

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
19 #include "assert.h"
20 #include "config.h"
21 #include "config-api.h"
22 #include "dspmath.h"
23 #include "fifo.h"
24 #include "module.h"
25 #include "rt.h"
26 #include <errno.h>
27 #include <glib.h>
28 #include <malloc.h>
29 #include <math.h>
30 #include <memory.h>
31 #include <pthread.h>
32 #include <sndfile.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
// GError domain used for errors raised by the stream player code in this file.
#define CBOX_STREAM_PLAYER_ERROR cbox_stream_player_error_quark()

// Error codes within the CBOX_STREAM_PLAYER_ERROR domain.
enum CboxStreamPlayerError
{
    CBOX_STREAM_PLAYER_ERROR_FAILED,
};
44 GQuark cbox_stream_player_error_quark(void)
46 return g_quark_from_string("cbox-stream-player-error-quark");
// Capacity, in frames, of a regular cue/read-ahead buffer.
#define CUE_BUFFER_SIZE 16000
// When fewer than this many frames remain in the current buffer,
// the next buffer is requested from the loader thread.
#define PREFETCH_THRESHOLD (CUE_BUFFER_SIZE / 4)
// Number of buffers that the loader thread can fill ahead of playback.
#define MAX_READAHEAD_BUFFERS 3

// Sentinel position: "no loop point" / "buffer not assigned to any position".
#define NO_SAMPLE_LOOP (~(uint64_t)0)
// One buffer of sample data covering a contiguous range of the file.
struct stream_player_cue_point
{
    // Index of the first frame held (or to be loaded), NO_SAMPLE_LOOP when unassigned
    volatile uint64_t position;
    // size: capacity of 'data' in frames; length: number of frames actually read
    volatile uint32_t size, length;
    // Sample storage allocated by init_cue (size * channel count floats)
    float *data;
    // Set by request_load when a load is requested, cleared by load_at_cue
    int queued;
};
// Playback phase of a stream.
enum stream_state_phase
{
    STOPPED,   // no samples are produced
    PLAYING,   // steady playback
    STOPPING,  // fade gain is being decreased; becomes STOPPED once it drops below 0
    STARTING   // fade gain is being increased; becomes PLAYING once it reaches 1
};
71 struct stream_state
73 SNDFILE *sndfile;
74 SF_INFO info;
75 uint64_t readptr;
76 uint64_t restart;
77 uint64_t readptr_new;
79 volatile int buffer_in_use;
81 struct stream_player_cue_point cp_start, cp_loop, cp_readahead[MAX_READAHEAD_BUFFERS];
82 int cp_readahead_ready[MAX_READAHEAD_BUFFERS];
83 struct stream_player_cue_point *pcp_current, *pcp_next;
85 struct cbox_fifo *rb_for_reading, *rb_just_read;
86 float gain, fade_gain, fade_increment;
87 enum stream_state_phase phase;
89 pthread_t thr_preload;
90 int thread_started;
92 gchar *filename;
95 struct stream_player_module
97 struct cbox_module module;
99 struct stream_state *stream;
100 float fade_increment;
103 static void init_cue(struct stream_state *ss, struct stream_player_cue_point *pt, uint32_t size, uint64_t pos)
105 pt->data = malloc(size * sizeof(float) * ss->info.channels);
106 pt->size = size;
107 pt->length = 0;
108 pt->queued = 0;
109 pt->position = pos;
112 static void load_at_cue(struct stream_state *ss, struct stream_player_cue_point *pt)
114 if (pt->position != NO_SAMPLE_LOOP)
116 sf_seek(ss->sndfile, pt->position, 0);
117 pt->length = sf_readf_float(ss->sndfile, pt->data, pt->size);
119 pt->queued = 0;
122 static int is_contained(struct stream_player_cue_point *pt, uint64_t ofs)
124 return pt->position != NO_SAMPLE_LOOP && ofs >= pt->position && ofs < pt->position + pt->length;
127 static int is_queued(struct stream_player_cue_point *pt, uint64_t ofs)
129 return pt->queued && pt->position != NO_SAMPLE_LOOP && ofs >= pt->position && ofs < pt->position + pt->size;
132 struct stream_player_cue_point *get_cue(struct stream_state *ss, uint64_t pos)
134 int i;
136 if (is_contained(&ss->cp_loop, pos))
137 return &ss->cp_loop;
138 if (is_contained(&ss->cp_start, pos))
139 return &ss->cp_start;
141 for (i = 0; i < MAX_READAHEAD_BUFFERS; i++)
143 if (ss->cp_readahead_ready[i] && is_contained(&ss->cp_readahead[i], pos))
144 return &ss->cp_readahead[i];
146 return NULL;
149 struct stream_player_cue_point *get_queued_buffer(struct stream_state *ss, uint64_t pos)
151 int i;
153 for (i = 0; i < MAX_READAHEAD_BUFFERS; i++)
155 if (!ss->cp_readahead_ready[i] && is_queued(&ss->cp_readahead[i], pos))
156 return &ss->cp_readahead[i];
158 return NULL;
161 void request_load(struct stream_state *ss, int buf_idx, uint64_t pos)
163 unsigned char cidx = (unsigned char)buf_idx;
164 struct stream_player_cue_point *pt = &ss->cp_readahead[buf_idx];
166 ss->cp_readahead_ready[buf_idx] = 0;
167 pt->position = pos;
168 pt->length = 0;
169 pt->queued = 1;
171 #ifdef NDEBUG
172 cbox_fifo_write_atomic(ss->rb_for_reading, &cidx, 1);
173 #else
174 gboolean result = cbox_fifo_write_atomic(ss->rb_for_reading, &cidx, 1);
175 assert(result);
176 #endif
179 int get_unused_buffer(struct stream_state *ss)
181 int i = 0;
182 int notbad = -1;
184 // return first buffer that is not currently played or in queue; XXXKF this is a very primitive strategy, a good one would at least use the current play position
185 for (i = 0; i < MAX_READAHEAD_BUFFERS; i++)
187 int64_t rel;
188 if (&ss->cp_readahead[i] == ss->pcp_current)
189 continue;
190 if (ss->cp_readahead[i].queued)
191 continue;
192 // If there's any unused buffer, return it
193 if (ss->cp_readahead[i].position == NO_SAMPLE_LOOP)
194 return i;
195 // If this has already been played, return it
196 rel = ss->readptr - ss->cp_readahead[i].position;
197 if (rel >= ss->cp_readahead[i].length)
198 return i;
199 // Use as second chance
200 notbad = i;
202 return notbad;
205 static void *sample_preload_thread(void *user_data)
207 struct stream_state *ss = user_data;
209 do {
210 unsigned char buf_idx;
211 if (!cbox_fifo_read_atomic(ss->rb_for_reading, &buf_idx, 1))
213 usleep(5000);
214 continue;
216 if (buf_idx == 255)
217 break;
218 // fprintf(stderr, "Preload: %d, %lld\n", (int)buf_idx, (long long)m->cp_readahead[buf_idx].position);
219 load_at_cue(ss, &ss->cp_readahead[buf_idx]);
220 // fprintf(stderr, "Preloaded\n", (int)buf_idx, (long long)m->cp_readahead[buf_idx].position);
221 cbox_fifo_write_atomic(ss->rb_just_read, &buf_idx, 1);
222 } while(1);
223 return NULL;
// Event handler of the module; the stream player ignores all incoming events.
void stream_player_process_event(struct cbox_module *module, const uint8_t *data, uint32_t len)
{
    (void)module;
    (void)data;
    (void)len;
}
231 static void request_next(struct stream_state *ss, uint64_t pos)
233 // Check if we've requested a next buffer, if not, request it
235 // First verify if our idea of 'next' buffer is correct
236 // XXXKF This is technically incorrect, it won't tell whether the next "block" that's there
237 // isn't actually a single sample. I worked it around by ensuring end of blocks are always
238 // at CUE_BUFFER_SIZE boundary, and this works well, but causes buffers to be of uneven size.
239 if (ss->pcp_next && (is_contained(ss->pcp_next, pos) || is_queued(ss->pcp_next, pos)))
241 // We're still waiting for the requested buffer, but that's OK
242 return;
245 // We don't know the next buffer, or the next buffer doesn't contain
246 // the sample we're looking for.
247 ss->pcp_next = get_queued_buffer(ss, pos);
248 if (!ss->pcp_next)
250 // It hasn't even been requested - request it
251 int buf_idx = get_unused_buffer(ss);
252 if(buf_idx == -1)
254 printf("Ran out of buffers\n");
255 return;
257 request_load(ss, buf_idx, pos);
258 ss->pcp_next = &ss->cp_readahead[buf_idx];
260 // printf("@%lld: Requested load into buffer %d at %lld\n", (long long)m->readptr, buf_idx, (long long) pos);
264 static void copy_samples(struct stream_state *ss, cbox_sample_t **outputs, float *data, int count, int ofs, int pos)
266 int i;
267 float gain = ss->gain * ss->fade_gain;
268 if (ss->phase == STARTING)
270 ss->fade_gain += ss->fade_increment;
271 if (ss->fade_gain >= 1)
273 ss->fade_gain = 1;
274 ss->phase = PLAYING;
277 else
278 if (ss->phase == STOPPING)
280 ss->fade_gain -= ss->fade_increment;
281 if (ss->fade_gain < 0)
283 ss->fade_gain = 0;
284 ss->phase = STOPPED;
287 float new_gain = ss->gain * ss->fade_gain;
288 float gain_delta = (new_gain - gain) * (1.0 / CBOX_BLOCK_SIZE);
290 if (ss->info.channels == 1)
292 for (i = 0; i < count; i++)
294 outputs[0][ofs + i] = outputs[1][ofs + i] = gain * data[pos + i];
295 gain += gain_delta;
298 else
299 if (ss->info.channels == 2)
301 for (i = 0; i < count; i++)
303 outputs[0][ofs + i] = gain * data[pos << 1];
304 outputs[1][ofs + i] = gain * data[(pos << 1) + 1];
305 gain += gain_delta;
306 pos++;
309 else
311 uint32_t ch = ss->info.channels;
312 for (i = 0; i < count; i++)
314 outputs[0][ofs + i] = gain * data[pos * ch];
315 outputs[1][ofs + i] = gain * data[pos * ch + 1];
316 gain += gain_delta;
317 pos++;
320 ss->readptr += count;
321 if (ss->readptr >= (uint32_t)ss->info.frames)
323 ss->readptr = ss->restart;
327 void stream_player_process_block(struct cbox_module *module, cbox_sample_t **inputs, cbox_sample_t **outputs)
329 struct stream_player_module *m = (struct stream_player_module *)module;
330 struct stream_state *ss = m->stream;
331 int i, optr;
332 unsigned char buf_idx;
334 if (!ss || ss->readptr == NO_SAMPLE_LOOP)
336 for (int i = 0; i < CBOX_BLOCK_SIZE; i++)
338 outputs[0][i] = outputs[1][i] = 0;
340 return;
343 // receive buffer completion messages from the queue
344 while(cbox_fifo_read_atomic(ss->rb_just_read, &buf_idx, 1))
346 ss->cp_readahead_ready[buf_idx] = 1;
349 optr = 0;
350 do {
351 if (ss->phase == STOPPED)
352 break;
353 if (ss->readptr == NO_SAMPLE_LOOP)
355 ss->phase = STOPPED;
356 break;
359 if (ss->pcp_current && !is_contained(ss->pcp_current, ss->readptr))
360 ss->pcp_current = NULL;
362 if (!ss->pcp_current)
364 if (ss->pcp_next && is_contained(ss->pcp_next, ss->readptr))
366 ss->pcp_current = ss->pcp_next;
367 ss->pcp_next = NULL;
369 else
370 ss->pcp_current = get_cue(ss, ss->readptr);
373 if (!ss->pcp_current)
375 printf("Underrun at %d\n", (int)ss->readptr);
376 // Underrun; request/wait for next block and output zeros
377 request_next(ss, ss->readptr);
378 break;
380 assert(!ss->pcp_current->queued);
382 uint64_t data_end = ss->pcp_current->position + ss->pcp_current->length;
383 uint32_t data_left = data_end - ss->readptr;
385 // If we're close to running out of space, prefetch the next bit
386 if (data_left < PREFETCH_THRESHOLD && data_end < ss->info.frames)
387 request_next(ss, data_end);
389 float *data = ss->pcp_current->data;
390 uint32_t pos = ss->readptr - ss->pcp_current->position;
391 uint32_t count = data_end - ss->readptr;
392 if (count > CBOX_BLOCK_SIZE - optr)
393 count = CBOX_BLOCK_SIZE - optr;
395 // printf("Copy samples: copying %d, optr %d, %lld = %d @ [%lld - %lld], left %d\n", count, optr, (long long)m->readptr, pos, (long long)m->pcp_current->position, (long long)data_end, (int)data_left);
396 copy_samples(ss, outputs, data, count, optr, pos);
397 optr += count;
398 } while(optr < CBOX_BLOCK_SIZE);
400 for (i = optr; i < CBOX_BLOCK_SIZE; i++)
402 outputs[0][i] = outputs[1][i] = 0;
406 static void stream_state_destroy(struct stream_state *ss)
408 unsigned char cmd = 255;
410 if (ss->rb_for_reading && ss->thread_started)
412 cbox_fifo_write_atomic(ss->rb_for_reading, &cmd, 1);
413 pthread_join(ss->thr_preload, NULL);
416 if (ss->rb_for_reading)
417 cbox_fifo_destroy(ss->rb_for_reading);
418 if (ss->rb_just_read)
419 cbox_fifo_destroy(ss->rb_just_read);
420 if (ss->sndfile)
421 sf_close(ss->sndfile);
422 if (ss->filename)
423 g_free(ss->filename);
424 free(ss);
427 void stream_player_destroyfunc(struct cbox_module *module)
429 struct stream_player_module *m = (struct stream_player_module *)module;
430 if (m->stream)
431 stream_state_destroy(m->stream);
434 static struct stream_state *stream_state_new(const char *context, const gchar *filename, uint64_t loop, float fade_increment, GError **error)
436 struct stream_state *stream = malloc(sizeof(struct stream_state));
437 memset(&stream->info, 0, sizeof(stream->info));
438 stream->sndfile = sf_open(filename, SFM_READ, &stream->info);
440 if (!stream->sndfile)
442 g_set_error(error, CBOX_STREAM_PLAYER_ERROR, CBOX_STREAM_PLAYER_ERROR_FAILED, "instrument '%s': cannot open file '%s': %s", context, filename, sf_strerror(NULL));
443 free(stream);
444 return NULL;
446 g_message("Frames %d channels %d", (int)stream->info.frames, (int)stream->info.channels);
448 stream->rb_for_reading = cbox_fifo_new(MAX_READAHEAD_BUFFERS + 1);
449 stream->rb_just_read = cbox_fifo_new(MAX_READAHEAD_BUFFERS + 1);
451 stream->phase = STOPPED;
452 stream->readptr = 0;
453 stream->restart = loop;
454 stream->pcp_current = &stream->cp_start;
455 stream->pcp_next = NULL;
456 stream->gain = 1.0;
457 stream->fade_gain = 0.0;
458 stream->fade_increment = fade_increment;
459 stream->thread_started = 0;
460 stream->filename = g_strdup(filename);
462 init_cue(stream, &stream->cp_start, CUE_BUFFER_SIZE, 0);
463 load_at_cue(stream, &stream->cp_start);
464 if (stream->restart > 0 && (stream->restart % CUE_BUFFER_SIZE) > 0)
465 init_cue(stream, &stream->cp_loop, CUE_BUFFER_SIZE + (CUE_BUFFER_SIZE - (stream->restart % CUE_BUFFER_SIZE)), stream->restart);
466 else
467 init_cue(stream, &stream->cp_loop, CUE_BUFFER_SIZE, stream->restart);
468 load_at_cue(stream, &stream->cp_loop);
469 for (int i = 0; i < MAX_READAHEAD_BUFFERS; i++)
471 init_cue(stream, &stream->cp_readahead[i], CUE_BUFFER_SIZE, NO_SAMPLE_LOOP);
472 stream->cp_readahead_ready[i] = 0;
474 if (pthread_create(&stream->thr_preload, NULL, sample_preload_thread, stream))
476 stream_state_destroy(stream);
477 g_set_error(error, CBOX_STREAM_PLAYER_ERROR, CBOX_STREAM_PLAYER_ERROR_FAILED, "cannot create streaming thread: %s", strerror(errno));
478 return NULL;
480 stream->thread_started = 1;
482 return stream;
485 ///////////////////////////////////////////////////////////////////////////////////
487 static int stream_player_seek_execute(void *p)
489 struct stream_player_module *m = p;
491 m->stream->readptr = m->stream->readptr_new;
493 return 1;
496 static struct cbox_rt_cmd_definition stream_seek_command = {
497 .prepare = NULL,
498 .execute = stream_player_seek_execute,
499 .cleanup = NULL
502 ///////////////////////////////////////////////////////////////////////////////////
504 static int stream_player_play_execute(void *p)
506 struct stream_player_module *m = p;
508 if (m->stream->readptr == NO_SAMPLE_LOOP)
509 m->stream->readptr = 0;
510 if (m->stream->phase != PLAYING)
512 if (m->stream->readptr == 0)
514 m->stream->fade_gain = 1.0;
515 m->stream->phase = PLAYING;
517 else
518 m->stream->phase = STARTING;
520 return 1;
523 static struct cbox_rt_cmd_definition stream_play_command = {
524 .prepare = NULL,
525 .execute = stream_player_play_execute,
526 .cleanup = NULL
529 ///////////////////////////////////////////////////////////////////////////////////
531 static int stream_player_stop_execute(void *p)
533 struct stream_player_module *m = p;
535 if (m->stream->phase != STOPPED)
536 m->stream->phase = STOPPING;
537 return 1;
540 static struct cbox_rt_cmd_definition stream_stop_command = {
541 .prepare = NULL,
542 .execute = stream_player_stop_execute,
543 .cleanup = NULL
546 ///////////////////////////////////////////////////////////////////////////////////
548 struct load_command_data
550 struct stream_player_module *module;
551 gchar *context;
552 gchar *filename;
553 int loop_start;
554 struct stream_state *stream, *old_stream;
555 GError **error;
558 static int stream_player_load_prepare(void *p)
560 struct load_command_data *c = p;
562 if (!c->filename)
563 return 0;
564 c->stream = stream_state_new(c->context, c->filename, c->loop_start, c->module->fade_increment, c->error);
565 c->old_stream = NULL;
566 if (!c->stream)
568 g_free(c->filename);
569 return -1;
571 return 0;
574 static int stream_player_load_execute(void *p)
576 struct load_command_data *c = p;
578 c->old_stream = c->module->stream;
579 c->module->stream = c->stream;
580 return 1;
583 static void stream_player_load_cleanup(void *p)
585 struct load_command_data *c = p;
587 if (c->filename)
588 g_free(c->filename);
589 if (c->old_stream && c->old_stream != c->stream)
590 stream_state_destroy(c->old_stream);
593 static struct cbox_rt_cmd_definition stream_load_command = {
594 .prepare = stream_player_load_prepare,
595 .execute = stream_player_load_execute,
596 .cleanup = stream_player_load_cleanup
599 ///////////////////////////////////////////////////////////////////////////////////
601 static gboolean require_stream(struct stream_player_module *m, GError **error)
603 if (!m->stream)
605 g_set_error(error, CBOX_MODULE_ERROR, CBOX_MODULE_ERROR_FAILED, "No stream loaded");
606 return FALSE;
608 return TRUE;
611 gboolean stream_player_process_cmd(struct cbox_command_target *ct, struct cbox_command_target *fb, struct cbox_osc_command *cmd, GError **error)
613 struct stream_player_module *m = (struct stream_player_module *)ct->user_data;
614 if (!strcmp(cmd->command, "/seek") && !strcmp(cmd->arg_types, "i"))
616 if (!require_stream(m, error))
617 return FALSE;
618 m->stream->readptr_new = CBOX_ARG_I(cmd, 0);
619 cbox_rt_execute_cmd_async(m->module.rt, &stream_seek_command, m);
621 else if (!strcmp(cmd->command, "/play") && !strcmp(cmd->arg_types, ""))
623 if (!require_stream(m, error))
624 return FALSE;
625 cbox_rt_execute_cmd_async(m->module.rt, &stream_play_command, m);
627 else if (!strcmp(cmd->command, "/stop") && !strcmp(cmd->arg_types, ""))
629 if (!require_stream(m, error))
630 return FALSE;
631 cbox_rt_execute_cmd_async(m->module.rt, &stream_stop_command, m);
633 else if (!strcmp(cmd->command, "/status") && !strcmp(cmd->arg_types, ""))
635 if (!cbox_check_fb_channel(fb, cmd->command, error))
636 return FALSE;
637 if (m->stream)
639 return cbox_execute_on(fb, NULL, "/filename", "s", error, m->stream->filename) &&
640 cbox_execute_on(fb, NULL, "/pos", "i", error, m->stream->readptr) &&
641 cbox_execute_on(fb, NULL, "/length", "i", error, m->stream->info.frames) &&
642 cbox_execute_on(fb, NULL, "/playing", "i", error, m->stream->phase != STOPPED);
644 else
645 return
646 cbox_execute_on(fb, NULL, "/filename", "s", error, "");
648 else if (!strcmp(cmd->command, "/load") && !strcmp(cmd->arg_types, "si"))
650 struct load_command_data *c = malloc(sizeof(struct load_command_data));
651 c->context = m->module.instance_name;
652 c->module = m;
653 c->stream = NULL;
654 c->old_stream = NULL;
655 c->filename = g_strdup(CBOX_ARG_S(cmd, 0));
656 c->loop_start = CBOX_ARG_I(cmd, 1);
657 c->error = error;
658 cbox_rt_execute_cmd_sync(m->module.rt, &stream_load_command, c);
659 gboolean success = c->stream != NULL;
660 free(c);
661 return success;
663 else if (!strcmp(cmd->command, "/unload") && !strcmp(cmd->arg_types, ""))
665 struct load_command_data *c = malloc(sizeof(struct load_command_data));
666 c->context = m->module.instance_name;
667 c->module = m;
668 c->stream = NULL;
669 c->old_stream = NULL;
670 c->filename = NULL;
671 c->loop_start = 0;
672 c->error = error;
673 cbox_rt_execute_cmd_sync(m->module.rt, &stream_load_command, c);
674 gboolean success = c->stream == NULL;
675 free(c);
676 return success;
678 else
680 g_set_error(error, CBOX_MODULE_ERROR, CBOX_MODULE_ERROR_FAILED, "Unknown command '%s'", cmd->command);
681 return FALSE;
683 return TRUE;
686 MODULE_CREATE_FUNCTION(stream_player)
688 static int inited = 0;
690 if (!inited)
692 inited = 1;
695 struct stream_player_module *m = malloc(sizeof(struct stream_player_module));
696 gchar *filename = cbox_config_get_string(cfg_section, "file");
697 CALL_MODULE_INIT(m, 0, 2, stream_player);
698 m->module.process_event = stream_player_process_event;
699 m->module.process_block = stream_player_process_block;
700 m->fade_increment = 1.0 / (cbox_config_get_float(cfg_section, "fade_time", 0.01) * (m->module.srate / CBOX_BLOCK_SIZE));
701 if (filename)
703 m->stream = stream_state_new(cfg_section, filename, (uint64_t)(int64_t)cbox_config_get_int(cfg_section, "loop", -1), m->fade_increment, error);
704 if (!m->stream)
706 CBOX_DELETE(&m->module);
707 return NULL;
710 else
711 m->stream = NULL;
713 return &m->module;
716 struct cbox_module_keyrange_metadata stream_player_keyranges[] = {
719 struct cbox_module_livecontroller_metadata stream_player_controllers[] = {
722 DEFINE_MODULE(stream_player, 0, 2)