add missing break
[vapoursynth-svn.git] / src / core / vsthreadpool.cpp
blobdf9cc67830664401284ca32e622cf7cfe4982d21
1 /*
2 * Copyright (c) 2012 Fredrik Mellbin
4 * This file is part of VapourSynth.
6 * VapourSynth is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * VapourSynth is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with Libav; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include "vscore.h"
23 VSThread::VSThread(VSThreadPool *owner) : owner(owner), stop(false) {
27 void VSThread::stopThread() {
28 stop = true;
31 void VSThread::run() {
32 owner->lock.lock();
33 owner->activeThreads++;
35 while (true) {
36 bool ranTask = false;
38 for (int i = 0; i < owner->tasks.count(); i++) {
39 PFrameContext rCtx = owner->tasks[i];
41 if (rCtx->frameDone && rCtx->returnedFrame) {
42 owner->tasks.removeAt(i--);
43 owner->returnFrame(rCtx, rCtx->returnedFrame);
44 ranTask = true;
45 break;
46 } else {
48 PFrameContext pCtx = rCtx;
50 if (rCtx->returnedFrame || rCtx->hasError())
51 rCtx = rCtx->upstreamContext;
53 Q_ASSERT(rCtx);
54 Q_ASSERT(pCtx);
56 // if an error has already been flagged upstream simply drop this task so a filter won't get multiple arError calls for the same frame
57 if (rCtx->hasError()) {
58 owner->tasks.removeAt(i--);
59 continue;
62 bool isSingleInstance = rCtx->clip->filterMode == fmSerial || rCtx->clip->filterMode == fmParallelRequests;
64 // this check is common for both filter modes, it makes sure that multiple calls won't be made in parallel to a single filter to produce the same frame
65 // special casing so serial unordered doesn't need yet another list
66 if (owner->runningTasks.contains(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n)))
67 continue;
69 if (isSingleInstance) {
70 // this is the complicated case, a new frame may not be started until all calls are completed for the current one
71 if (owner->framesInProgress.contains(rCtx->clip) && owner->framesInProgress[rCtx->clip] != rCtx->n)
72 continue;
75 // mark task as active
76 owner->tasks.removeAt(i--);
77 owner->runningTasks.insert(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n), rCtx);
79 if ((rCtx->clip->filterMode == fmParallelRequests && pCtx->returnedFrame) || (rCtx->clip->filterMode == fmSerial))
80 owner->framesInProgress.insert(rCtx->clip, rCtx->n);
82 owner->lock.unlock();
83 // run task
85 ActivationReason ar = arInitial;
87 if (pCtx->hasError()) {
88 ar = arError;
89 rCtx->setError(pCtx->getErrorMessage());
90 } else if (pCtx != rCtx && pCtx->returnedFrame) {
91 if (rCtx->numFrameRequests.deref())
92 ar = arFrameReady;
93 else
94 ar = arAllFramesReady;
96 Q_ASSERT(rCtx->numFrameRequests >= 0);
97 rCtx->availableFrames.insert(FrameKey(pCtx->clip, pCtx->n), pCtx->returnedFrame);
98 rCtx->lastCompletedN = pCtx->n;
99 rCtx->lastCompletedNode = pCtx->node;
102 PVideoFrame f = rCtx->clip->getFrameInternal(rCtx->n, ar, rCtx);
103 ranTask = true;
104 owner->lock.lock();
106 if (f && rCtx->numFrameRequests > 0)
107 qFatal("Frame returned but there are still pending frame requests, filter: " + rCtx->clip->name);
110 PFrameContext a = pCtx;
111 char b[100];
112 if (a->returnedFrame)
113 a = a->upstreamContext;
114 if (rCtx->clip->name == "FFVideoSource") {
115 sprintf(b, "src:\t%d\n", a->n);
116 OutputDebugStringA(b);
117 } else if (f) {
118 //sprintf(b, "%s:\t%d\t%d\t%d\n", rCtx->clip->name.constData(), ((int)a->clip) & 0xff, a->n, (int)((bool)f));
119 //OutputDebugStringA(b);
123 owner->runningTasks.remove(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n));
125 if (f || ar == arError || rCtx->hasError()) {
126 // free all input frames quickly since the frame processing is done
127 rCtx->availableFrames.clear();
129 if (isSingleInstance) {
130 if (owner->framesInProgress[rCtx->clip] != rCtx->n && !rCtx->hasError())
131 qWarning("Releasing unobtained frame lock");
133 owner->framesInProgress.remove(rCtx->clip);
136 owner->allContexts.remove(FrameKey(rCtx->clip, rCtx->n));
139 if (rCtx->hasError()) {
140 PFrameContext n;
142 do {
143 n = rCtx->notificationChain;
145 if (n) {
146 rCtx->notificationChain.clear();
147 n->setError(rCtx->getErrorMessage());
150 if (rCtx->upstreamContext) {
151 owner->startInternal(rCtx);
154 if (rCtx->frameDone) {
155 owner->lock.unlock();
156 QMutexLocker callbackLock(&owner->callbackLock);
157 rCtx->frameDone(rCtx->userData, NULL, rCtx->n, rCtx->node, rCtx->getErrorMessage().constData());
158 callbackLock.unlock();
159 owner->lock.lock();
161 } while ((rCtx = n));
162 } else if (f) {
163 Q_ASSERT(rCtx->numFrameRequests == 0);
164 PFrameContext n;
166 do {
167 n = rCtx->notificationChain;
169 if (n)
170 rCtx->notificationChain.clear();
172 if (rCtx->upstreamContext) {
173 rCtx->returnedFrame = f;
174 owner->startInternal(rCtx);
177 if (rCtx->frameDone)
178 owner->returnFrame(rCtx, f);
179 } while ((rCtx = n));
180 } else if (rCtx->numFrameRequests > 0 || rCtx->n < 0) {
181 // already scheduled or in the case of negative n it is simply a cache notify message
182 } else {
183 qFatal("No frame returned at the end of processing by " + rCtx->clip->name);
186 break;
190 if (!ranTask && !stop) {
191 owner->activeThreads--;
192 owner->newWork.wait(&owner->lock);
193 owner->activeThreads++;
194 } else if (stop) {
195 owner->activeThreads--;
196 owner->lock.unlock();
197 return;
202 VSThreadPool::VSThreadPool(VSCore *core, int threadCount) : core(core), activeThreads(0) {
203 setMaxThreadCount(threadCount);
206 int VSThreadPool::activeThreadCount() const {
207 return activeThreads;
210 int VSThreadPool::threadCount() const {
211 return allThreads.count();
214 void VSThreadPool::setMaxThreadCount(int threadCount) {
215 QMutexLocker m(&lock);
217 while (threadCount > allThreads.count()) {
218 VSThread *vst = new VSThread(this);
219 allThreads.insert(vst);
220 vst->start();
223 while (threadCount < allThreads.count()) {
224 VSThread *t = *allThreads.begin();
225 t->stopThread();
226 newWork.wakeAll();
227 m.unlock();
228 t->wait();
229 m.relock();
230 allThreads.remove(t);
231 delete t;
232 newWork.wakeAll();
236 void VSThreadPool::wakeThread() {
237 if (activeThreads < threadCount())
238 newWork.wakeOne();
241 void VSThreadPool::releaseThread() {
242 setMaxThreadCount(allThreads.count() + 1);
245 void VSThreadPool::reserveThread() {
246 setMaxThreadCount(allThreads.count() - 1);
249 void VSThreadPool::notifyCaches(CacheActivation reason) {
250 for (int i = 0; i < core->caches.count(); i++)
251 tasks.append(PFrameContext(new FrameContext(reason, core->caches[i], PFrameContext())));
254 void VSThreadPool::start(const PFrameContext &context) {
255 Q_ASSERT(context);
256 QMutexLocker m(&lock);
257 startInternal(context);
260 void VSThreadPool::returnFrame(const PFrameContext &rCtx, const PVideoFrame &f) {
261 Q_ASSERT(rCtx->frameDone);
262 // we need to unlock here so the callback may request more frames without causing a deadlock
263 // AND so that slow callbacks will only block operations in this thread, not all the others
264 lock.unlock();
265 VSFrameRef *ref = new VSFrameRef(f);
266 QMutexLocker m(&callbackLock);
267 rCtx->frameDone(rCtx->userData, ref, rCtx->n, rCtx->node, NULL);
268 m.unlock();
269 lock.lock();
272 void VSThreadPool::startInternal(const PFrameContext &context) {
273 //technically this could be done by walking up the context chain and add a new notification to the correct one
274 //unfortunately this would probably be quite slow for deep scripts so just hope the cache catches it
276 if (context->n < 0)
277 qFatal("Negative frame request by: " + context->clip->name);
279 // check to see if it's time to reevaluate cache sizes
280 if (!context->upstreamContext && ticks.fetchAndAddAcquire(1) == 99) {
281 ticks = 0;
282 notifyCaches(cCacheTick);
285 // add it immediately if the task is to return a completed frame
286 if (context->returnedFrame) {
287 tasks.append(context);
288 wakeThread();
289 return;
290 } else {
291 if (context->upstreamContext)
292 context->upstreamContext->numFrameRequests.ref();
294 ////////////////////////
295 // see if the task is a duplicate
296 foreach(const PFrameContext & ctx, tasks) {
297 if (context->clip == ctx->clip && context->n == ctx->n) {
298 if (ctx->returnedFrame) {
299 // special case where the requested frame is encountered "by accident"
300 context->returnedFrame = ctx->returnedFrame;
301 tasks.append(context);
302 wakeThread();
303 return;
304 } else {
305 PFrameContext rCtx = ctx;
307 if (rCtx->returnedFrame)
308 rCtx = rCtx->upstreamContext;
310 if (context->clip == rCtx->clip && context->n == rCtx->n) {
311 PFrameContext t = rCtx;
313 while (t && t->notificationChain)
314 t = t->notificationChain;
316 t->notificationChain = context;
317 return;
323 FrameKey p(context->clip, context->n);
325 if (runningTasks.contains(p)) {
326 PFrameContext ctx = runningTasks[p];
327 Q_ASSERT(ctx);
329 if (ctx->returnedFrame) {
330 // special case where the requested frame is encountered "by accident"
331 context->returnedFrame = ctx->returnedFrame;
332 tasks.append(context);
333 wakeThread();
334 return;
335 } else {
336 PFrameContext rCtx = ctx;
338 if (rCtx->returnedFrame)
339 rCtx = rCtx->upstreamContext;
341 if (context->clip == rCtx->clip && context->n == rCtx->n) {
342 PFrameContext t = rCtx;
344 while (t && t->notificationChain)
345 t = t->notificationChain;
347 t->notificationChain = context;
348 return;
353 if (allContexts.contains(p)) {
354 PFrameContext ctx = allContexts[p];
355 Q_ASSERT(ctx);
356 Q_ASSERT(context->clip == ctx->clip && context->n == ctx->n);
358 if (ctx->returnedFrame) {
359 // special case where the requested frame is encountered "by accident"
360 context->returnedFrame = ctx->returnedFrame;
361 tasks.append(context);
362 wakeThread();
363 return;
364 } else {
365 while (ctx->notificationChain)
366 ctx = ctx->notificationChain;
368 ctx->notificationChain = context;
369 return;
371 } else {
372 allContexts[p] = context;
375 tasks.append(context);
376 wakeThread();
377 return;
382 void VSThreadPool::waitForDone() {
386 VSThreadPool::~VSThreadPool() {
387 setMaxThreadCount(0);