2 * This file is part of FFmpeg.
4 * FFmpeg is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * FFmpeg is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with FFmpeg; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 #include "libavutil/avassert.h"
23 #include "libavutil/error.h"
24 #include "libavutil/fifo.h"
25 #include "libavutil/intreadwrite.h"
26 #include "libavutil/mem.h"
27 #include "libavutil/thread.h"
30 #include "thread_queue.h"
/* Per-stream state bits kept in the queue's finished[] array. The two bits
 * are independent: tq_send() sets FINISHED_SEND on a stream that already has
 * FINISHED_RECV, so both may be set at once. */
/* FINISHED_SEND: set by tq_send_finish(), and by tq_send() once it sees that
 * the stream is already FINISHED_RECV. */
33 FINISHED_SEND
= (1 << 0),
/* FINISHED_RECV: set by tq_receive_finish(), and by receive_locked() when it
 * reaches a stream that has some finished bit set but not yet this one. */
34 FINISHED_RECV
= (1 << 1),
/* Entry type stored in the queue's fifo (tq_alloc() creates the fifo with
 * sizeof(FifoElem) as its element size).
 * NOTE(review): other functions in this file also access an `obj` member of
 * this struct; its declaration is not part of this excerpt. */
37 typedef struct FifoElem
{
/* index of the stream the entry belongs to; filled in by tq_send() and
 * reported back to the consumer by receive_locked() */
39 unsigned int stream_idx
;
/* NOTE(review): the two members below are used as tq->nb_streams and
 * tq->obj_move, so they belong to the queue object; the struct header and
 * the remaining members (finished, fifo, obj_pool, lock, cond) are not part
 * of this excerpt. */
/* number of streams: length of the finished[] array and the upper bound that
 * stream_idx is asserted against */
44 unsigned int nb_streams
;
/* callback invoked as obj_move(dst, src): tq_send() calls it with a pooled
 * object as dst and the caller's data as src, receive_locked() calls it with
 * the caller's data as dst and the pooled object as src.
 * Presumably it transfers ownership of the payload from src to dst --
 * TODO confirm against the callers that supply it. */
49 void (*obj_move
)(void *dst
, void *src
);
/**
 * Tear down the queue referenced by *ptq.
 *
 * Visible steps, in order: every entry still in the fifo is read out and its
 * object is released back to the object pool; then the fifo, the object pool
 * and the per-stream finished[] array are freed, and the condition variable
 * and mutex are destroyed.
 *
 * @param ptq address of the queue pointer; *ptq is read to obtain the queue
 *
 * NOTE(review): the declaration of `elem`, any NULL check on *ptq and the
 * release of the ThreadQueue allocation itself are not part of this excerpt
 * -- confirm against the complete file.
 */
55 void tq_free(ThreadQueue
**ptq
)
57 ThreadQueue
*tq
= *ptq
;
/* drain the fifo one entry at a time so that queued objects go back to the
 * pool before the fifo is freed */
64 while (av_fifo_read(tq
->fifo
, &elem
, 1) >= 0)
65 objpool_release(tq
->obj_pool
, &elem
.obj
);
67 av_fifo_freep2(&tq
->fifo
);
/* tq->obj_pool is the pool that was passed to tq_alloc(); it is freed here
 * together with the queue */
69 objpool_free(&tq
->obj_pool
);
71 av_freep(&tq
->finished
);
73 pthread_cond_destroy(&tq
->cond
);
74 pthread_mutex_destroy(&tq
->lock
);
/**
 * Allocate and set up a queue.
 *
 * @param nb_streams number of entries allocated for the per-stream
 *                   finished[] flags; also stored as tq->nb_streams
 * @param queue_size element count handed to av_fifo_alloc2() for the fifo
 * @param obj_pool   pool stored in the queue; tq_send() obtains objects from
 *                   it and tq_free() frees it
 * @param obj_move   callback stored in the queue and used to transfer
 *                   payloads into and out of pooled objects
 *
 * NOTE(review): the checks following each allocation/init call, the failure
 * handling and the return statements are not part of this excerpt;
 * presumably NULL is returned on failure and the new queue on success --
 * confirm against the complete file.
 */
79 ThreadQueue
*tq_alloc(unsigned int nb_streams
, size_t queue_size
,
80 ObjPool
*obj_pool
, void (*obj_move
)(void *dst
, void *src
))
/* allocated with av_mallocz(), so members not assigned below start out as
 * zero */
85 tq
= av_mallocz(sizeof(*tq
));
89 ret
= pthread_cond_init(&tq
->cond
, NULL
);
95 ret
= pthread_mutex_init(&tq
->lock
, NULL
);
/* NOTE(review): presumably the mutex-init failure path, undoing the
 * condition variable initialised above; the enclosing condition is not
 * visible in this excerpt */
97 pthread_cond_destroy(&tq
->cond
);
/* one flags word per stream, zero-initialised by av_calloc() (no FINISHED_*
 * bits set) */
102 tq
->finished
= av_calloc(nb_streams
, sizeof(*tq
->finished
));
105 tq
->nb_streams
= nb_streams
;
/* fifo holding queue_size FifoElem entries, created with flags 0 */
107 tq
->fifo
= av_fifo_alloc2(queue_size
, sizeof(FifoElem
), 0);
111 tq
->obj_pool
= obj_pool
;
112 tq
->obj_move
= obj_move
;
/**
 * Submit data for one stream to the queue.
 *
 * Runs with tq->lock held. While the fifo has no room and the stream has not
 * been marked FINISHED_RECV, the caller blocks on tq->cond.
 *
 * @param tq         queue to send to
 * @param stream_idx stream index; asserted to be below tq->nb_streams
 * @param data       passed as the source argument of tq->obj_move()
 * @return The visible code assigns AVERROR(EINVAL) when the stream is
 *         already FINISHED_SEND, and otherwise the results of objpool_get()
 *         and av_fifo_write().
 *         NOTE(review): the value used on the FINISHED_RECV path, the jumps
 *         to the unlock and the return statement are not part of this
 *         excerpt -- confirm against the complete file.
 */
120 int tq_send(ThreadQueue
*tq
, unsigned int stream_idx
, void *data
)
125 av_assert0(stream_idx
< tq
->nb_streams
);
126 finished
= &tq
->finished
[stream_idx
];
128 pthread_mutex_lock(&tq
->lock
);
/* sending on a stream that was already marked send-finished is an error */
130 if (*finished
& FINISHED_SEND
) {
131 ret
= AVERROR(EINVAL
);
/* wait for room in the fifo; the wait also ends as soon as the stream is
 * marked FINISHED_RECV */
135 while (!(*finished
& FINISHED_RECV
) && !av_fifo_can_write(tq
->fifo
))
136 pthread_cond_wait(&tq
->cond
, &tq
->lock
);
/* once the stream is FINISHED_RECV it is marked send-finished too, so any
 * later send on it takes the FINISHED_SEND check above */
138 if (*finished
& FINISHED_RECV
) {
140 *finished
|= FINISHED_SEND
;
/* NOTE(review): the statements below presumably form the else branch of the
 * FINISHED_RECV check; the `else` itself is not visible in this excerpt */
142 FifoElem elem
= { .stream_idx
= stream_idx
};
/* take an object from the pool to carry the payload through the fifo */
144 ret
= objpool_get(tq
->obj_pool
, &elem
.obj
);
/* pooled object is the destination, caller's data the source */
148 tq
->obj_move(elem
.obj
, data
);
150 ret
= av_fifo_write(tq
->fifo
, &elem
, 1);
151 av_assert0(ret
>= 0);
/* wake threads waiting on the condition variable now that the fifo has a
 * new entry */
152 pthread_cond_broadcast(&tq
->cond
);
156 pthread_mutex_unlock(&tq
->lock
);
/**
 * Single receive attempt that never blocks on the condition variable itself;
 * tq_receive() calls it between locking and unlocking tq->lock.
 *
 * First the fifo is read entry by entry: entries whose stream is already
 * FINISHED_RECV only have their object released, while for any other entry
 * the payload is moved into `data`, the object is released to the pool and
 * *stream_idx is set to the entry's stream. After that the per-stream flags
 * are scanned, and a stream that has some finished bit set but not yet
 * FINISHED_RECV gets that bit set.
 *
 * @param tq         queue to read from
 * @param stream_idx written with the stream index of the delivered entry
 * @return At the end of the function: AVERROR_EOF if nb_finished equals
 *         tq->nb_streams, AVERROR(EAGAIN) otherwise.
 *         NOTE(review): the third parameter (`data`, used below), the
 *         continue/return statements inside both loops and the place where
 *         nb_finished is incremented are not part of this excerpt -- confirm
 *         against the complete file.
 */
161 static int receive_locked(ThreadQueue
*tq
, int *stream_idx
,
165 unsigned int nb_finished
= 0;
167 while (av_fifo_read(tq
->fifo
, &elem
, 1) >= 0) {
/* the consumer has finished receiving on this stream: the entry's object
 * goes back to the pool without being moved into `data` */
168 if (tq
->finished
[elem
.stream_idx
] & FINISHED_RECV
) {
169 objpool_release(tq
->obj_pool
, &elem
.obj
);
/* move the payload out to the caller (caller's data is the destination),
 * then return the object to the pool */
173 tq
->obj_move(data
, elem
.obj
);
174 objpool_release(tq
->obj_pool
, &elem
.obj
);
175 *stream_idx
= elem
.stream_idx
;
/* scan of the per-stream flags, reached after the fifo loop above */
179 for (unsigned int i
= 0; i
< tq
->nb_streams
; i
++) {
/* no bit set: neither side has finished this stream */
180 if (!tq
->finished
[i
])
183 /* return EOF to the consumer at most once for each stream */
184 if (!(tq
->finished
[i
] & FINISHED_RECV
)) {
185 tq
->finished
[i
] |= FINISHED_RECV
;
/* EOF for the whole queue when every stream was counted as finished,
 * otherwise tell the caller to try again */
193 return nb_finished
== tq
->nb_streams
? AVERROR_EOF
: AVERROR(EAGAIN
);
/**
 * Receive the next item from the queue.
 *
 * Takes tq->lock, calls receive_locked(), and waits on tq->cond whenever
 * that call reports AVERROR(EAGAIN).
 *
 * @param tq         queue to read from
 * @param stream_idx forwarded to receive_locked(), which writes the index of
 *                   the stream the result refers to
 * @param data       forwarded to receive_locked() as the destination of the
 *                   payload
 *
 * NOTE(review): the enclosing retry loop, any initialisation of *stream_idx
 * and the return statement are not part of this excerpt; presumably the
 * result of receive_locked() is returned once it is not EAGAIN -- confirm
 * against the complete file.
 */
196 int tq_receive(ThreadQueue
*tq
, int *stream_idx
, void *data
)
202 pthread_mutex_lock(&tq
->lock
);
/* remember how many entries were readable before the attempt, to detect
 * whether receive_locked() consumed any */
205 size_t can_read
= av_fifo_can_read(tq
->fifo
);
207 ret
= receive_locked(tq
, stream_idx
, data
);
209 // signal other threads if the fifo state changed
210 if (can_read
!= av_fifo_can_read(tq
->fifo
))
211 pthread_cond_broadcast(&tq
->cond
);
/* nothing to deliver yet: sleep until another thread broadcasts on the
 * condition variable (the mutex is released while waiting) */
213 if (ret
== AVERROR(EAGAIN
)) {
214 pthread_cond_wait(&tq
->cond
, &tq
->lock
);
221 pthread_mutex_unlock(&tq
->lock
);
/**
 * Mark a stream as finished on the sending side.
 *
 * Sets FINISHED_SEND for the stream under tq->lock and wakes every thread
 * waiting on tq->cond.
 *
 * @param tq         queue the stream belongs to
 * @param stream_idx stream index; asserted to be below tq->nb_streams
 */
226 void tq_send_finish(ThreadQueue
*tq
, unsigned int stream_idx
)
228 av_assert0(stream_idx
< tq
->nb_streams
);
230 pthread_mutex_lock(&tq
->lock
);
232 /* mark the stream as send-finished;
233 * next time the consumer thread tries to read this stream it will get
234 * an EOF and recv-finished flag will be set */
235 tq
->finished
[stream_idx
] |= FINISHED_SEND
;
/* a receiver blocked in tq_receive() must re-check the flags */
236 pthread_cond_broadcast(&tq
->cond
);
238 pthread_mutex_unlock(&tq
->lock
);
/**
 * Mark a stream as finished on the receiving side.
 *
 * Sets FINISHED_RECV for the stream under tq->lock and wakes every thread
 * waiting on tq->cond.
 *
 * @param tq         queue the stream belongs to
 * @param stream_idx stream index; asserted to be below tq->nb_streams
 */
241 void tq_receive_finish(ThreadQueue
*tq
, unsigned int stream_idx
)
243 av_assert0(stream_idx
< tq
->nb_streams
);
245 pthread_mutex_lock(&tq
->lock
);
247 /* mark the stream as recv-finished;
248 * next time the producer thread tries to send for this stream, it will
249 * get an EOF and send-finished flag will be set */
250 tq
->finished
[stream_idx
] |= FINISHED_RECV
;
/* a sender blocked in tq_send() waiting for fifo space leaves its wait loop
 * once it sees FINISHED_RECV */
251 pthread_cond_broadcast(&tq
->cond
);
253 pthread_mutex_unlock(&tq
->lock
);