/*
 * Copyright (c) 2012 Fredrik Mellbin
 *
 * This file is part of VapourSynth.
 *
 * VapourSynth is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * VapourSynth is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with VapourSynth; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
VSThread::VSThread(VSThreadPool *owner) : owner(owner), stop(false) {
}

void VSThread::stopThread() {
    stop = true;
}

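// Worker thread main loop: repeatedly scan the owner's task list for a context
// that can run right now, call its filter through getFrameInternal(), hand the
// produced frame (or error) to every context waiting on it, and sleep on newWork
// when nothing is runnable. The pool lock is held everywhere except while a
// filter or a frameDone callback is executing.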
void VSThread::run() {
    owner->lock.lock();
    owner->activeThreads++;

    while (true) {
        bool ranTask = false;

        for (int i = 0; i < owner->tasks.count(); i++) {
            PFrameContext rCtx = owner->tasks[i];

            if (rCtx->frameDone && rCtx->returnedFrame) {
                owner->tasks.removeAt(i--);
                owner->returnFrame(rCtx, rCtx->returnedFrame);
                ranTask = true;
                break;
            } else {
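                // a task that already carries a produced frame (or an error) is a completed
                // dependency; the filter to call next is the context that requested it
                // (upstreamContext), while pCtx keeps referring to the finished context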
                PFrameContext pCtx = rCtx;

                if (rCtx->returnedFrame || rCtx->hasError())
                    rCtx = rCtx->upstreamContext;

                // if an error has already been flagged upstream, simply drop this task so a filter won't get multiple arError calls for the same frame
                if (rCtx->hasError()) {
                    owner->tasks.removeAt(i--);
                    continue;
                }

                bool isSingleInstance = rCtx->clip->filterMode == fmSerial || rCtx->clip->filterMode == fmParallelRequests;

                // this check is common to both filter modes; it makes sure that multiple calls to a single filter to produce the same frame are never made in parallel
                // special casing so serial unordered doesn't need yet another list
                if (owner->runningTasks.contains(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n)))
                    continue;

                if (isSingleInstance) {
                    // this is the complicated case; a new frame may not be started until all calls for the current one have completed
                    if (owner->framesInProgress.contains(rCtx->clip) && owner->framesInProgress[rCtx->clip] != rCtx->n)
                        continue;
                }

                // mark task as active
                owner->tasks.removeAt(i--);
                owner->runningTasks.insert(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n), rCtx);

                if ((rCtx->clip->filterMode == fmParallelRequests && pCtx->returnedFrame) || (rCtx->clip->filterMode == fmSerial))
                    owner->framesInProgress.insert(rCtx->clip, rCtx->n);

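                // select the reason passed to the filter: arInitial for a brand new request,
                // arFrameReady/arAllFramesReady when a requested input frame has just been
                // produced, arError when the dependency failed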
                ActivationReason ar = arInitial;

                if (pCtx->hasError()) {
                    ar = arError;
                    rCtx->setError(pCtx->getErrorMessage());
                } else if (pCtx != rCtx && pCtx->returnedFrame) {
                    if (rCtx->numFrameRequests.deref())
                        ar = arFrameReady;
                    else
                        ar = arAllFramesReady;

                    Q_ASSERT(rCtx->numFrameRequests >= 0);
                    rCtx->availableFrames.insert(FrameKey(pCtx->clip, pCtx->n), pCtx->returnedFrame);
                    rCtx->lastCompletedN = pCtx->n;
                    rCtx->lastCompletedNode = pCtx->node;
                }

                owner->lock.unlock();
                PVideoFrame f = rCtx->clip->getFrameInternal(rCtx->n, ar, rCtx);
                ranTask = true;
                owner->lock.lock();

                if (f && rCtx->numFrameRequests > 0)
                    qFatal("Frame returned but there are still pending frame requests, filter: " + rCtx->clip->name);

                // debug output of the produced frame (OutputDebugStringA is Windows only)
                PFrameContext a = pCtx;
                char b[256]; // buffer size chosen arbitrarily for the debug line

                if (a->returnedFrame)
                    a = a->upstreamContext;

                if (rCtx->clip->name == "FFVideoSource") {
                    sprintf(b, "src:\t%d\n", a->n);
                    OutputDebugStringA(b);
                }

                //sprintf(b, "%s:\t%d\t%d\t%d\n", rCtx->clip->name.constData(), ((int)a->clip) & 0xff, a->n, (int)((bool)f));
                //OutputDebugStringA(b);

                owner->runningTasks.remove(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n));

                if (f || ar == arError || rCtx->hasError()) {
                    // free all input frames quickly since the frame processing is done
                    rCtx->availableFrames.clear();

                    if (isSingleInstance) {
                        if (owner->framesInProgress[rCtx->clip] != rCtx->n && !rCtx->hasError())
                            qWarning("Releasing unobtained frame lock");

                        owner->framesInProgress.remove(rCtx->clip);
                    }

                    owner->allContexts.remove(FrameKey(rCtx->clip, rCtx->n));

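                    // deliver the result: walk the notificationChain so that every context
                    // which ended up waiting on this same frame is either restarted (if it
                    // has an upstreamContext) or has its frameDone callback fired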
                    if (rCtx->hasError()) {
                        PFrameContext n;

                        do {
                            n = rCtx->notificationChain;

                            if (n) {
                                rCtx->notificationChain.clear();
                                n->setError(rCtx->getErrorMessage());
                            }

                            if (rCtx->upstreamContext) {
                                owner->startInternal(rCtx);
                            }

                            if (rCtx->frameDone) {
                                owner->lock.unlock();
                                QMutexLocker callbackLock(&owner->callbackLock);
                                rCtx->frameDone(rCtx->userData, NULL, rCtx->n, rCtx->node, rCtx->getErrorMessage().constData());
                                callbackLock.unlock();
                                owner->lock.lock();
                            }
                        } while ((rCtx = n));
                    } else {
                        Q_ASSERT(rCtx->numFrameRequests == 0);
                        PFrameContext n;

                        do {
                            n = rCtx->notificationChain;

                            if (n)
                                rCtx->notificationChain.clear();

                            if (rCtx->upstreamContext) {
                                rCtx->returnedFrame = f;
                                owner->startInternal(rCtx);
                            }

                            if (rCtx->frameDone)
                                owner->returnFrame(rCtx, f);
                        } while ((rCtx = n));
                    }
                } else if (rCtx->numFrameRequests > 0 || rCtx->n < 0) {
                    // already scheduled, or, in the case of a negative n, simply a cache notify message
                } else {
                    qFatal("No frame returned at the end of processing by " + rCtx->clip->name);
                }

                break;
            }
        }

        if (!ranTask && !stop) {
            owner->activeThreads--;
            owner->newWork.wait(&owner->lock);
            owner->activeThreads++;
        } else if (stop) {
            owner->activeThreads--;
            owner->lock.unlock();
            return;
        }
    }
}

VSThreadPool::VSThreadPool(VSCore *core, int threadCount) : core(core), activeThreads(0) {
    setMaxThreadCount(threadCount);
}

int VSThreadPool::activeThreadCount() const {
    return activeThreads;
}

int VSThreadPool::threadCount() const {
    return allThreads.count();
}

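// Adjusts the number of worker threads; surplus threads are asked to stop and are
// joined before being removed from the pool.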
void VSThreadPool::setMaxThreadCount(int threadCount) {
    QMutexLocker m(&lock);

    while (threadCount > allThreads.count()) {
        VSThread *vst = new VSThread(this);
        allThreads.insert(vst);
        vst->start();
    }

    while (threadCount < allThreads.count()) {
        VSThread *t = *allThreads.begin();
        t->stopThread();
        newWork.wakeAll();
        m.unlock();
        t->wait();
        m.relock();
        allThreads.remove(t);
        delete t;
    }
}

void VSThreadPool::wakeThread() {
    if (activeThreads < threadCount())
        newWork.wakeOne();
}

void VSThreadPool::releaseThread() {
    setMaxThreadCount(allThreads.count() + 1);
}

void VSThreadPool::reserveThread() {
    setMaxThreadCount(allThreads.count() - 1);
}

void VSThreadPool::notifyCaches(CacheActivation reason) {
    for (int i = 0; i < core->caches.count(); i++)
        tasks.append(PFrameContext(new FrameContext(reason, core->caches[i], PFrameContext())));
}

void VSThreadPool::start(const PFrameContext &context) {
    QMutexLocker m(&lock);
    startInternal(context);
}

void VSThreadPool::returnFrame(const PFrameContext &rCtx, const PVideoFrame &f) {
    Q_ASSERT(rCtx->frameDone);
    // we need to unlock here so the callback may request more frames without causing a deadlock
    // AND so that slow callbacks will only block operations in this thread, not all the others
    lock.unlock();
    VSFrameRef *ref = new VSFrameRef(f);
    QMutexLocker m(&callbackLock);
    rCtx->frameDone(rCtx->userData, ref, rCtx->n, rCtx->node, NULL);
    m.unlock();
    lock.lock();
}

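// Queues a request for frame context->n from context->clip. A request that
// duplicates one already pending, running or previously registered is not queued
// again; it is chained onto the existing context through notificationChain and
// completed when that context finishes.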
void VSThreadPool::startInternal(const PFrameContext &context) {
    //technically this could be done by walking up the context chain and adding a new notification to the correct one
    //unfortunately this would probably be quite slow for deep scripts so just hope the cache catches it

    if (context->n < 0)
        qFatal("Negative frame request by: " + context->clip->name);

    // check to see if it's time to reevaluate cache sizes
    if (!context->upstreamContext && ticks.fetchAndAddAcquire(1) == 99) {
        ticks = 0;
        notifyCaches(cCacheTick);
    }

    // add the task immediately if it is simply returning an already completed frame
    if (context->returnedFrame) {
        tasks.append(context);
        wakeThread();
        return;
    }

    if (context->upstreamContext)
        context->upstreamContext->numFrameRequests.ref();

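    // a duplicate request can already exist in three places: the pending task list,
    // runningTasks (currently being produced) and allContexts (registered earlier
    // but not yet finished); check them in that order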
    ////////////////////////
    // see if the task is a duplicate
    foreach(const PFrameContext &ctx, tasks) {
        if (context->clip == ctx->clip && context->n == ctx->n) {
            if (ctx->returnedFrame) {
                // special case where the requested frame is encountered "by accident"
                context->returnedFrame = ctx->returnedFrame;
                tasks.append(context);
                wakeThread();
                return;
            } else {
                PFrameContext rCtx = ctx;

                if (rCtx->returnedFrame)
                    rCtx = rCtx->upstreamContext;

                if (context->clip == rCtx->clip && context->n == rCtx->n) {
                    PFrameContext t = rCtx;

                    while (t && t->notificationChain)
                        t = t->notificationChain;

                    t->notificationChain = context;
                    return;
                }
            }
        }
    }

    FrameKey p(context->clip, context->n);

    if (runningTasks.contains(p)) {
        PFrameContext ctx = runningTasks[p];

        if (ctx->returnedFrame) {
            // special case where the requested frame is encountered "by accident"
            context->returnedFrame = ctx->returnedFrame;
            tasks.append(context);
            wakeThread();
            return;
        } else {
            PFrameContext rCtx = ctx;

            if (rCtx->returnedFrame)
                rCtx = rCtx->upstreamContext;

            if (context->clip == rCtx->clip && context->n == rCtx->n) {
                PFrameContext t = rCtx;

                while (t && t->notificationChain)
                    t = t->notificationChain;

                t->notificationChain = context;
                return;
            }
        }
    }

    if (allContexts.contains(p)) {
        PFrameContext ctx = allContexts[p];

        Q_ASSERT(context->clip == ctx->clip && context->n == ctx->n);

        if (ctx->returnedFrame) {
            // special case where the requested frame is encountered "by accident"
            context->returnedFrame = ctx->returnedFrame;
            tasks.append(context);
            wakeThread();
            return;
        } else {
            while (ctx->notificationChain)
                ctx = ctx->notificationChain;

            ctx->notificationChain = context;
            return;
        }
    } else {
        allContexts[p] = context;
    }

    tasks.append(context);
    wakeThread();
}

void VSThreadPool::waitForDone() {
}

VSThreadPool::~VSThreadPool() {
    setMaxThreadCount(0);
}