/*
  ==============================================================================

   This file is part of the JUCE library - "Jules' Utility Class Extensions"
   Copyright 2004-11 by Raw Material Software Ltd.

  ------------------------------------------------------------------------------

   JUCE can be redistributed and/or modified under the terms of the GNU General
   Public License (Version 2), as published by the Free Software Foundation.
   A copy of the license is included in the JUCE distribution, or can be found
   online at www.gnu.org/licenses.

   JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
   A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

  ------------------------------------------------------------------------------

   To release a closed-source product which uses JUCE, commercial licenses are
   available: visit www.rawmaterialsoftware.com/juce for more information.

  ==============================================================================
*/
26 #include "../core/juce_StandardHeader.h"
30 #include "juce_ThreadPool.h"
31 #include "../core/juce_Time.h"
34 //==============================================================================
35 ThreadPoolJob::ThreadPoolJob (const String
& name
)
40 shouldBeDeleted (false)
44 ThreadPoolJob::~ThreadPoolJob()
46 // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
47 // to remove it first!
48 jassert (pool
== nullptr || ! pool
->contains (this));
51 String
ThreadPoolJob::getJobName() const
56 void ThreadPoolJob::setJobName (const String
& newName
)
61 void ThreadPoolJob::signalJobShouldExit()
67 //==============================================================================
68 class ThreadPool::ThreadPoolThread
: public Thread
71 ThreadPoolThread (ThreadPool
& pool_
)
80 while (! threadShouldExit())
82 if (! pool
.runNextJob())
91 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread
);
94 //==============================================================================
95 ThreadPool::ThreadPool (const int numThreads
,
96 const bool startThreadsOnlyWhenNeeded
,
97 const int stopThreadsWhenNotUsedTimeoutMs
)
98 : threadStopTimeout (stopThreadsWhenNotUsedTimeoutMs
),
101 jassert (numThreads
> 0); // not much point having one of these with no threads in it.
103 for (int i
= jmax (1, numThreads
); --i
>= 0;)
104 threads
.add (new ThreadPoolThread (*this));
106 if (! startThreadsOnlyWhenNeeded
)
107 for (int i
= threads
.size(); --i
>= 0;)
108 threads
.getUnchecked(i
)->startThread (priority
);
111 ThreadPool::~ThreadPool()
113 removeAllJobs (true, 4000);
116 for (i
= threads
.size(); --i
>= 0;)
117 threads
.getUnchecked(i
)->signalThreadShouldExit();
119 for (i
= threads
.size(); --i
>= 0;)
120 threads
.getUnchecked(i
)->stopThread (500);
123 void ThreadPool::addJob (ThreadPoolJob
* const job
)
125 jassert (job
!= nullptr);
126 jassert (job
->pool
== nullptr);
128 if (job
->pool
== nullptr)
131 job
->shouldStop
= false;
132 job
->isActive
= false;
135 const ScopedLock
sl (lock
);
140 for (int i
= threads
.size(); --i
>= 0;)
141 if (threads
.getUnchecked(i
)->isThreadRunning() && ! threads
.getUnchecked(i
)->threadShouldExit())
144 if (numRunning
< threads
.size())
146 bool startedOne
= false;
149 while (--n
>= 0 && ! startedOne
)
151 for (int i
= threads
.size(); --i
>= 0;)
153 if (! threads
.getUnchecked(i
)->isThreadRunning())
155 threads
.getUnchecked(i
)->startThread (priority
);
167 for (int i
= threads
.size(); --i
>= 0;)
168 threads
.getUnchecked(i
)->notify();
172 int ThreadPool::getNumJobs() const
177 ThreadPoolJob
* ThreadPool::getJob (const int index
) const
179 const ScopedLock
sl (lock
);
183 bool ThreadPool::contains (const ThreadPoolJob
* const job
) const
185 const ScopedLock
sl (lock
);
186 return jobs
.contains (const_cast <ThreadPoolJob
*> (job
));
189 bool ThreadPool::isJobRunning (const ThreadPoolJob
* const job
) const
191 const ScopedLock
sl (lock
);
192 return jobs
.contains (const_cast <ThreadPoolJob
*> (job
)) && job
->isActive
;
195 bool ThreadPool::waitForJobToFinish (const ThreadPoolJob
* const job
,
196 const int timeOutMs
) const
200 const uint32 start
= Time::getMillisecondCounter();
202 while (contains (job
))
204 if (timeOutMs
>= 0 && Time::getMillisecondCounter() >= start
+ timeOutMs
)
207 jobFinishedSignal
.wait (2);
214 bool ThreadPool::removeJob (ThreadPoolJob
* const job
,
215 const bool interruptIfRunning
,
218 bool dontWait
= true;
222 const ScopedLock
sl (lock
);
224 if (jobs
.contains (job
))
228 if (interruptIfRunning
)
229 job
->signalJobShouldExit();
235 jobs
.removeValue (job
);
241 return dontWait
|| waitForJobToFinish (job
, timeOutMs
);
244 bool ThreadPool::removeAllJobs (const bool interruptRunningJobs
,
246 const bool deleteInactiveJobs
,
247 ThreadPool::JobSelector
* selectedJobsToRemove
)
249 Array
<ThreadPoolJob
*> jobsToWaitFor
;
252 const ScopedLock
sl (lock
);
254 for (int i
= jobs
.size(); --i
>= 0;)
256 ThreadPoolJob
* const job
= jobs
.getUnchecked(i
);
258 if (selectedJobsToRemove
== nullptr || selectedJobsToRemove
->isJobSuitable (job
))
262 jobsToWaitFor
.add (job
);
264 if (interruptRunningJobs
)
265 job
->signalJobShouldExit();
271 if (deleteInactiveJobs
)
280 const uint32 start
= Time::getMillisecondCounter();
284 for (int i
= jobsToWaitFor
.size(); --i
>= 0;)
285 if (! isJobRunning (jobsToWaitFor
.getUnchecked (i
)))
286 jobsToWaitFor
.remove (i
);
288 if (jobsToWaitFor
.size() == 0)
291 if (timeOutMs
>= 0 && Time::getMillisecondCounter() >= start
+ timeOutMs
)
294 jobFinishedSignal
.wait (20);
300 StringArray
ThreadPool::getNamesOfAllJobs (const bool onlyReturnActiveJobs
) const
303 const ScopedLock
sl (lock
);
305 for (int i
= 0; i
< jobs
.size(); ++i
)
307 const ThreadPoolJob
* const job
= jobs
.getUnchecked(i
);
308 if (job
->isActive
|| ! onlyReturnActiveJobs
)
309 s
.add (job
->getJobName());
315 bool ThreadPool::setThreadPriorities (const int newPriority
)
319 if (priority
!= newPriority
)
321 priority
= newPriority
;
323 for (int i
= threads
.size(); --i
>= 0;)
324 if (! threads
.getUnchecked(i
)->setPriority (newPriority
))
331 bool ThreadPool::runNextJob()
333 ThreadPoolJob
* job
= nullptr;
336 const ScopedLock
sl (lock
);
338 for (int i
= 0; i
< jobs
.size(); ++i
)
342 if (job
!= nullptr && ! (job
->isActive
|| job
->shouldStop
))
349 job
->isActive
= true;
357 ThreadPoolJob::JobStatus result
= job
->runJob();
359 lastJobEndTime
= Time::getApproximateMillisecondCounter();
361 const ScopedLock
sl (lock
);
363 if (jobs
.contains (job
))
365 job
->isActive
= false;
367 if (result
!= ThreadPoolJob::jobNeedsRunningAgain
|| job
->shouldStop
)
370 job
->shouldStop
= true;
371 jobs
.removeValue (job
);
373 if (result
== ThreadPoolJob::jobHasFinishedAndShouldBeDeleted
)
376 jobFinishedSignal
.signal();
380 // move the job to the end of the queue if it wants another go
381 jobs
.move (jobs
.indexOf (job
), -1);
385 #if JUCE_CATCH_UNHANDLED_EXCEPTIONS
388 const ScopedLock
sl (lock
);
389 jobs
.removeValue (job
);
395 if (threadStopTimeout
> 0
396 && Time::getApproximateMillisecondCounter() > lastJobEndTime
+ threadStopTimeout
)
398 const ScopedLock
sl (lock
);
400 if (jobs
.size() == 0)
401 for (int i
= threads
.size(); --i
>= 0;)
402 threads
.getUnchecked(i
)->signalThreadShouldExit();