/*
* Copyright (c) 2012 Fredrik Mellbin
*
* This file is part of VapourSynth.
*
* VapourSynth is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* VapourSynth is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with VapourSynth; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
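
// NOTE: assumed include -- the VSThread / VSThreadPool / FrameContext / FrameKey
// declarations are expected to come from the core header of this codebase.
#include "vscore.h"
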
VSThread::VSThread(VSThreadPool *owner) : owner(owner), stop(false) {
}

void VSThread::stopThread() {
    stop = true;
}

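// Worker thread main loop: under the pool lock, scan the shared task queue for a
// context that is allowed to run right now, execute it with the lock released,
// hand out results and errors, and sleep on the newWork condition when nothing is runnable.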
void VSThread::run() {
    owner->lock.lock();
    owner->activeThreads++;

    while (true) {
        bool ranTask = false;

        for (int i = 0; i < owner->tasks.count(); i++) {
            PFrameContext rCtx = owner->tasks[i];

            if (rCtx->frameDone && rCtx->returnedFrame) {
                // a finished frame that only needs to be handed to its callback
                owner->tasks.removeAt(i--);
                owner->returnFrame(rCtx, rCtx->returnedFrame);
                ranTask = true;
                break;
            } else {
                PFrameContext pCtx = rCtx;

                if (rCtx->returnedFrame || rCtx->hasError())
                    rCtx = rCtx->upstreamContext;

                // if an error has already been flagged upstream simply drop this task so a filter won't get multiple arError calls for the same frame
                if (rCtx->hasError()) {
                    owner->tasks.removeAt(i--);
                    continue;
                }

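                // "single instance" means no other activation of this filter may run at the
                // same time: always true for fmSerial, and for fmParallelRequests once the
                // last outstanding input frame is being delivered to it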
                bool isSingleInstance = (rCtx->clip->filterMode == fmParallelRequests && pCtx->returnedFrame && rCtx->numFrameRequests == 1 && pCtx != rCtx) || rCtx->clip->filterMode == fmSerial;

                // this check is common for both filter modes, it makes sure that multiple calls won't be made in parallel to a single filter to produce the same frame
                // special casing so serial unordered doesn't need yet another list
                if (owner->runningTasks.contains(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n)))
                    continue;

                if (isSingleInstance) {
                    // this is the complicated case, a new frame may not be started until all calls are completed for the current one
                    if (owner->framesInProgress.contains(rCtx->clip) && owner->framesInProgress[rCtx->clip] != rCtx->n)
                        continue;
                }

                // mark task as active
                owner->tasks.removeAt(i--);
                owner->runningTasks.insert(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n), rCtx);

                if (isSingleInstance)
                    owner->framesInProgress.insert(rCtx->clip, rCtx->n);

                ActivationReason ar = arInitial;

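                // decide why the filter is being activated: a fresh request (arInitial), one
                // more input frame arriving (arFrameReady), the last outstanding input
                // arriving (arAllFramesReady), or an upstream error being propagated (arError)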
                if (pCtx->hasError()) {
                    ar = arError;
                    rCtx->setError(pCtx->getErrorMessage());
                } else if (pCtx != rCtx && pCtx->returnedFrame) {
                    if (--rCtx->numFrameRequests)
                        ar = arFrameReady;
                    else
                        ar = arAllFramesReady;

                    Q_ASSERT(rCtx->numFrameRequests >= 0);
                    rCtx->availableFrames.insert(FrameKey(pCtx->clip, pCtx->n), pCtx->returnedFrame);
                    rCtx->lastCompletedN = pCtx->n;
                    rCtx->lastCompletedNode = pCtx->node;
                }

                // run the filter itself with the pool lock released so other workers can proceed
                owner->lock.unlock();
                PVideoFrame f = rCtx->clip->getFrameInternal(rCtx->n, ar, rCtx);
                ranTask = true;
                owner->lock.lock();

                if (f && rCtx->numFrameRequests > 0)
                    qFatal("Frame returned but there are still pending frame requests, filter: " + rCtx->clip->name);

                owner->runningTasks.remove(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n));

                if (f || ar == arError || rCtx->hasError()) {
                    // free all input frames quickly since the frame processing is done
                    rCtx->availableFrames.clear();

                    if (isSingleInstance) {
                        if (owner->framesInProgress[rCtx->clip] != rCtx->n && !rCtx->hasError())
                            qWarning("Releasing unobtained frame lock");

                        owner->framesInProgress.remove(rCtx->clip);
                    }

                    owner->allContexts.remove(FrameKey(rCtx->clip, rCtx->n));
                }

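                // walk the notification chain: every context that piggybacked on this
                // request gets the error or the finished frame, downstream contexts that
                // were waiting on it are rescheduled, and user callbacks are fired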
                if (rCtx->hasError()) {
                    PFrameContext n;

                    do {
                        n = rCtx->notificationChain;

                        if (n) {
                            rCtx->notificationChain.clear();
                            n->setError(rCtx->getErrorMessage());
                        }

                        if (rCtx->upstreamContext) {
                            owner->startInternal(rCtx);
                        }

                        if (rCtx->frameDone) {
                            owner->lock.unlock();
                            QMutexLocker callbackLock(&owner->callbackLock);
                            rCtx->frameDone(rCtx->userData, NULL, rCtx->n, rCtx->node, rCtx->getErrorMessage().constData());
                            callbackLock.unlock();
                            owner->lock.lock();
                        }
                    } while ((rCtx = n));
                } else if (f) {
                    Q_ASSERT(rCtx->numFrameRequests == 0);

                    PFrameContext n;

                    do {
                        n = rCtx->notificationChain;

                        if (n)
                            rCtx->notificationChain.clear();

                        if (rCtx->upstreamContext) {
                            rCtx->returnedFrame = f;
                            owner->startInternal(rCtx);
                        }

                        if (rCtx->frameDone)
                            owner->returnFrame(rCtx, f);
                    } while ((rCtx = n));
                } else if (rCtx->numFrameRequests > 0 || rCtx->n < 0) {
                    // already scheduled or in the case of negative n it is simply a cache notify message
                } else {
                    qFatal("No frame returned at the end of processing by " + rCtx->clip->name);
                }

                break;
            }
        }

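        // nothing runnable was found this pass: park the thread on the newWork condition
        // until more tasks arrive, or shut down if the pool asked this thread to stop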
        if (!ranTask && !stop) {
            owner->activeThreads--;
            owner->newWork.wait(&owner->lock);
            owner->activeThreads++;
        } else if (stop) {
            owner->activeThreads--;
            owner->lock.unlock();
            return;
        }
    }
}

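// The pool sizes itself to QThread::idealThreadCount() unless the caller passes an
// explicit positive thread count; the value actually used is written back through
// the threads pointer when one is supplied.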
VSThreadPool::VSThreadPool(VSCore *core, int *threads) : core(core), activeThreads(0) {
    int usedThreads = QThread::idealThreadCount();

    if (threads && *threads > 0)
        usedThreads = *threads;

    if (threads)
        *threads = usedThreads;

    setMaxThreadCount(usedThreads);
}

int VSThreadPool::activeThreadCount() const {
    return activeThreads;
}

int VSThreadPool::threadCount() const {
    return allThreads.count();
}

void VSThreadPool::setMaxThreadCount(int threadCount) {
    QMutexLocker m(&lock);

    // grow the pool by spawning new worker threads
    while (threadCount > allThreads.count()) {
        VSThread *vst = new VSThread(this);
        allThreads.insert(vst);
        vst->start();
    }

    // shrink the pool: ask a worker to exit, wake it in case it is sleeping on
    // newWork, and wait for it to finish with the pool lock released
    while (threadCount < allThreads.count()) {
        VSThread *t = *allThreads.begin();
        t->stopThread();
        newWork.wakeAll();
        m.unlock();
        t->wait();
        m.relock();
        allThreads.remove(t);
        delete t;
    }
}

void VSThreadPool::wakeThread() {
    if (activeThreads < threadCount())
        newWork.wakeOne();
}

void VSThreadPool::releaseThread() {
    setMaxThreadCount(allThreads.count() + 1);
}

void VSThreadPool::reserveThread() {
    setMaxThreadCount(allThreads.count() - 1);
}

void VSThreadPool::notifyCaches(CacheActivation reason) {
    for (int i = 0; i < core->caches.count(); i++)
        tasks.append(PFrameContext(new FrameContext(reason, core->caches[i], PFrameContext())));
}

void VSThreadPool::start(const PFrameContext &context) {
    QMutexLocker m(&lock);
    startInternal(context);
}

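// Hand a finished frame to its frameDone callback. The pool lock is dropped first so
// the callback can issue new requests, and callbackLock serializes all user callbacks
// so they never run from two worker threads at once.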
void VSThreadPool::returnFrame(const PFrameContext &rCtx, const PVideoFrame &f) {
    Q_ASSERT(rCtx->frameDone);
    // we need to unlock here so the callback may request more frames without causing a deadlock
    // AND so that slow callbacks will only block operations in this thread, not all the others
    lock.unlock();
    VSFrameRef *ref = new VSFrameRef(f);
    QMutexLocker m(&callbackLock);
    rCtx->frameDone(rCtx->userData, ref, rCtx->n, rCtx->node, NULL);
    lock.lock();
}

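// Queue a new frame request. Before anything is queued the request is checked against
// three places it could already exist in: the pending task list, the currently running
// tasks, and the set of all live contexts; duplicates are chained onto the existing
// context through notificationChain instead of being scheduled twice.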
void VSThreadPool::startInternal(const PFrameContext &context) {
    //technically this could be done by walking up the context chain and adding a new notification to the correct one
    //unfortunately this would probably be quite slow for deep scripts so just hope the cache catches it

    if (context->n < 0)
        qFatal("Negative frame request by: " + context->clip->name);

    // check to see if it's time to reevaluate cache sizes
    if (core->memory->memoryUse() > 1024*1024*1024) {
        notifyCaches(cNeedMemory);
    }

    if (!context->upstreamContext && ticks.fetchAndAddAcquire(1) == 99) {
        ticks.fetchAndStoreAcquire(0);
        notifyCaches(cCacheTick);
    }

    // add it immediately if the task is to return a completed frame
    if (context->returnedFrame) {
        tasks.append(context);
        wakeThread();
        return;
    }

    if (context->upstreamContext)
        context->upstreamContext->numFrameRequests++;

    ////////////////////////
    // see if the task is a duplicate
    foreach(const PFrameContext &ctx, tasks) {
        if (context->clip == ctx->clip && context->n == ctx->n) {
            if (ctx->returnedFrame) {
                // special case where the requested frame is encountered "by accident"
                context->returnedFrame = ctx->returnedFrame;
                tasks.append(context);
                wakeThread();
                return;
            } else {
                PFrameContext rCtx = ctx;

                if (rCtx->returnedFrame)
                    rCtx = rCtx->upstreamContext;

                if (context->clip == rCtx->clip && context->n == rCtx->n) {
                    PFrameContext t = rCtx;

                    while (t && t->notificationChain)
                        t = t->notificationChain;

                    t->notificationChain = context;
                    return;
                }
            }
        }
    }

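    // the task may also be executing right now; if so, chain onto the running context
    // (or the context it is delivering a frame for) instead of queueing a duplicate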
    FrameKey p(context->clip, context->n);

    if (runningTasks.contains(p)) {
        PFrameContext ctx = runningTasks[p];

        if (ctx->returnedFrame) {
            // special case where the requested frame is encountered "by accident"
            context->returnedFrame = ctx->returnedFrame;
            tasks.append(context);
            wakeThread();
            return;
        } else {
            PFrameContext rCtx = ctx;

            if (rCtx->returnedFrame)
                rCtx = rCtx->upstreamContext;

            if (context->clip == rCtx->clip && context->n == rCtx->n) {
                PFrameContext t = rCtx;

                while (t && t->notificationChain)
                    t = t->notificationChain;

                t->notificationChain = context;
                return;
            }
        }
    }

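    // finally, a context for this (clip, frame) pair may exist without being queued or
    // running, e.g. while it waits for its own upstream requests; chain onto it as well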
    if (allContexts.contains(p)) {
        PFrameContext ctx = allContexts[p];

        Q_ASSERT(context->clip == ctx->clip && context->n == ctx->n);

        if (ctx->returnedFrame) {
            // special case where the requested frame is encountered "by accident"
            context->returnedFrame = ctx->returnedFrame;
            tasks.append(context);
            wakeThread();
            return;
        }

        while (ctx->notificationChain)
            ctx = ctx->notificationChain;

        ctx->notificationChain = context;
        return;
    }

    allContexts[p] = context;

    tasks.append(context);
    wakeThread();
}

void VSThreadPool::waitForDone() {
}

VSThreadPool::~VSThreadPool() {
    setMaxThreadCount(0);
}