/*
 * This file is part of Libav.
 *
 * Libav is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * Libav is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with Libav; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
/**
 * @file
 * Libavfilter multithreading support
 */
26 #include "libavutil/common.h"
27 #include "libavutil/cpu.h"
28 #include "libavutil/mem.h"
37 #include "compat/w32pthreads.h"
40 typedef struct ThreadContext
{
45 avfilter_action_func
*func
;
47 /* per-execute parameters */
54 pthread_cond_t last_job_cond
;
55 pthread_cond_t current_job_cond
;
56 pthread_mutex_t current_job_lock
;
58 unsigned int current_execute
;
62 static void* attribute_align_arg
worker(void *v
)
65 int our_job
= c
->nb_jobs
;
66 int nb_threads
= c
->nb_threads
;
67 unsigned int last_execute
= 0;
70 pthread_mutex_lock(&c
->current_job_lock
);
71 self_id
= c
->current_job
++;
73 while (our_job
>= c
->nb_jobs
) {
74 if (c
->current_job
== nb_threads
+ c
->nb_jobs
)
75 pthread_cond_signal(&c
->last_job_cond
);
77 while (last_execute
== c
->current_execute
&& !c
->done
)
78 pthread_cond_wait(&c
->current_job_cond
, &c
->current_job_lock
);
79 last_execute
= c
->current_execute
;
83 pthread_mutex_unlock(&c
->current_job_lock
);
87 pthread_mutex_unlock(&c
->current_job_lock
);
89 c
->rets
[our_job
% c
->nb_rets
] = c
->func(c
->ctx
, c
->arg
, our_job
, c
->nb_jobs
);
91 pthread_mutex_lock(&c
->current_job_lock
);
92 our_job
= c
->current_job
++;
96 static void slice_thread_uninit(ThreadContext
*c
)
100 pthread_mutex_lock(&c
->current_job_lock
);
102 pthread_cond_broadcast(&c
->current_job_cond
);
103 pthread_mutex_unlock(&c
->current_job_lock
);
105 for (i
= 0; i
< c
->nb_threads
; i
++)
106 pthread_join(c
->workers
[i
], NULL
);
108 pthread_mutex_destroy(&c
->current_job_lock
);
109 pthread_cond_destroy(&c
->current_job_cond
);
110 pthread_cond_destroy(&c
->last_job_cond
);
111 av_freep(&c
->workers
);
114 static void slice_thread_park_workers(ThreadContext
*c
)
116 while (c
->current_job
!= c
->nb_threads
+ c
->nb_jobs
)
117 pthread_cond_wait(&c
->last_job_cond
, &c
->current_job_lock
);
118 pthread_mutex_unlock(&c
->current_job_lock
);
121 static int thread_execute(AVFilterContext
*ctx
, avfilter_action_func
*func
,
122 void *arg
, int *ret
, int nb_jobs
)
124 ThreadContext
*c
= ctx
->graph
->internal
->thread
;
130 pthread_mutex_lock(&c
->current_job_lock
);
132 c
->current_job
= c
->nb_threads
;
133 c
->nb_jobs
= nb_jobs
;
139 c
->nb_rets
= nb_jobs
;
141 c
->rets
= &dummy_ret
;
144 c
->current_execute
++;
146 pthread_cond_broadcast(&c
->current_job_cond
);
148 slice_thread_park_workers(c
);
153 static int thread_init_internal(ThreadContext
*c
, int nb_threads
)
158 int nb_cpus
= av_cpu_count();
159 av_log(c
->graph
, AV_LOG_DEBUG
, "Detected %d logical cores.\n", nb_cpus
);
160 // use number of cores + 1 as thread count if there is more than one
162 nb_threads
= nb_cpus
+ 1;
170 c
->nb_threads
= nb_threads
;
171 c
->workers
= av_mallocz(sizeof(*c
->workers
) * nb_threads
);
173 return AVERROR(ENOMEM
);
179 pthread_cond_init(&c
->current_job_cond
, NULL
);
180 pthread_cond_init(&c
->last_job_cond
, NULL
);
182 pthread_mutex_init(&c
->current_job_lock
, NULL
);
183 pthread_mutex_lock(&c
->current_job_lock
);
184 for (i
= 0; i
< nb_threads
; i
++) {
185 ret
= pthread_create(&c
->workers
[i
], NULL
, worker
, c
);
187 pthread_mutex_unlock(&c
->current_job_lock
);
189 slice_thread_uninit(c
);
194 slice_thread_park_workers(c
);
196 return c
->nb_threads
;
199 int ff_graph_thread_init(AVFilterGraph
*graph
)
203 if (graph
->nb_threads
== 1) {
204 graph
->thread_type
= 0;
208 graph
->internal
->thread
= av_mallocz(sizeof(ThreadContext
));
209 if (!graph
->internal
->thread
)
210 return AVERROR(ENOMEM
);
212 ret
= thread_init_internal(graph
->internal
->thread
, graph
->nb_threads
);
214 av_freep(&graph
->internal
->thread
);
215 graph
->thread_type
= 0;
216 graph
->nb_threads
= 1;
217 return (ret
< 0) ? ret
: 0;
219 graph
->nb_threads
= ret
;
221 graph
->internal
->thread_execute
= thread_execute
;
226 void ff_graph_thread_free(AVFilterGraph
*graph
)
228 if (graph
->internal
->thread
)
229 slice_thread_uninit(graph
->internal
->thread
);
230 av_freep(&graph
->internal
->thread
);