Fix the reporting of the number of threads
[vapoursynth-svn.git] / src / core / vsthreadpool.cpp
blobabc17fc7579d583d7216cd3bbd900b6bc3f415c1
1 /*
2 * Copyright (c) 2012 Fredrik Mellbin
4 * This file is part of VapourSynth.
6 * VapourSynth is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * VapourSynth is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with VapourSynth; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include "vscore.h"
23 VSThread::VSThread(VSThreadPool *owner) : owner(owner), stop(false) {
27 void VSThread::stopThread() {
28 stop = true;
31 void VSThread::run() {
32 owner->lock.lock();
33 owner->activeThreads++;
35 while (true) {
36 bool ranTask = false;
38 for (int i = 0; i < owner->tasks.count(); i++) {
39 PFrameContext rCtx = owner->tasks[i];
41 if (rCtx->frameDone && rCtx->returnedFrame) {
42 owner->tasks.removeAt(i--);
43 owner->returnFrame(rCtx, rCtx->returnedFrame);
44 ranTask = true;
45 break;
46 } else {
48 PFrameContext pCtx = rCtx;
50 if (rCtx->returnedFrame || rCtx->hasError())
51 rCtx = rCtx->upstreamContext;
53 Q_ASSERT(rCtx);
54 Q_ASSERT(pCtx);
56 // if an error has already been flagged upstream simply drop this task so a filter won't get multiple arError calls for the same frame
57 if (rCtx->hasError()) {
58 owner->tasks.removeAt(i--);
59 continue;
62 bool isSingleInstance = (rCtx->clip->filterMode == fmParallelRequests && pCtx->returnedFrame && rCtx->numFrameRequests == 1 && pCtx != rCtx) || rCtx->clip->filterMode == fmSerial;
64 // this check is common for both filter modes, it makes sure that multiple calls won't be made in parallel to a single filter to produce the same frame
65 // special casing so serial unordered doesn't need yet another list
66 if (owner->runningTasks.contains(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n)))
67 continue;
69 if (isSingleInstance) {
70 // this is the complicated case, a new frame may not be started until all calls are completed for the current one
71 if (owner->framesInProgress.contains(rCtx->clip) && owner->framesInProgress[rCtx->clip] != rCtx->n)
72 continue;
75 // mark task as active
76 owner->tasks.removeAt(i--);
77 owner->runningTasks.insert(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n), rCtx);
79 if (isSingleInstance)
80 owner->framesInProgress.insert(rCtx->clip, rCtx->n);
82 ActivationReason ar = arInitial;
84 if (pCtx->hasError()) {
85 ar = arError;
86 rCtx->setError(pCtx->getErrorMessage());
87 } else if (pCtx != rCtx && pCtx->returnedFrame) {
88 if (--rCtx->numFrameRequests)
89 ar = arFrameReady;
90 else
91 ar = arAllFramesReady;
93 Q_ASSERT(rCtx->numFrameRequests >= 0);
94 rCtx->availableFrames.insert(FrameKey(pCtx->clip, pCtx->n), pCtx->returnedFrame);
95 rCtx->lastCompletedN = pCtx->n;
96 rCtx->lastCompletedNode = pCtx->node;
99 owner->lock.unlock();
100 // run task
102 PVideoFrame f = rCtx->clip->getFrameInternal(rCtx->n, ar, rCtx);
103 ranTask = true;
104 owner->lock.lock();
106 if (f && rCtx->numFrameRequests > 0)
107 qFatal("Frame returned but there are still pending frame requests, filter: " + rCtx->clip->name);
109 owner->runningTasks.remove(FrameKey(rCtx->clip, rCtx->clip->filterMode == fmUnordered ? -1 : rCtx->n));
111 if (f || ar == arError || rCtx->hasError()) {
112 // free all input frames quickly since the frame processing is done
113 rCtx->availableFrames.clear();
115 if (isSingleInstance) {
116 if (owner->framesInProgress[rCtx->clip] != rCtx->n && !rCtx->hasError())
117 qWarning("Releasing unobtained frame lock");
119 owner->framesInProgress.remove(rCtx->clip);
122 owner->allContexts.remove(FrameKey(rCtx->clip, rCtx->n));
125 if (rCtx->hasError()) {
126 PFrameContext n;
128 do {
129 n = rCtx->notificationChain;
131 if (n) {
132 rCtx->notificationChain.clear();
133 n->setError(rCtx->getErrorMessage());
136 if (rCtx->upstreamContext) {
137 owner->startInternal(rCtx);
140 if (rCtx->frameDone) {
141 owner->lock.unlock();
142 QMutexLocker callbackLock(&owner->callbackLock);
143 rCtx->frameDone(rCtx->userData, NULL, rCtx->n, rCtx->node, rCtx->getErrorMessage().constData());
144 callbackLock.unlock();
145 owner->lock.lock();
147 } while ((rCtx = n));
148 } else if (f) {
149 Q_ASSERT(rCtx->numFrameRequests == 0);
150 PFrameContext n;
152 do {
153 n = rCtx->notificationChain;
155 if (n)
156 rCtx->notificationChain.clear();
158 if (rCtx->upstreamContext) {
159 rCtx->returnedFrame = f;
160 owner->startInternal(rCtx);
163 if (rCtx->frameDone)
164 owner->returnFrame(rCtx, f);
165 } while ((rCtx = n));
166 } else if (rCtx->numFrameRequests > 0 || rCtx->n < 0) {
167 // already scheduled or in the case of negative n it is simply a cache notify message
168 } else {
169 qFatal("No frame returned at the end of processing by " + rCtx->clip->name);
172 break;
176 if (!ranTask && !stop) {
177 owner->activeThreads--;
178 owner->newWork.wait(&owner->lock);
179 owner->activeThreads++;
180 } else if (stop) {
181 owner->activeThreads--;
182 owner->lock.unlock();
183 return;
188 VSThreadPool::VSThreadPool(VSCore *core, int *threads) : core(core), activeThreads(0) {
189 int usedThreads = QThread::idealThreadCount();
190 if (threads && *threads > 0)
191 usedThreads = *threads;
192 else if (threads)
193 *threads = usedThreads;
194 setMaxThreadCount(usedThreads);
197 int VSThreadPool::activeThreadCount() const {
198 return activeThreads;
201 int VSThreadPool::threadCount() const {
202 return allThreads.count();
205 void VSThreadPool::setMaxThreadCount(int threadCount) {
206 QMutexLocker m(&lock);
208 while (threadCount > allThreads.count()) {
209 VSThread *vst = new VSThread(this);
210 allThreads.insert(vst);
211 vst->start();
214 while (threadCount < allThreads.count()) {
215 VSThread *t = *allThreads.begin();
216 t->stopThread();
217 newWork.wakeAll();
218 m.unlock();
219 t->wait();
220 m.relock();
221 allThreads.remove(t);
222 delete t;
223 newWork.wakeAll();
227 void VSThreadPool::wakeThread() {
228 if (activeThreads < threadCount())
229 newWork.wakeOne();
232 void VSThreadPool::releaseThread() {
233 setMaxThreadCount(allThreads.count() + 1);
236 void VSThreadPool::reserveThread() {
237 setMaxThreadCount(allThreads.count() - 1);
240 void VSThreadPool::notifyCaches(CacheActivation reason) {
241 for (int i = 0; i < core->caches.count(); i++)
242 tasks.append(PFrameContext(new FrameContext(reason, core->caches[i], PFrameContext())));
245 void VSThreadPool::start(const PFrameContext &context) {
246 Q_ASSERT(context);
247 QMutexLocker m(&lock);
248 startInternal(context);
251 void VSThreadPool::returnFrame(const PFrameContext &rCtx, const PVideoFrame &f) {
252 Q_ASSERT(rCtx->frameDone);
253 // we need to unlock here so the callback may request more frames without causing a deadlock
254 // AND so that slow callbacks will only block operations in this thread, not all the others
255 lock.unlock();
256 VSFrameRef *ref = new VSFrameRef(f);
257 QMutexLocker m(&callbackLock);
258 rCtx->frameDone(rCtx->userData, ref, rCtx->n, rCtx->node, NULL);
259 m.unlock();
260 lock.lock();
263 void VSThreadPool::startInternal(const PFrameContext &context) {
264 //technically this could be done by walking up the context chain and add a new notification to the correct one
265 //unfortunately this would probably be quite slow for deep scripts so just hope the cache catches it
267 if (context->n < 0)
268 qFatal("Negative frame request by: " + context->clip->name);
270 // check to see if it's time to reevaluate cache sizes
271 if (core->memory->memoryUse() > 1024*1024*1024) {
272 ticks = 0;
273 notifyCaches(cNeedMemory);
275 if (!context->upstreamContext && ticks.fetchAndAddAcquire(1) == 99) {
276 ticks = 0;
277 notifyCaches(cCacheTick);
280 // add it immediately if the task is to return a completed frame
281 if (context->returnedFrame) {
282 tasks.append(context);
283 wakeThread();
284 return;
285 } else {
286 if (context->upstreamContext)
287 context->upstreamContext->numFrameRequests++;
289 ////////////////////////
290 // see if the task is a duplicate
291 foreach(const PFrameContext & ctx, tasks) {
292 if (context->clip == ctx->clip && context->n == ctx->n) {
293 if (ctx->returnedFrame) {
294 // special case where the requested frame is encountered "by accident"
295 context->returnedFrame = ctx->returnedFrame;
296 tasks.append(context);
297 wakeThread();
298 return;
299 } else {
300 PFrameContext rCtx = ctx;
302 if (rCtx->returnedFrame)
303 rCtx = rCtx->upstreamContext;
305 if (context->clip == rCtx->clip && context->n == rCtx->n) {
306 PFrameContext t = rCtx;
308 while (t && t->notificationChain)
309 t = t->notificationChain;
311 t->notificationChain = context;
312 return;
318 FrameKey p(context->clip, context->n);
320 if (runningTasks.contains(p)) {
321 PFrameContext ctx = runningTasks[p];
322 Q_ASSERT(ctx);
324 if (ctx->returnedFrame) {
325 // special case where the requested frame is encountered "by accident"
326 context->returnedFrame = ctx->returnedFrame;
327 tasks.append(context);
328 wakeThread();
329 return;
330 } else {
331 PFrameContext rCtx = ctx;
333 if (rCtx->returnedFrame)
334 rCtx = rCtx->upstreamContext;
336 if (context->clip == rCtx->clip && context->n == rCtx->n) {
337 PFrameContext t = rCtx;
339 while (t && t->notificationChain)
340 t = t->notificationChain;
342 t->notificationChain = context;
343 return;
348 if (allContexts.contains(p)) {
349 PFrameContext ctx = allContexts[p];
350 Q_ASSERT(ctx);
351 Q_ASSERT(context->clip == ctx->clip && context->n == ctx->n);
353 if (ctx->returnedFrame) {
354 // special case where the requested frame is encountered "by accident"
355 context->returnedFrame = ctx->returnedFrame;
356 tasks.append(context);
357 wakeThread();
358 return;
359 } else {
360 while (ctx->notificationChain)
361 ctx = ctx->notificationChain;
363 ctx->notificationChain = context;
364 return;
366 } else {
367 allContexts[p] = context;
370 tasks.append(context);
371 wakeThread();
372 return;
377 void VSThreadPool::waitForDone() {
381 VSThreadPool::~VSThreadPool() {
382 setMaxThreadCount(0);