bump product version to 5.0.4.1
[LibreOffice.git] / comphelper / source / misc / threadpool.cxx
blob77e9962c6e9ccdeb944c235f010a97972526769e
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 */
10 #include <comphelper/threadpool.hxx>
12 #include <rtl/instance.hxx>
13 #include <boost/shared_ptr.hpp>
14 #include <thread>
15 #include <algorithm>
17 namespace comphelper {
19 class ThreadPool::ThreadWorker : public salhelper::Thread
21 ThreadPool *mpPool;
22 osl::Condition maNewWork;
23 bool mbWorking;
24 public:
26 ThreadWorker( ThreadPool *pPool ) :
27 salhelper::Thread("thread-pool"),
28 mpPool( pPool ),
29 mbWorking( false )
33 virtual void execute() SAL_OVERRIDE
35 ThreadTask *pTask;
36 while ( ( pTask = waitForWork() ) )
38 pTask->doWork();
39 delete pTask;
43 ThreadTask *waitForWork()
45 ThreadTask *pRet = NULL;
47 osl::ResettableMutexGuard aGuard( mpPool->maGuard );
49 pRet = mpPool->popWork();
51 while( !pRet )
53 if (mbWorking)
54 mpPool->stopWork();
55 mbWorking = false;
56 maNewWork.reset();
58 if( mpPool->mbTerminate )
59 break;
61 aGuard.clear(); // unlock
63 maNewWork.wait();
65 aGuard.reset(); // lock
67 pRet = mpPool->popWork();
70 if (pRet)
72 if (!mbWorking)
73 mpPool->startWork();
74 mbWorking = true;
77 return pRet;
80 // Why a condition per worker thread - you may ask.
82 // Unfortunately the Windows synchronisation API that we wrap
83 // is horribly inadequate cf.
84 // http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
85 // The existing osl::Condition API should only ever be used
86 // between one producer and one consumer thread to avoid the
87 // lost wakeup problem.
89 void signalNewWork()
91 maNewWork.set();
95 ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
96 mnThreadsWorking( 0 ),
97 mbTerminate( false )
99 for( sal_Int32 i = 0; i < nWorkers; i++ )
100 maWorkers.push_back( new ThreadWorker( this ) );
102 maTasksComplete.set();
104 osl::MutexGuard aGuard( maGuard );
105 for( size_t i = 0; i < maWorkers.size(); i++ )
106 maWorkers[ i ]->launch();
109 ThreadPool::~ThreadPool()
111 waitAndCleanupWorkers();
114 struct ThreadPoolStatic : public rtl::StaticWithInit< boost::shared_ptr< ThreadPool >,
115 ThreadPoolStatic >
117 boost::shared_ptr< ThreadPool > operator () () {
118 sal_Int32 nThreads = std::max( std::thread::hardware_concurrency(), 1U );
119 return boost::shared_ptr< ThreadPool >( new ThreadPool( nThreads ) );
123 ThreadPool& ThreadPool::getSharedOptimalPool()
125 return *ThreadPoolStatic::get().get();
128 void ThreadPool::waitAndCleanupWorkers()
130 waitUntilEmpty();
132 osl::ResettableMutexGuard aGuard( maGuard );
133 mbTerminate = true;
135 while( !maWorkers.empty() )
137 rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
138 maWorkers.pop_back();
139 assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
140 == maWorkers.end());
141 xWorker->signalNewWork();
142 aGuard.clear();
143 { // unlocked
144 xWorker->join();
145 xWorker.clear();
147 aGuard.reset();
151 void ThreadPool::pushTask( ThreadTask *pTask )
153 osl::MutexGuard aGuard( maGuard );
154 maTasks.insert( maTasks.begin(), pTask );
156 // horrible beyond belief:
157 for( size_t i = 0; i < maWorkers.size(); i++ )
158 maWorkers[ i ]->signalNewWork();
159 maTasksComplete.reset();
162 ThreadTask *ThreadPool::popWork()
164 if( !maTasks.empty() )
166 ThreadTask *pTask = maTasks.back();
167 maTasks.pop_back();
168 return pTask;
170 else
171 return NULL;
174 void ThreadPool::startWork()
176 mnThreadsWorking++;
179 void ThreadPool::stopWork()
181 assert( mnThreadsWorking > 0 );
182 if ( --mnThreadsWorking == 0 )
183 maTasksComplete.set();
186 void ThreadPool::waitUntilEmpty()
188 osl::ResettableMutexGuard aGuard( maGuard );
190 if( maWorkers.empty() )
191 { // no threads at all -> execute the work in-line
192 ThreadTask *pTask;
193 while ( ( pTask = popWork() ) )
195 pTask->doWork();
196 delete pTask;
199 else
201 aGuard.clear();
202 maTasksComplete.wait();
203 aGuard.reset();
205 assert( maTasks.empty() );
208 } // namespace comphelper
210 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */