fftools/ffmpeg_sched.c
1 /*
2 * Inter-thread scheduling/synchronization.
3 * Copyright (c) 2023 Anton Khirnov
5 * This file is part of FFmpeg.
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
32 #include "libavcodec/packet.h"
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
40 #include "libavutil/threadmessage.h"
41 #include "libavutil/time.h"
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
47 enum QueueType {
48 QUEUE_PACKETS,
49 QUEUE_FRAMES,
52 typedef struct SchWaiter {
53 pthread_mutex_t lock;
54 pthread_cond_t cond;
55 atomic_int choked;
57 // the following are internal state of schedule_update_locked() and must not
58 // be accessed outside of it
59 int choked_prev;
60 int choked_next;
61 } SchWaiter;
63 typedef struct SchTask {
64 Scheduler *parent;
65 SchedulerNode node;
67 SchThreadFunc func;
68 void *func_arg;
70 pthread_t thread;
71 int thread_running;
72 } SchTask;
74 typedef struct SchDecOutput {
75 SchedulerNode *dst;
76 uint8_t *dst_finished;
77 unsigned nb_dst;
78 } SchDecOutput;
80 typedef struct SchDec {
81 const AVClass *class;
83 SchedulerNode src;
85 SchDecOutput *outputs;
86 unsigned nb_outputs;
88 SchTask task;
89 // Queue for receiving input packets, one stream.
90 ThreadQueue *queue;
92 // Queue for sending post-flush end timestamps back to the source
93 AVThreadMessageQueue *queue_end_ts;
94 int expect_end_ts;
96 // temporary storage used by sch_dec_send()
97 AVFrame *send_frame;
98 } SchDec;
100 typedef struct SchSyncQueue {
101 SyncQueue *sq;
102 AVFrame *frame;
103 pthread_mutex_t lock;
105 unsigned *enc_idx;
106 unsigned nb_enc_idx;
107 } SchSyncQueue;
109 typedef struct SchEnc {
110 const AVClass *class;
112 SchedulerNode src;
113 SchedulerNode *dst;
114 uint8_t *dst_finished;
115 unsigned nb_dst;
117 // [0] - index of the sync queue in Scheduler.sq_enc,
118 // [1] - index of this encoder in the sq
119 int sq_idx[2];
121 /* Opening encoders is somewhat nontrivial due to their interaction with
122 * sync queues, which are (among other things) responsible for maintaining
123 * constant audio frame size, when it is required by the encoder.
125 * Opening the encoder requires stream parameters, obtained from the first
126 * frame. However, that frame cannot be properly chunked by the sync queue
127 * without knowing the required frame size, which is only available after
128 * opening the encoder.
130 * This apparent circular dependency is resolved in the following way:
131 * - the caller creating the encoder gives us a callback which opens the
132 * encoder and returns the required frame size (if any)
133 * - when the first frame is sent to the encoder, the sending thread
134 * - calls this callback, opening the encoder
135 * - passes the returned frame size to the sync queue
137 int (*open_cb)(void *opaque, const AVFrame *frame);
138 int opened;
140 SchTask task;
141 // Queue for receiving input frames, one stream.
142 ThreadQueue *queue;
143 // tq_send() to queue returned EOF
144 int in_finished;
146 // temporary storage used by sch_enc_send()
147 AVPacket *send_pkt;
148 } SchEnc;
150 typedef struct SchDemuxStream {
151 SchedulerNode *dst;
152 uint8_t *dst_finished;
153 unsigned nb_dst;
154 } SchDemuxStream;
156 typedef struct SchDemux {
157 const AVClass *class;
159 SchDemuxStream *streams;
160 unsigned nb_streams;
162 SchTask task;
163 SchWaiter waiter;
165 // temporary storage used by sch_demux_send()
166 AVPacket *send_pkt;
168 // protected by schedule_lock
169 int task_exited;
170 } SchDemux;
172 typedef struct PreMuxQueue {
174 * Queue for buffering the packets before the muxer task can be started.
176 AVFifo *fifo;
178 * Maximum number of packets in fifo.
180 int max_packets;
182 * The size of the AVPackets' buffers in queue.
183 * Updated when a packet is either pushed or pulled from the queue.
185 size_t data_size;
186 /* Threshold after which max_packets will be in effect */
187 size_t data_threshold;
188 } PreMuxQueue;
190 typedef struct SchMuxStream {
191 SchedulerNode src;
192 SchedulerNode src_sched;
194 unsigned *sub_heartbeat_dst;
195 unsigned nb_sub_heartbeat_dst;
197 PreMuxQueue pre_mux_queue;
199 // an EOF was generated while flushing the pre-mux queue
200 int init_eof;
202 ////////////////////////////////////////////////////////////
203 // The following are protected by Scheduler.schedule_lock //
205 /* dts+duration of the last packet sent to this stream
206 in AV_TIME_BASE_Q */
207 int64_t last_dts;
208 // this stream no longer accepts input
209 int source_finished;
210 ////////////////////////////////////////////////////////////
211 } SchMuxStream;
213 typedef struct SchMux {
214 const AVClass *class;
216 SchMuxStream *streams;
217 unsigned nb_streams;
218 unsigned nb_streams_ready;
220 int (*init)(void *arg);
222 SchTask task;
224 * Set to 1 after starting the muxer task and flushing the
225 * pre-muxing queues.
226 * Set either before any tasks have started, or with
227 * Scheduler.mux_ready_lock held.
229 atomic_int mux_started;
230 ThreadQueue *queue;
231 unsigned queue_size;
233 AVPacket *sub_heartbeat_pkt;
234 } SchMux;
236 typedef struct SchFilterIn {
237 SchedulerNode src;
238 SchedulerNode src_sched;
239 int send_finished;
240 int receive_finished;
241 } SchFilterIn;
243 typedef struct SchFilterOut {
244 SchedulerNode dst;
245 } SchFilterOut;
247 typedef struct SchFilterGraph {
248 const AVClass *class;
250 SchFilterIn *inputs;
251 unsigned nb_inputs;
252 atomic_uint nb_inputs_finished_send;
253 unsigned nb_inputs_finished_receive;
255 SchFilterOut *outputs;
256 unsigned nb_outputs;
258 SchTask task;
259 // input queue, nb_inputs+1 streams
260 // last stream is control
261 ThreadQueue *queue;
262 SchWaiter waiter;
264 // protected by schedule_lock
265 unsigned best_input;
266 int task_exited;
267 } SchFilterGraph;
269 enum SchedulerState {
270 SCH_STATE_UNINIT,
271 SCH_STATE_STARTED,
272 SCH_STATE_STOPPED,
275 struct Scheduler {
276 const AVClass *class;
278 SchDemux *demux;
279 unsigned nb_demux;
281 SchMux *mux;
282 unsigned nb_mux;
284 unsigned nb_mux_ready;
285 pthread_mutex_t mux_ready_lock;
287 unsigned nb_mux_done;
288 pthread_mutex_t mux_done_lock;
289 pthread_cond_t mux_done_cond;
292 SchDec *dec;
293 unsigned nb_dec;
295 SchEnc *enc;
296 unsigned nb_enc;
298 SchSyncQueue *sq_enc;
299 unsigned nb_sq_enc;
301 SchFilterGraph *filters;
302 unsigned nb_filters;
304 char *sdp_filename;
305 int sdp_auto;
307 enum SchedulerState state;
308 atomic_int terminate;
309 atomic_int task_failed;
311 pthread_mutex_t schedule_lock;
313 atomic_int_least64_t last_dts;
317 * Wait until this task is allowed to proceed.
319 * @retval 0 the caller should proceed
320 * @retval 1 the caller should terminate
322 static int waiter_wait(Scheduler *sch, SchWaiter *w)
324 int terminate;
326 if (!atomic_load(&w->choked))
327 return 0;
329 pthread_mutex_lock(&w->lock);
331 while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
332 pthread_cond_wait(&w->cond, &w->lock);
334 terminate = atomic_load(&sch->terminate);
336 pthread_mutex_unlock(&w->lock);
338 return terminate;
341 static void waiter_set(SchWaiter *w, int choked)
343 pthread_mutex_lock(&w->lock);
345 atomic_store(&w->choked, choked);
346 pthread_cond_signal(&w->cond);
348 pthread_mutex_unlock(&w->lock);
351 static int waiter_init(SchWaiter *w)
353 int ret;
355 atomic_init(&w->choked, 0);
357 ret = pthread_mutex_init(&w->lock, NULL);
358 if (ret)
359 return AVERROR(ret);
361 ret = pthread_cond_init(&w->cond, NULL);
362 if (ret)
363 return AVERROR(ret);
365 return 0;
368 static void waiter_uninit(SchWaiter *w)
370 pthread_mutex_destroy(&w->lock);
371 pthread_cond_destroy(&w->cond);
374 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
375 enum QueueType type)
377 ThreadQueue *tq;
378 ObjPool *op;
380 if (queue_size <= 0) {
381 if (type == QUEUE_FRAMES)
382 queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
383 else
384 queue_size = DEFAULT_PACKET_THREAD_QUEUE_SIZE;
387 if (type == QUEUE_FRAMES) {
388 // This queue length is used in the decoder code to ensure that
389 // there are enough entries in fixed-size frame pools to account
390 // for frames held in queues inside the ffmpeg utility. If this
391 // can ever dynamically change then the corresponding decode
392 // code needs to be updated as well.
393 av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
396 op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
397 objpool_alloc_frames();
398 if (!op)
399 return AVERROR(ENOMEM);
401 tq = tq_alloc(nb_streams, queue_size, op,
402 (type == QUEUE_PACKETS) ? pkt_move : frame_move);
403 if (!tq) {
404 objpool_free(&op);
405 return AVERROR(ENOMEM);
408 *ptq = tq;
409 return 0;
412 static void *task_wrapper(void *arg);
414 static int task_start(SchTask *task)
416 int ret;
418 av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
420 av_assert0(!task->thread_running);
422 ret = pthread_create(&task->thread, NULL, task_wrapper, task);
423 if (ret) {
424 av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
425 strerror(ret));
426 return AVERROR(ret);
429 task->thread_running = 1;
430 return 0;
433 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
434 SchThreadFunc func, void *func_arg)
436 task->parent = sch;
438 task->node.type = type;
439 task->node.idx = idx;
441 task->func = func;
442 task->func_arg = func_arg;
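// Compute the dts (in AV_TIME_BASE_Q) of the most lagging muxed stream, i.e.
// the minimum of last_dts over all output streams. Returns AV_NOPTS_VALUE if
// any considered stream has not received a packet yet. Streams whose source
// has finished are skipped unless count_finished is set.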
445 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
447 int64_t min_dts = INT64_MAX;
449 for (unsigned i = 0; i < sch->nb_mux; i++) {
450 const SchMux *mux = &sch->mux[i];
452 for (unsigned j = 0; j < mux->nb_streams; j++) {
453 const SchMuxStream *ms = &mux->streams[j];
455 if (ms->source_finished && !count_finished)
456 continue;
457 if (ms->last_dts == AV_NOPTS_VALUE)
458 return AV_NOPTS_VALUE;
460 min_dts = FFMIN(min_dts, ms->last_dts);
464 return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
467 void sch_free(Scheduler **psch)
469 Scheduler *sch = *psch;
471 if (!sch)
472 return;
474 sch_stop(sch, NULL);
476 for (unsigned i = 0; i < sch->nb_demux; i++) {
477 SchDemux *d = &sch->demux[i];
479 for (unsigned j = 0; j < d->nb_streams; j++) {
480 SchDemuxStream *ds = &d->streams[j];
481 av_freep(&ds->dst);
482 av_freep(&ds->dst_finished);
484 av_freep(&d->streams);
486 av_packet_free(&d->send_pkt);
488 waiter_uninit(&d->waiter);
490 av_freep(&sch->demux);
492 for (unsigned i = 0; i < sch->nb_mux; i++) {
493 SchMux *mux = &sch->mux[i];
495 for (unsigned j = 0; j < mux->nb_streams; j++) {
496 SchMuxStream *ms = &mux->streams[j];
498 if (ms->pre_mux_queue.fifo) {
499 AVPacket *pkt;
500 while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
501 av_packet_free(&pkt);
502 av_fifo_freep2(&ms->pre_mux_queue.fifo);
505 av_freep(&ms->sub_heartbeat_dst);
507 av_freep(&mux->streams);
509 av_packet_free(&mux->sub_heartbeat_pkt);
511 tq_free(&mux->queue);
513 av_freep(&sch->mux);
515 for (unsigned i = 0; i < sch->nb_dec; i++) {
516 SchDec *dec = &sch->dec[i];
518 tq_free(&dec->queue);
520 av_thread_message_queue_free(&dec->queue_end_ts);
522 for (unsigned j = 0; j < dec->nb_outputs; j++) {
523 SchDecOutput *o = &dec->outputs[j];
525 av_freep(&o->dst);
526 av_freep(&o->dst_finished);
529 av_freep(&dec->outputs);
531 av_frame_free(&dec->send_frame);
533 av_freep(&sch->dec);
535 for (unsigned i = 0; i < sch->nb_enc; i++) {
536 SchEnc *enc = &sch->enc[i];
538 tq_free(&enc->queue);
540 av_packet_free(&enc->send_pkt);
542 av_freep(&enc->dst);
543 av_freep(&enc->dst_finished);
545 av_freep(&sch->enc);
547 for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
548 SchSyncQueue *sq = &sch->sq_enc[i];
549 sq_free(&sq->sq);
550 av_frame_free(&sq->frame);
551 pthread_mutex_destroy(&sq->lock);
552 av_freep(&sq->enc_idx);
554 av_freep(&sch->sq_enc);
556 for (unsigned i = 0; i < sch->nb_filters; i++) {
557 SchFilterGraph *fg = &sch->filters[i];
559 tq_free(&fg->queue);
561 av_freep(&fg->inputs);
562 av_freep(&fg->outputs);
564 waiter_uninit(&fg->waiter);
566 av_freep(&sch->filters);
568 av_freep(&sch->sdp_filename);
570 pthread_mutex_destroy(&sch->schedule_lock);
572 pthread_mutex_destroy(&sch->mux_ready_lock);
574 pthread_mutex_destroy(&sch->mux_done_lock);
575 pthread_cond_destroy(&sch->mux_done_cond);
577 av_freep(psch);
580 static const AVClass scheduler_class = {
581 .class_name = "Scheduler",
582 .version = LIBAVUTIL_VERSION_INT,
585 Scheduler *sch_alloc(void)
587 Scheduler *sch;
588 int ret;
590 sch = av_mallocz(sizeof(*sch));
591 if (!sch)
592 return NULL;
594 sch->class = &scheduler_class;
595 sch->sdp_auto = 1;
597 ret = pthread_mutex_init(&sch->schedule_lock, NULL);
598 if (ret)
599 goto fail;
601 ret = pthread_mutex_init(&sch->mux_ready_lock, NULL);
602 if (ret)
603 goto fail;
605 ret = pthread_mutex_init(&sch->mux_done_lock, NULL);
606 if (ret)
607 goto fail;
609 ret = pthread_cond_init(&sch->mux_done_cond, NULL);
610 if (ret)
611 goto fail;
613 return sch;
614 fail:
615 sch_free(&sch);
616 return NULL;
619 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
621 av_freep(&sch->sdp_filename);
622 sch->sdp_filename = av_strdup(sdp_filename);
623 return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
626 static const AVClass sch_mux_class = {
627 .class_name = "SchMux",
628 .version = LIBAVUTIL_VERSION_INT,
629 .parent_log_context_offset = offsetof(SchMux, task.func_arg),
632 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
633 void *arg, int sdp_auto, unsigned thread_queue_size)
635 const unsigned idx = sch->nb_mux;
637 SchMux *mux;
638 int ret;
640 ret = GROW_ARRAY(sch->mux, sch->nb_mux);
641 if (ret < 0)
642 return ret;
644 mux = &sch->mux[idx];
645 mux->class = &sch_mux_class;
646 mux->init = init;
647 mux->queue_size = thread_queue_size;
649 task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
651 sch->sdp_auto &= sdp_auto;
653 return idx;
656 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
658 SchMux *mux;
659 SchMuxStream *ms;
660 unsigned stream_idx;
661 int ret;
663 av_assert0(mux_idx < sch->nb_mux);
664 mux = &sch->mux[mux_idx];
666 ret = GROW_ARRAY(mux->streams, mux->nb_streams);
667 if (ret < 0)
668 return ret;
669 stream_idx = mux->nb_streams - 1;
671 ms = &mux->streams[stream_idx];
673 ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
674 if (!ms->pre_mux_queue.fifo)
675 return AVERROR(ENOMEM);
677 ms->last_dts = AV_NOPTS_VALUE;
679 return stream_idx;
682 static const AVClass sch_demux_class = {
683 .class_name = "SchDemux",
684 .version = LIBAVUTIL_VERSION_INT,
685 .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
688 int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
690 const unsigned idx = sch->nb_demux;
692 SchDemux *d;
693 int ret;
695 ret = GROW_ARRAY(sch->demux, sch->nb_demux);
696 if (ret < 0)
697 return ret;
699 d = &sch->demux[idx];
701 task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
703 d->class = &sch_demux_class;
704 d->send_pkt = av_packet_alloc();
705 if (!d->send_pkt)
706 return AVERROR(ENOMEM);
708 ret = waiter_init(&d->waiter);
709 if (ret < 0)
710 return ret;
712 return idx;
715 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
717 SchDemux *d;
718 int ret;
720 av_assert0(demux_idx < sch->nb_demux);
721 d = &sch->demux[demux_idx];
723 ret = GROW_ARRAY(d->streams, d->nb_streams);
724 return ret < 0 ? ret : d->nb_streams - 1;
727 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
729 SchDec *dec;
730 int ret;
732 av_assert0(dec_idx < sch->nb_dec);
733 dec = &sch->dec[dec_idx];
735 ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
736 if (ret < 0)
737 return ret;
739 return dec->nb_outputs - 1;
742 static const AVClass sch_dec_class = {
743 .class_name = "SchDec",
744 .version = LIBAVUTIL_VERSION_INT,
745 .parent_log_context_offset = offsetof(SchDec, task.func_arg),
748 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
750 const unsigned idx = sch->nb_dec;
752 SchDec *dec;
753 int ret;
755 ret = GROW_ARRAY(sch->dec, sch->nb_dec);
756 if (ret < 0)
757 return ret;
759 dec = &sch->dec[idx];
761 task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
763 dec->class = &sch_dec_class;
764 dec->send_frame = av_frame_alloc();
765 if (!dec->send_frame)
766 return AVERROR(ENOMEM);
768 ret = sch_add_dec_output(sch, idx);
769 if (ret < 0)
770 return ret;
772 ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
773 if (ret < 0)
774 return ret;
776 if (send_end_ts) {
777 ret = av_thread_message_queue_alloc(&dec->queue_end_ts, 1, sizeof(Timestamp));
778 if (ret < 0)
779 return ret;
782 return idx;
785 static const AVClass sch_enc_class = {
786 .class_name = "SchEnc",
787 .version = LIBAVUTIL_VERSION_INT,
788 .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
791 int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
792 int (*open_cb)(void *opaque, const AVFrame *frame))
794 const unsigned idx = sch->nb_enc;
796 SchEnc *enc;
797 int ret;
799 ret = GROW_ARRAY(sch->enc, sch->nb_enc);
800 if (ret < 0)
801 return ret;
803 enc = &sch->enc[idx];
805 enc->class = &sch_enc_class;
806 enc->open_cb = open_cb;
807 enc->sq_idx[0] = -1;
808 enc->sq_idx[1] = -1;
810 task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
812 enc->send_pkt = av_packet_alloc();
813 if (!enc->send_pkt)
814 return AVERROR(ENOMEM);
816 ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
817 if (ret < 0)
818 return ret;
820 return idx;
823 static const AVClass sch_fg_class = {
824 .class_name = "SchFilterGraph",
825 .version = LIBAVUTIL_VERSION_INT,
826 .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
829 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
830 SchThreadFunc func, void *ctx)
832 const unsigned idx = sch->nb_filters;
834 SchFilterGraph *fg;
835 int ret;
837 ret = GROW_ARRAY(sch->filters, sch->nb_filters);
838 if (ret < 0)
839 return ret;
840 fg = &sch->filters[idx];
842 fg->class = &sch_fg_class;
844 task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
846 if (nb_inputs) {
847 fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
848 if (!fg->inputs)
849 return AVERROR(ENOMEM);
850 fg->nb_inputs = nb_inputs;
853 if (nb_outputs) {
854 fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
855 if (!fg->outputs)
856 return AVERROR(ENOMEM);
857 fg->nb_outputs = nb_outputs;
860 ret = waiter_init(&fg->waiter);
861 if (ret < 0)
862 return ret;
864 ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
865 if (ret < 0)
866 return ret;
868 return idx;
871 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
873 SchSyncQueue *sq;
874 int ret;
876 ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
877 if (ret < 0)
878 return ret;
879 sq = &sch->sq_enc[sch->nb_sq_enc - 1];
881 sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
882 if (!sq->sq)
883 return AVERROR(ENOMEM);
885 sq->frame = av_frame_alloc();
886 if (!sq->frame)
887 return AVERROR(ENOMEM);
889 ret = pthread_mutex_init(&sq->lock, NULL);
890 if (ret)
891 return AVERROR(ret);
893 return sq - sch->sq_enc;
896 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
897 int limiting, uint64_t max_frames)
899 SchSyncQueue *sq;
900 SchEnc *enc;
901 int ret;
903 av_assert0(sq_idx < sch->nb_sq_enc);
904 sq = &sch->sq_enc[sq_idx];
906 av_assert0(enc_idx < sch->nb_enc);
907 enc = &sch->enc[enc_idx];
909 ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
910 if (ret < 0)
911 return ret;
912 sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
914 ret = sq_add_stream(sq->sq, limiting);
915 if (ret < 0)
916 return ret;
918 enc->sq_idx[0] = sq_idx;
919 enc->sq_idx[1] = ret;
921 if (max_frames != INT64_MAX)
922 sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
924 return 0;
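// Connect two nodes of the transcoding graph. Valid connections are
// demuxer -> decoder or muxer (streamcopy), decoder -> filtergraph or encoder,
// filtergraph -> encoder or another filtergraph, and encoder -> muxer or
// decoder; any other combination triggers an assertion failure.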
927 int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
929 int ret;
931 switch (src.type) {
932 case SCH_NODE_TYPE_DEMUX: {
933 SchDemuxStream *ds;
935 av_assert0(src.idx < sch->nb_demux &&
936 src.idx_stream < sch->demux[src.idx].nb_streams);
937 ds = &sch->demux[src.idx].streams[src.idx_stream];
939 ret = GROW_ARRAY(ds->dst, ds->nb_dst);
940 if (ret < 0)
941 return ret;
943 ds->dst[ds->nb_dst - 1] = dst;
945 // demuxed packets go to decoding or streamcopy
946 switch (dst.type) {
947 case SCH_NODE_TYPE_DEC: {
948 SchDec *dec;
950 av_assert0(dst.idx < sch->nb_dec);
951 dec = &sch->dec[dst.idx];
953 av_assert0(!dec->src.type);
954 dec->src = src;
955 break;
957 case SCH_NODE_TYPE_MUX: {
958 SchMuxStream *ms;
960 av_assert0(dst.idx < sch->nb_mux &&
961 dst.idx_stream < sch->mux[dst.idx].nb_streams);
962 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
964 av_assert0(!ms->src.type);
965 ms->src = src;
967 break;
969 default: av_assert0(0);
972 break;
974 case SCH_NODE_TYPE_DEC: {
975 SchDec *dec;
976 SchDecOutput *o;
978 av_assert0(src.idx < sch->nb_dec);
979 dec = &sch->dec[src.idx];
981 av_assert0(src.idx_stream < dec->nb_outputs);
982 o = &dec->outputs[src.idx_stream];
984 ret = GROW_ARRAY(o->dst, o->nb_dst);
985 if (ret < 0)
986 return ret;
988 o->dst[o->nb_dst - 1] = dst;
990 // decoded frames go to filters or encoding
991 switch (dst.type) {
992 case SCH_NODE_TYPE_FILTER_IN: {
993 SchFilterIn *fi;
995 av_assert0(dst.idx < sch->nb_filters &&
996 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
997 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
999 av_assert0(!fi->src.type);
1000 fi->src = src;
1001 break;
1003 case SCH_NODE_TYPE_ENC: {
1004 SchEnc *enc;
1006 av_assert0(dst.idx < sch->nb_enc);
1007 enc = &sch->enc[dst.idx];
1009 av_assert0(!enc->src.type);
1010 enc->src = src;
1011 break;
1013 default: av_assert0(0);
1016 break;
1018 case SCH_NODE_TYPE_FILTER_OUT: {
1019 SchFilterOut *fo;
1021 av_assert0(src.idx < sch->nb_filters &&
1022 src.idx_stream < sch->filters[src.idx].nb_outputs);
1023 fo = &sch->filters[src.idx].outputs[src.idx_stream];
1025 av_assert0(!fo->dst.type);
1026 fo->dst = dst;
1028 // filtered frames go to encoding or another filtergraph
1029 switch (dst.type) {
1030 case SCH_NODE_TYPE_ENC: {
1031 SchEnc *enc;
1033 av_assert0(dst.idx < sch->nb_enc);
1034 enc = &sch->enc[dst.idx];
1036 av_assert0(!enc->src.type);
1037 enc->src = src;
1038 break;
1040 case SCH_NODE_TYPE_FILTER_IN: {
1041 SchFilterIn *fi;
1043 av_assert0(dst.idx < sch->nb_filters &&
1044 dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1045 fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1047 av_assert0(!fi->src.type);
1048 fi->src = src;
1049 break;
1051 default: av_assert0(0);
1055 break;
1057 case SCH_NODE_TYPE_ENC: {
1058 SchEnc *enc;
1060 av_assert0(src.idx < sch->nb_enc);
1061 enc = &sch->enc[src.idx];
1063 ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1064 if (ret < 0)
1065 return ret;
1067 enc->dst[enc->nb_dst - 1] = dst;
1069 // encoded packets go to muxing or decoding
1070 switch (dst.type) {
1071 case SCH_NODE_TYPE_MUX: {
1072 SchMuxStream *ms;
1074 av_assert0(dst.idx < sch->nb_mux &&
1075 dst.idx_stream < sch->mux[dst.idx].nb_streams);
1076 ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1078 av_assert0(!ms->src.type);
1079 ms->src = src;
1081 break;
1083 case SCH_NODE_TYPE_DEC: {
1084 SchDec *dec;
1086 av_assert0(dst.idx < sch->nb_dec);
1087 dec = &sch->dec[dst.idx];
1089 av_assert0(!dec->src.type);
1090 dec->src = src;
1092 break;
1094 default: av_assert0(0);
1097 break;
1099 default: av_assert0(0);
1102 return 0;
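// Start the muxer thread, then drain the pre-muxing queues into its thread
// queue, always forwarding the buffered packet with the earliest dts next
// (packets without a dts, and EOF markers, take priority). Finally mark the
// muxer as started, so that new packets bypass the pre-mux queues.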
1105 static int mux_task_start(SchMux *mux)
1107 int ret = 0;
1109 ret = task_start(&mux->task);
1110 if (ret < 0)
1111 return ret;
1113 /* flush the pre-muxing queues */
1114 while (1) {
1115 int min_stream = -1;
1116 Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1118 AVPacket *pkt;
1120 // find the stream with the earliest dts or EOF in pre-muxing queue
1121 for (unsigned i = 0; i < mux->nb_streams; i++) {
1122 SchMuxStream *ms = &mux->streams[i];
1124 if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1125 continue;
1127 if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1128 min_stream = i;
1129 break;
1132 if (min_ts.ts == AV_NOPTS_VALUE ||
1133 av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1134 min_stream = i;
1135 min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1139 if (min_stream >= 0) {
1140 SchMuxStream *ms = &mux->streams[min_stream];
1142 ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1143 av_assert0(ret >= 0);
1145 if (pkt) {
1146 if (!ms->init_eof)
1147 ret = tq_send(mux->queue, min_stream, pkt);
1148 av_packet_free(&pkt);
1149 if (ret == AVERROR_EOF)
1150 ms->init_eof = 1;
1151 else if (ret < 0)
1152 return ret;
1153 } else
1154 tq_send_finish(mux->queue, min_stream);
1156 continue;
1159 break;
1162 atomic_store(&mux->mux_started, 1);
1164 return 0;
1167 int print_sdp(const char *filename);
1169 static int mux_init(Scheduler *sch, SchMux *mux)
1171 int ret;
1173 ret = mux->init(mux->task.func_arg);
1174 if (ret < 0)
1175 return ret;
1177 sch->nb_mux_ready++;
1179 if (sch->sdp_filename || sch->sdp_auto) {
1180 if (sch->nb_mux_ready < sch->nb_mux)
1181 return 0;
1183 ret = print_sdp(sch->sdp_filename);
1184 if (ret < 0) {
1185 av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1186 return ret;
1189 /* SDP is written only after all the muxers are ready, so now we
1190 * start ALL the threads */
1191 for (unsigned i = 0; i < sch->nb_mux; i++) {
1192 ret = mux_task_start(&sch->mux[i]);
1193 if (ret < 0)
1194 return ret;
1196 } else {
1197 ret = mux_task_start(mux);
1198 if (ret < 0)
1199 return ret;
1202 return 0;
1205 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1206 size_t data_threshold, int max_packets)
1208 SchMux *mux;
1209 SchMuxStream *ms;
1211 av_assert0(mux_idx < sch->nb_mux);
1212 mux = &sch->mux[mux_idx];
1214 av_assert0(stream_idx < mux->nb_streams);
1215 ms = &mux->streams[stream_idx];
1217 ms->pre_mux_queue.max_packets = max_packets;
1218 ms->pre_mux_queue.data_threshold = data_threshold;
1221 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1223 SchMux *mux;
1224 int ret = 0;
1226 av_assert0(mux_idx < sch->nb_mux);
1227 mux = &sch->mux[mux_idx];
1229 av_assert0(stream_idx < mux->nb_streams);
1231 pthread_mutex_lock(&sch->mux_ready_lock);
1233 av_assert0(mux->nb_streams_ready < mux->nb_streams);
1235 // this may be called during initialization - do not start
1236 // threads before sch_start() is called
1237 if (++mux->nb_streams_ready == mux->nb_streams &&
1238 sch->state >= SCH_STATE_STARTED)
1239 ret = mux_init(sch, mux);
1241 pthread_mutex_unlock(&sch->mux_ready_lock);
1243 return ret;
1246 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1247 unsigned dec_idx)
1249 SchMux *mux;
1250 SchMuxStream *ms;
1251 int ret = 0;
1253 av_assert0(mux_idx < sch->nb_mux);
1254 mux = &sch->mux[mux_idx];
1256 av_assert0(stream_idx < mux->nb_streams);
1257 ms = &mux->streams[stream_idx];
1259 ret = GROW_ARRAY(ms->sub_heartbeat_dst, ms->nb_sub_heartbeat_dst);
1260 if (ret < 0)
1261 return ret;
1263 av_assert0(dec_idx < sch->nb_dec);
1264 ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1266 if (!mux->sub_heartbeat_pkt) {
1267 mux->sub_heartbeat_pkt = av_packet_alloc();
1268 if (!mux->sub_heartbeat_pkt)
1269 return AVERROR(ENOMEM);
1272 return 0;
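// Walk upstream from the scheduling source of an output stream until reaching
// either a demuxer, or a filtergraph with internal sources that requested to
// be scheduled directly (best_input == nb_inputs), and mark that source to be
// unchoked on the next waiter update.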
1275 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
1277 while (1) {
1278 SchFilterGraph *fg;
1280 // fed directly by a demuxer (i.e. not through a filtergraph)
1281 if (src.type == SCH_NODE_TYPE_DEMUX) {
1282 sch->demux[src.idx].waiter.choked_next = 0;
1283 return;
1286 av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
1287 fg = &sch->filters[src.idx];
1289 // the filtergraph contains internal sources and has
1290 // requested to be scheduled directly
1291 if (fg->best_input == fg->nb_inputs) {
1292 fg->waiter.choked_next = 0;
1293 return;
1296 src = fg->inputs[fg->best_input].src_sched;
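// Recompute which source threads (demuxers and directly-scheduled
// filtergraphs) may produce more data: sources feeding output streams that
// are less than SCHEDULE_TOLERANCE ahead of the most lagging stream are
// unchoked, everything else is choked; at least one live source is always
// kept unchoked. Must be called with schedule_lock held.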
1300 static void schedule_update_locked(Scheduler *sch)
1302 int64_t dts;
1303 int have_unchoked = 0;
1305 // on a termination request all waiters are choked,
1306 // and we must not unchoke them here
1307 if (atomic_load(&sch->terminate))
1308 return;
1310 dts = trailing_dts(sch, 0);
1312 atomic_store(&sch->last_dts, dts);
1314 // initialize our internal state
1315 for (unsigned type = 0; type < 2; type++)
1316 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1317 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1318 w->choked_prev = atomic_load(&w->choked);
1319 w->choked_next = 1;
1322 // figure out the sources that are allowed to proceed
1323 for (unsigned i = 0; i < sch->nb_mux; i++) {
1324 SchMux *mux = &sch->mux[i];
1326 for (unsigned j = 0; j < mux->nb_streams; j++) {
1327 SchMuxStream *ms = &mux->streams[j];
1329 // unblock sources for output streams that are not finished
1330 // and not too far ahead of the trailing stream
1331 if (ms->source_finished)
1332 continue;
1333 if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1334 continue;
1335 if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1336 continue;
1338 // resolve the source to unchoke
1339 unchoke_for_stream(sch, ms->src_sched);
1340 have_unchoked = 1;
1344 // make sure to unchoke at least one source, if still available
1345 for (unsigned type = 0; !have_unchoked && type < 2; type++)
1346 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1347 int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1348 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1349 if (!exited) {
1350 w->choked_next = 0;
1351 have_unchoked = 1;
1352 break;
1357 for (unsigned type = 0; type < 2; type++)
1358 for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1359 SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1360 if (w->choked_prev != w->choked_next)
1361 waiter_set(w, w->choked_next);
1366 enum {
1367 CYCLE_NODE_NEW = 0,
1368 CYCLE_NODE_STARTED,
1369 CYCLE_NODE_DONE,
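// Iterative depth-first search from one filtergraph through its inputs,
// following connections to upstream filtergraphs; returns AVERROR(EINVAL)
// when a filtergraph still on the DFS stack is reached again, i.e. the
// graph contains a cycle.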
1372 static int
1373 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
1374 uint8_t *filters_visited, SchedulerNode *filters_stack)
1376 unsigned nb_filters_stack = 0;
1378 memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1380 while (1) {
1381 const SchFilterGraph *fg = &sch->filters[src.idx];
1383 filters_visited[src.idx] = CYCLE_NODE_STARTED;
1385 // descend into every input, depth first
1386 if (src.idx_stream < fg->nb_inputs) {
1387 const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1389 // connected to demuxer, no cycles possible
1390 if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1391 continue;
1393 // otherwise connected to another filtergraph
1394 av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1396 // found a cycle
1397 if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1398 return AVERROR(EINVAL);
1400 // place current position on stack and descend
1401 av_assert0(nb_filters_stack < sch->nb_filters);
1402 filters_stack[nb_filters_stack++] = src;
1403 src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1404 continue;
1407 filters_visited[src.idx] = CYCLE_NODE_DONE;
1409 // done with this filtergraph - resume the previous search, if any
1410 if (nb_filters_stack) {
1411 src = filters_stack[--nb_filters_stack];
1412 continue;
1414 return 0;
1418 static int check_acyclic(Scheduler *sch)
1420 uint8_t *filters_visited = NULL;
1421 SchedulerNode *filters_stack = NULL;
1423 int ret = 0;
1425 if (!sch->nb_filters)
1426 return 0;
1428 filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1429 if (!filters_visited)
1430 return AVERROR(ENOMEM);
1432 filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1433 if (!filters_stack) {
1434 ret = AVERROR(ENOMEM);
1435 goto fail;
1438 // trace the transcoding graph upstream from every filtergraph
1439 for (unsigned i = 0; i < sch->nb_filters; i++) {
1440 ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1441 filters_visited, filters_stack);
1442 if (ret < 0) {
1443 av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1444 goto fail;
1448 fail:
1449 av_freep(&filters_visited);
1450 av_freep(&filters_stack);
1451 return ret;
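// Validate the graph before starting it: every demuxer stream, decoder
// output, encoder and filtergraph input/output must be connected. Also
// resolves src_sched (the ultimate scheduling source) for muxer streams and
// filtergraph inputs, allocates the per-destination "finished" flags and the
// muxer thread queues, and checks the graph for cycles.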
1454 static int start_prepare(Scheduler *sch)
1456 int ret;
1458 for (unsigned i = 0; i < sch->nb_demux; i++) {
1459 SchDemux *d = &sch->demux[i];
1461 for (unsigned j = 0; j < d->nb_streams; j++) {
1462 SchDemuxStream *ds = &d->streams[j];
1464 if (!ds->nb_dst) {
1465 av_log(d, AV_LOG_ERROR,
1466 "Demuxer stream %u not connected to any sink\n", j);
1467 return AVERROR(EINVAL);
1470 ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1471 if (!ds->dst_finished)
1472 return AVERROR(ENOMEM);
1476 for (unsigned i = 0; i < sch->nb_dec; i++) {
1477 SchDec *dec = &sch->dec[i];
1479 if (!dec->src.type) {
1480 av_log(dec, AV_LOG_ERROR,
1481 "Decoder not connected to a source\n");
1482 return AVERROR(EINVAL);
1485 for (unsigned j = 0; j < dec->nb_outputs; j++) {
1486 SchDecOutput *o = &dec->outputs[j];
1488 if (!o->nb_dst) {
1489 av_log(dec, AV_LOG_ERROR,
1490 "Decoder output %u not connected to any sink\n", j);
1491 return AVERROR(EINVAL);
1494 o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1495 if (!o->dst_finished)
1496 return AVERROR(ENOMEM);
1500 for (unsigned i = 0; i < sch->nb_enc; i++) {
1501 SchEnc *enc = &sch->enc[i];
1503 if (!enc->src.type) {
1504 av_log(enc, AV_LOG_ERROR,
1505 "Encoder not connected to a source\n");
1506 return AVERROR(EINVAL);
1508 if (!enc->nb_dst) {
1509 av_log(enc, AV_LOG_ERROR,
1510 "Encoder not connected to any sink\n");
1511 return AVERROR(EINVAL);
1514 enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1515 if (!enc->dst_finished)
1516 return AVERROR(ENOMEM);
1519 for (unsigned i = 0; i < sch->nb_mux; i++) {
1520 SchMux *mux = &sch->mux[i];
1522 for (unsigned j = 0; j < mux->nb_streams; j++) {
1523 SchMuxStream *ms = &mux->streams[j];
1525 switch (ms->src.type) {
1526 case SCH_NODE_TYPE_ENC: {
1527 SchEnc *enc = &sch->enc[ms->src.idx];
1528 if (enc->src.type == SCH_NODE_TYPE_DEC) {
1529 ms->src_sched = sch->dec[enc->src.idx].src;
1530 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
1531 } else {
1532 ms->src_sched = enc->src;
1533 av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
1535 break;
1537 case SCH_NODE_TYPE_DEMUX:
1538 ms->src_sched = ms->src;
1539 break;
1540 default:
1541 av_log(mux, AV_LOG_ERROR,
1542 "Muxer stream #%u not connected to a source\n", j);
1543 return AVERROR(EINVAL);
1547 ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1548 QUEUE_PACKETS);
1549 if (ret < 0)
1550 return ret;
1553 for (unsigned i = 0; i < sch->nb_filters; i++) {
1554 SchFilterGraph *fg = &sch->filters[i];
1556 for (unsigned j = 0; j < fg->nb_inputs; j++) {
1557 SchFilterIn *fi = &fg->inputs[j];
1558 SchDec *dec;
1560 if (!fi->src.type) {
1561 av_log(fg, AV_LOG_ERROR,
1562 "Filtergraph input %u not connected to a source\n", j);
1563 return AVERROR(EINVAL);
1566 if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1567 fi->src_sched = fi->src;
1568 else {
1569 av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
1570 dec = &sch->dec[fi->src.idx];
1572 switch (dec->src.type) {
1573 case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1574 case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1575 default: av_assert0(0);
1580 for (unsigned j = 0; j < fg->nb_outputs; j++) {
1581 SchFilterOut *fo = &fg->outputs[j];
1583 if (!fo->dst.type) {
1584 av_log(fg, AV_LOG_ERROR,
1585 "Filtergraph %u output %u not connected to a sink\n", i, j);
1586 return AVERROR(EINVAL);
1591 // Check that the transcoding graph has no cycles.
1592 ret = check_acyclic(sch);
1593 if (ret < 0)
1594 return ret;
1596 return 0;
1599 int sch_start(Scheduler *sch)
1601 int ret;
1603 ret = start_prepare(sch);
1604 if (ret < 0)
1605 return ret;
1607 av_assert0(sch->state == SCH_STATE_UNINIT);
1608 sch->state = SCH_STATE_STARTED;
1610 for (unsigned i = 0; i < sch->nb_mux; i++) {
1611 SchMux *mux = &sch->mux[i];
1613 if (mux->nb_streams_ready == mux->nb_streams) {
1614 ret = mux_init(sch, mux);
1615 if (ret < 0)
1616 goto fail;
1620 for (unsigned i = 0; i < sch->nb_enc; i++) {
1621 SchEnc *enc = &sch->enc[i];
1623 ret = task_start(&enc->task);
1624 if (ret < 0)
1625 goto fail;
1628 for (unsigned i = 0; i < sch->nb_filters; i++) {
1629 SchFilterGraph *fg = &sch->filters[i];
1631 ret = task_start(&fg->task);
1632 if (ret < 0)
1633 goto fail;
1636 for (unsigned i = 0; i < sch->nb_dec; i++) {
1637 SchDec *dec = &sch->dec[i];
1639 ret = task_start(&dec->task);
1640 if (ret < 0)
1641 goto fail;
1644 for (unsigned i = 0; i < sch->nb_demux; i++) {
1645 SchDemux *d = &sch->demux[i];
1647 if (!d->nb_streams)
1648 continue;
1650 ret = task_start(&d->task);
1651 if (ret < 0)
1652 goto fail;
1655 pthread_mutex_lock(&sch->schedule_lock);
1656 schedule_update_locked(sch);
1657 pthread_mutex_unlock(&sch->schedule_lock);
1659 return 0;
1660 fail:
1661 sch_stop(sch, NULL);
1662 return ret;
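// Wait up to timeout_us microseconds for the transcoding to finish. Returns
// nonzero when the caller should stop, i.e. when all muxers are done or some
// task has failed; *transcode_ts receives the current trailing dts.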
1665 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1667 int ret, err;
1669 // convert delay to absolute timestamp
1670 timeout_us += av_gettime();
1672 pthread_mutex_lock(&sch->mux_done_lock);
1674 if (sch->nb_mux_done < sch->nb_mux) {
1675 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1676 .tv_nsec = (timeout_us % 1000000) * 1000 };
1677 pthread_cond_timedwait(&sch->mux_done_cond, &sch->mux_done_lock, &tv);
1680 ret = sch->nb_mux_done == sch->nb_mux;
1682 pthread_mutex_unlock(&sch->mux_done_lock);
1684 *transcode_ts = atomic_load(&sch->last_dts);
1686 // abort transcoding if any task failed
1687 err = atomic_load(&sch->task_failed);
1689 return ret || err;
1692 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1694 int ret;
1696 ret = enc->open_cb(enc->task.func_arg, frame);
1697 if (ret < 0)
1698 return ret;
1700 // ret>0 signals audio frame size, which means sync queue must
1701 // have been enabled during encoder creation
1702 if (ret > 0) {
1703 SchSyncQueue *sq;
1705 av_assert0(enc->sq_idx[0] >= 0);
1706 sq = &sch->sq_enc[enc->sq_idx[0]];
1708 pthread_mutex_lock(&sq->lock);
1710 sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1712 pthread_mutex_unlock(&sq->lock);
1715 return 0;
1718 static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1720 int ret;
1722 if (!frame) {
1723 tq_send_finish(enc->queue, 0);
1724 return 0;
1727 if (enc->in_finished)
1728 return AVERROR_EOF;
1730 ret = tq_send(enc->queue, 0, frame);
1731 if (ret < 0)
1732 enc->in_finished = 1;
1734 return ret;
1737 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1739 SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1740 int ret = 0;
1742 // inform the scheduling code that no more input will arrive along this path;
1743 // this is necessary because the sync queue may not send an EOF downstream
1744 // until other streams finish
1745 // TODO: consider a cleaner way of passing this information through
1746 // the pipeline
1747 if (!frame) {
1748 for (unsigned i = 0; i < enc->nb_dst; i++) {
1749 SchMux *mux;
1750 SchMuxStream *ms;
1752 if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1753 continue;
1755 mux = &sch->mux[enc->dst[i].idx];
1756 ms = &mux->streams[enc->dst[i].idx_stream];
1758 pthread_mutex_lock(&sch->schedule_lock);
1760 ms->source_finished = 1;
1761 schedule_update_locked(sch);
1763 pthread_mutex_unlock(&sch->schedule_lock);
1767 pthread_mutex_lock(&sq->lock);
1769 ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1770 if (ret < 0)
1771 goto finish;
1773 while (1) {
1774 SchEnc *enc;
1776 // TODO: the SQ API should be extended to allow returning EOF
1777 // for individual streams
1778 ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1779 if (ret < 0) {
1780 ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1781 break;
1784 enc = &sch->enc[sq->enc_idx[ret]];
1785 ret = send_to_enc_thread(sch, enc, sq->frame);
1786 if (ret < 0) {
1787 av_frame_unref(sq->frame);
1788 if (ret != AVERROR_EOF)
1789 break;
1791 sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1792 continue;
1796 if (ret < 0) {
1797 // close all encoders fed from this sync queue
1798 for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1799 int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1801 // if the sync queue error is EOF and closing the encoder
1802 // produces a more serious error, make sure to pick the latter
1803 ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1807 finish:
1808 pthread_mutex_unlock(&sq->lock);
1810 return ret;
1813 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1815 if (enc->open_cb && frame && !enc->opened) {
1816 int ret = enc_open(sch, enc, frame);
1817 if (ret < 0)
1818 return ret;
1819 enc->opened = 1;
1821 // discard empty frames that only carry encoder init parameters
1822 if (!frame->buf[0]) {
1823 av_frame_unref(frame);
1824 return 0;
1828 return (enc->sq_idx[0] >= 0) ?
1829 send_to_enc_sq (sch, enc, frame) :
1830 send_to_enc_thread(sch, enc, frame);
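// Buffer a packet in the stream's pre-mux queue, used before the muxer task
// is started. The FIFO doubles in size as needed; once more than
// data_threshold bytes are buffered it may not grow beyond max_packets
// entries, and further packets are rejected with ENOSPC.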
1833 static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
1835 PreMuxQueue *q = &ms->pre_mux_queue;
1836 AVPacket *tmp_pkt = NULL;
1837 int ret;
1839 if (!av_fifo_can_write(q->fifo)) {
1840 size_t packets = av_fifo_can_read(q->fifo);
1841 size_t pkt_size = pkt ? pkt->size : 0;
1842 int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1843 size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1844 size_t new_size = FFMIN(2 * packets, max_packets);
1846 if (new_size <= packets) {
1847 av_log(mux, AV_LOG_ERROR,
1848 "Too many packets buffered for output stream.\n");
1849 return AVERROR(ENOSPC);
1851 ret = av_fifo_grow2(q->fifo, new_size - packets);
1852 if (ret < 0)
1853 return ret;
1856 if (pkt) {
1857 tmp_pkt = av_packet_alloc();
1858 if (!tmp_pkt)
1859 return AVERROR(ENOMEM);
1861 av_packet_move_ref(tmp_pkt, pkt);
1862 q->data_size += tmp_pkt->size;
1864 av_fifo_write(q->fifo, &tmp_pkt, 1);
1866 return 0;
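// Deliver one packet (or EOF, when pkt is NULL) to a muxer stream. Until the
// muxer thread has started, the packet goes into the pre-mux queue; afterwards
// it is sent to the muxer's thread queue directly. Updates the stream's
// last_dts and re-runs the scheduling decision when the trailing dts may have
// changed.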
1869 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1870 AVPacket *pkt)
1872 SchMuxStream *ms = &mux->streams[stream_idx];
1873 int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1874 av_rescale_q(pkt->dts + pkt->duration, pkt->time_base, AV_TIME_BASE_Q) :
1875 AV_NOPTS_VALUE;
1877 // queue the packet if the muxer cannot be started yet
1878 if (!atomic_load(&mux->mux_started)) {
1879 int queued = 0;
1881 // the muxer could have started between the above atomic check and
1882 // locking the mutex, in which case this block falls through to the normal send path
1883 pthread_mutex_lock(&sch->mux_ready_lock);
1885 if (!atomic_load(&mux->mux_started)) {
1886 int ret = mux_queue_packet(mux, ms, pkt);
1887 queued = ret < 0 ? ret : 1;
1890 pthread_mutex_unlock(&sch->mux_ready_lock);
1892 if (queued < 0)
1893 return queued;
1894 else if (queued)
1895 goto update_schedule;
1898 if (pkt) {
1899 int ret;
1901 if (ms->init_eof)
1902 return AVERROR_EOF;
1904 ret = tq_send(mux->queue, stream_idx, pkt);
1905 if (ret < 0)
1906 return ret;
1907 } else
1908 tq_send_finish(mux->queue, stream_idx);
1910 update_schedule:
1911 // TODO: use atomics to check whether this changes trailing dts
1912 // to avoid locking unnecessarily
1913 if (dts != AV_NOPTS_VALUE || !pkt) {
1914 pthread_mutex_lock(&sch->schedule_lock);
1916 if (pkt) ms->last_dts = dts;
1917 else ms->source_finished = 1;
1919 schedule_update_locked(sch);
1921 pthread_mutex_unlock(&sch->schedule_lock);
1924 return 0;
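// Send a demuxed packet to a single destination (muxer stream for streamcopy,
// or decoder). A NULL packet, or a streamcopy destination when the
// DEMUX_SEND_STREAMCOPY_EOF flag is set, signals EOF; once a destination has
// returned EOF it is marked finished and AVERROR_EOF is returned for it from
// then on.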
1927 static int
1928 demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst,
1929 uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1931 int ret;
1933 if (*dst_finished)
1934 return AVERROR_EOF;
1936 if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1937 (flags & DEMUX_SEND_STREAMCOPY_EOF)) {
1938 av_packet_unref(pkt);
1939 pkt = NULL;
1942 if (!pkt)
1943 goto finish;
1945 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1946 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1947 tq_send(sch->dec[dst.idx].queue, 0, pkt);
1948 if (ret == AVERROR_EOF)
1949 goto finish;
1951 return ret;
1953 finish:
1954 if (dst.type == SCH_NODE_TYPE_MUX)
1955 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1956 else
1957 tq_send_finish(sch->dec[dst.idx].queue, 0);
1959 *dst_finished = 1;
1960 return AVERROR_EOF;
1963 static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds,
1964 AVPacket *pkt, unsigned flags)
1966 unsigned nb_done = 0;
1968 for (unsigned i = 0; i < ds->nb_dst; i++) {
1969 AVPacket *to_send = pkt;
1970 uint8_t *finished = &ds->dst_finished[i];
1972 int ret;
1974 // sending a packet consumes it, so make a temporary reference if needed
1975 if (pkt && i < ds->nb_dst - 1) {
1976 to_send = d->send_pkt;
1978 ret = av_packet_ref(to_send, pkt);
1979 if (ret < 0)
1980 return ret;
1983 ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1984 if (to_send)
1985 av_packet_unref(to_send);
1986 if (ret == AVERROR_EOF)
1987 nb_done++;
1988 else if (ret < 0)
1989 return ret;
1992 return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1995 static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
1997 Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1999 av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2001 for (unsigned i = 0; i < d->nb_streams; i++) {
2002 SchDemuxStream *ds = &d->streams[i];
2004 for (unsigned j = 0; j < ds->nb_dst; j++) {
2005 const SchedulerNode *dst = &ds->dst[j];
2006 SchDec *dec;
2007 int ret;
2009 if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2010 continue;
2012 dec = &sch->dec[dst->idx];
2014 ret = tq_send(dec->queue, 0, pkt);
2015 if (ret < 0)
2016 return ret;
2018 if (dec->queue_end_ts) {
2019 Timestamp ts;
2020 ret = av_thread_message_queue_recv(dec->queue_end_ts, &ts, 0);
2021 if (ret < 0)
2022 return ret;
2024 if (max_end_ts.ts == AV_NOPTS_VALUE ||
2025 (ts.ts != AV_NOPTS_VALUE &&
2026 av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2027 max_end_ts = ts;
2033 pkt->pts = max_end_ts.ts;
2034 pkt->time_base = max_end_ts.tb;
2036 return 0;
2039 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2040 unsigned flags)
2042 SchDemux *d;
2043 int terminate;
2045 av_assert0(demux_idx < sch->nb_demux);
2046 d = &sch->demux[demux_idx];
2048 terminate = waiter_wait(sch, &d->waiter);
2049 if (terminate)
2050 return AVERROR_EXIT;
2052 // flush the downstreams after seek
2053 if (pkt->stream_index == -1)
2054 return demux_flush(sch, d, pkt);
2056 av_assert0(pkt->stream_index < d->nb_streams);
2058 return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2061 static int demux_done(Scheduler *sch, unsigned demux_idx)
2063 SchDemux *d = &sch->demux[demux_idx];
2064 int ret = 0;
2066 for (unsigned i = 0; i < d->nb_streams; i++) {
2067 int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2068 if (err != AVERROR_EOF)
2069 ret = err_merge(ret, err);
2072 pthread_mutex_lock(&sch->schedule_lock);
2074 d->task_exited = 1;
2076 schedule_update_locked(sch);
2078 pthread_mutex_unlock(&sch->schedule_lock);
2080 return ret;
2083 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2085 SchMux *mux;
2086 int ret, stream_idx;
2088 av_assert0(mux_idx < sch->nb_mux);
2089 mux = &sch->mux[mux_idx];
2091 ret = tq_receive(mux->queue, &stream_idx, pkt);
2092 pkt->stream_index = stream_idx;
2093 return ret;
2096 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2098 SchMux *mux;
2100 av_assert0(mux_idx < sch->nb_mux);
2101 mux = &sch->mux[mux_idx];
2103 av_assert0(stream_idx < mux->nb_streams);
2104 tq_receive_finish(mux->queue, stream_idx);
2106 pthread_mutex_lock(&sch->schedule_lock);
2107 mux->streams[stream_idx].source_finished = 1;
2109 schedule_update_locked(sch);
2111 pthread_mutex_unlock(&sch->schedule_lock);
2114 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2115 const AVPacket *pkt)
2117 SchMux *mux;
2118 SchMuxStream *ms;
2120 av_assert0(mux_idx < sch->nb_mux);
2121 mux = &sch->mux[mux_idx];
2123 av_assert0(stream_idx < mux->nb_streams);
2124 ms = &mux->streams[stream_idx];
2126 for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2127 SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2128 int ret;
2130 ret = av_packet_copy_props(mux->sub_heartbeat_pkt, pkt);
2131 if (ret < 0)
2132 return ret;
2134 tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2137 return 0;
2140 static int mux_done(Scheduler *sch, unsigned mux_idx)
2142 SchMux *mux = &sch->mux[mux_idx];
2144 pthread_mutex_lock(&sch->schedule_lock);
2146 for (unsigned i = 0; i < mux->nb_streams; i++) {
2147 tq_receive_finish(mux->queue, i);
2148 mux->streams[i].source_finished = 1;
2151 schedule_update_locked(sch);
2153 pthread_mutex_unlock(&sch->schedule_lock);
2155 pthread_mutex_lock(&sch->mux_done_lock);
2157 av_assert0(sch->nb_mux_done < sch->nb_mux);
2158 sch->nb_mux_done++;
2160 pthread_cond_signal(&sch->mux_done_cond);
2162 pthread_mutex_unlock(&sch->mux_done_lock);
2164 return 0;
2167 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2169 SchDec *dec;
2170 int ret, dummy;
2172 av_assert0(dec_idx < sch->nb_dec);
2173 dec = &sch->dec[dec_idx];
2175 // the decoder should have given us post-flush end timestamp in pkt
2176 if (dec->expect_end_ts) {
2177 Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2178 ret = av_thread_message_queue_send(dec->queue_end_ts, &ts, 0);
2179 if (ret < 0)
2180 return ret;
2182 dec->expect_end_ts = 0;
2185 ret = tq_receive(dec->queue, &dummy, pkt);
2186 av_assert0(dummy <= 0);
2188 // got a flush packet, on the next call to this function the decoder
2189 // will give us post-flush end timestamp
2190 if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2191 dec->expect_end_ts = 1;
2193 return ret;
2196 static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
2197 unsigned in_idx, AVFrame *frame)
2199 if (frame)
2200 return tq_send(fg->queue, in_idx, frame);
2202 if (!fg->inputs[in_idx].send_finished) {
2203 fg->inputs[in_idx].send_finished = 1;
2204 tq_send_finish(fg->queue, in_idx);
2206 // close the control stream when all actual inputs are done
2207 if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2208 tq_send_finish(fg->queue, fg->nb_inputs);
2210 return 0;
2213 static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2214 uint8_t *dst_finished, AVFrame *frame)
2216 int ret;
2218 if (*dst_finished)
2219 return AVERROR_EOF;
2221 if (!frame)
2222 goto finish;
2224 ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2225 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2226 send_to_enc(sch, &sch->enc[dst.idx], frame);
2227 if (ret == AVERROR_EOF)
2228 goto finish;
2230 return ret;
2232 finish:
2233 if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2234 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2235 else
2236 send_to_enc(sch, &sch->enc[dst.idx], NULL);
2238 *dst_finished = 1;
2240 return AVERROR_EOF;
2243 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2244 unsigned out_idx, AVFrame *frame)
2246 SchDec *dec;
2247 SchDecOutput *o;
2248 int ret;
2249 unsigned nb_done = 0;
2251 av_assert0(dec_idx < sch->nb_dec);
2252 dec = &sch->dec[dec_idx];
2254 av_assert0(out_idx < dec->nb_outputs);
2255 o = &dec->outputs[out_idx];
2257 for (unsigned i = 0; i < o->nb_dst; i++) {
2258 uint8_t *finished = &o->dst_finished[i];
2259 AVFrame *to_send = frame;
2261 // sending a frame consumes it, so make a temporary reference if needed
2262 if (i < o->nb_dst - 1) {
2263 to_send = dec->send_frame;
2265 // frame may sometimes contain props only,
2266 // e.g. to signal EOF timestamp
2267 ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2268 av_frame_copy_props(to_send, frame);
2269 if (ret < 0)
2270 return ret;
2273 ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2274 if (ret < 0) {
2275 av_frame_unref(to_send);
2276 if (ret == AVERROR_EOF) {
2277 nb_done++;
2278 continue;
2280 return ret;
2284 return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2287 static int dec_done(Scheduler *sch, unsigned dec_idx)
2289 SchDec *dec = &sch->dec[dec_idx];
2290 int ret = 0;
2292 tq_receive_finish(dec->queue, 0);
2294 // make sure our source does not get stuck waiting for end timestamps
2295 // that will never arrive
2296 if (dec->queue_end_ts)
2297 av_thread_message_queue_set_err_recv(dec->queue_end_ts, AVERROR_EOF);
2299 for (unsigned i = 0; i < dec->nb_outputs; i++) {
2300 SchDecOutput *o = &dec->outputs[i];
2302 for (unsigned j = 0; j < o->nb_dst; j++) {
2303 int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2304 if (err < 0 && err != AVERROR_EOF)
2305 ret = err_merge(ret, err);
2309 return ret;
2312 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2314 SchEnc *enc;
2315 int ret, dummy;
2317 av_assert0(enc_idx < sch->nb_enc);
2318 enc = &sch->enc[enc_idx];
2320 ret = tq_receive(enc->queue, &dummy, frame);
2321 av_assert0(dummy <= 0);
2323 return ret;
2326 static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst,
2327 uint8_t *dst_finished, AVPacket *pkt)
2329 int ret;
2331 if (*dst_finished)
2332 return AVERROR_EOF;
2334 if (!pkt)
2335 goto finish;
2337 ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2338 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2339 tq_send(sch->dec[dst.idx].queue, 0, pkt);
2340 if (ret == AVERROR_EOF)
2341 goto finish;
2343 return ret;
2345 finish:
2346 if (dst.type == SCH_NODE_TYPE_MUX)
2347 send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2348 else
2349 tq_send_finish(sch->dec[dst.idx].queue, 0);
2351 *dst_finished = 1;
2353 return AVERROR_EOF;
2356 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2358 SchEnc *enc;
2359 int ret;
2361 av_assert0(enc_idx < sch->nb_enc);
2362 enc = &sch->enc[enc_idx];
2364 for (unsigned i = 0; i < enc->nb_dst; i++) {
2365 uint8_t *finished = &enc->dst_finished[i];
2366 AVPacket *to_send = pkt;
2368 // sending a packet consumes it, so make a temporary reference if needed
2369 if (i < enc->nb_dst - 1) {
2370 to_send = enc->send_pkt;
2372 ret = av_packet_ref(to_send, pkt);
2373 if (ret < 0)
2374 return ret;
2377 ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2378 if (ret < 0) {
2379 av_packet_unref(to_send);
2380 if (ret == AVERROR_EOF)
2381 continue;
2382 return ret;
2386 return 0;
2389 static int enc_done(Scheduler *sch, unsigned enc_idx)
2391 SchEnc *enc = &sch->enc[enc_idx];
2392 int ret = 0;
2394 tq_receive_finish(enc->queue, 0);
2396 for (unsigned i = 0; i < enc->nb_dst; i++) {
2397 int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2398 if (err < 0 && err != AVERROR_EOF)
2399 ret = err_merge(ret, err);
2402 return ret;
2405 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2406 unsigned *in_idx, AVFrame *frame)
2408 SchFilterGraph *fg;
2410 av_assert0(fg_idx < sch->nb_filters);
2411 fg = &sch->filters[fg_idx];
2413 av_assert0(*in_idx <= fg->nb_inputs);
2415 // update scheduling to account for desired input stream, if it changed
2417 // this check needs no locking because only the filtering thread
2418 // updates this value
2419 if (*in_idx != fg->best_input) {
2420 pthread_mutex_lock(&sch->schedule_lock);
2422 fg->best_input = *in_idx;
2423 schedule_update_locked(sch);
2425 pthread_mutex_unlock(&sch->schedule_lock);
2428 if (*in_idx == fg->nb_inputs) {
2429 int terminate = waiter_wait(sch, &fg->waiter);
2430 return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2433 while (1) {
2434 int ret, idx;
2436 ret = tq_receive(fg->queue, &idx, frame);
2437 if (idx < 0)
2438 return AVERROR_EOF;
2439 else if (ret >= 0) {
2440 *in_idx = idx;
2441 return 0;
2444 // disregard EOFs for specific streams - they should always be
2445 // preceded by an EOF frame
2449 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2451 SchFilterGraph *fg;
2452 SchFilterIn *fi;
2454 av_assert0(fg_idx < sch->nb_filters);
2455 fg = &sch->filters[fg_idx];
2457 av_assert0(in_idx < fg->nb_inputs);
2458 fi = &fg->inputs[in_idx];
2460 if (!fi->receive_finished) {
2461 fi->receive_finished = 1;
2462 tq_receive_finish(fg->queue, in_idx);
2464 // close the control stream when all actual inputs are done
2465 if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2466 tq_receive_finish(fg->queue, fg->nb_inputs);
2470 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2472 SchFilterGraph *fg;
2473 SchedulerNode dst;
2475 av_assert0(fg_idx < sch->nb_filters);
2476 fg = &sch->filters[fg_idx];
2478 av_assert0(out_idx < fg->nb_outputs);
2479 dst = fg->outputs[out_idx].dst;
2481 return (dst.type == SCH_NODE_TYPE_ENC) ?
2482 send_to_enc (sch, &sch->enc[dst.idx], frame) :
2483 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2486 static int filter_done(Scheduler *sch, unsigned fg_idx)
2488 SchFilterGraph *fg = &sch->filters[fg_idx];
2489 int ret = 0;
2491 for (unsigned i = 0; i <= fg->nb_inputs; i++)
2492 tq_receive_finish(fg->queue, i);
2494 for (unsigned i = 0; i < fg->nb_outputs; i++) {
2495 SchedulerNode dst = fg->outputs[i].dst;
2496 int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2497 send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2498 send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2500 if (err < 0 && err != AVERROR_EOF)
2501 ret = err_merge(ret, err);
2504 pthread_mutex_lock(&sch->schedule_lock);
2506 fg->task_exited = 1;
2508 schedule_update_locked(sch);
2510 pthread_mutex_unlock(&sch->schedule_lock);
2512 return ret;
2515 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2517 SchFilterGraph *fg;
2519 av_assert0(fg_idx < sch->nb_filters);
2520 fg = &sch->filters[fg_idx];
2522 return send_to_filter(sch, fg, fg->nb_inputs, frame);
2525 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2527 switch (node.type) {
2528 case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2529 case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2530 case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2531 case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2532 case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2533 default: av_assert0(0);
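// Thread entry point shared by all tasks: runs the task function, cleans up
// the corresponding node, records a failure in the scheduler unless the task
// ended with success or EOF, and returns the final error code as the thread's
// return value.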
2537 static void *task_wrapper(void *arg)
2539 SchTask *task = arg;
2540 Scheduler *sch = task->parent;
2541 int ret;
2542 int err = 0;
2544 ret = task->func(task->func_arg);
2545 if (ret < 0)
2546 av_log(task->func_arg, AV_LOG_ERROR,
2547 "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2549 err = task_cleanup(sch, task->node);
2550 ret = err_merge(ret, err);
2552 // EOF is considered normal termination
2553 if (ret == AVERROR_EOF)
2554 ret = 0;
2555 if (ret < 0)
2556 atomic_store(&sch->task_failed, 1);
2558 av_log(task->func_arg, ret < 0 ? AV_LOG_ERROR : AV_LOG_VERBOSE,
2559 "Terminating thread with return code %d (%s)\n", ret,
2560 ret < 0 ? av_err2str(ret) : "success");
2562 return (void*)(intptr_t)ret;
2565 static int task_stop(Scheduler *sch, SchTask *task)
2567 int ret;
2568 void *thread_ret;
2570 if (!task->thread_running)
2571 return task_cleanup(sch, task->node);
2573 ret = pthread_join(task->thread, &thread_ret);
2574 av_assert0(ret == 0);
2576 task->thread_running = 0;
2578 return (intptr_t)thread_ret;
2581 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2583 int ret = 0, err;
2585 if (sch->state != SCH_STATE_STARTED)
2586 return 0;
2588 atomic_store(&sch->terminate, 1);
2590 for (unsigned type = 0; type < 2; type++)
2591 for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2592 SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2593 waiter_set(w, 1);
2596 for (unsigned i = 0; i < sch->nb_demux; i++) {
2597 SchDemux *d = &sch->demux[i];
2599 err = task_stop(sch, &d->task);
2600 ret = err_merge(ret, err);
2603 for (unsigned i = 0; i < sch->nb_dec; i++) {
2604 SchDec *dec = &sch->dec[i];
2606 err = task_stop(sch, &dec->task);
2607 ret = err_merge(ret, err);
2610 for (unsigned i = 0; i < sch->nb_filters; i++) {
2611 SchFilterGraph *fg = &sch->filters[i];
2613 err = task_stop(sch, &fg->task);
2614 ret = err_merge(ret, err);
2617 for (unsigned i = 0; i < sch->nb_enc; i++) {
2618 SchEnc *enc = &sch->enc[i];
2620 err = task_stop(sch, &enc->task);
2621 ret = err_merge(ret, err);
2624 for (unsigned i = 0; i < sch->nb_mux; i++) {
2625 SchMux *mux = &sch->mux[i];
2627 err = task_stop(sch, &mux->task);
2628 ret = err_merge(ret, err);
2631 if (finish_ts)
2632 *finish_ts = trailing_dts(sch, 1);
2634 sch->state = SCH_STATE_STOPPED;
2636 return ret;