1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
10 #include <comphelper/threadpool.hxx>
12 #include <com/sun/star/uno/Exception.hpp>
13 #include <config_options.h>
14 #include <o3tl/safeint.hxx>
15 #include <sal/config.h>
16 #include <sal/log.hxx>
17 #include <salhelper/thread.hxx>
23 #include <comphelper/debuggerinfo.hxx>
26 #if defined HAVE_VALGRIND_HEADERS
27 #include <valgrind/memcheck.h>
31 #define WIN32_LEAN_AND_MEAN
35 namespace comphelper
{
37 /** prevent waiting for a task from inside a task */
// Per-thread flag (thread_local): ThreadWorker::execute() sets it to true and
// ThreadPool::waitUntilDone() asserts that it is false. It is only declared
// when DBG_UTIL is defined together with LINUX or _WIN32.
38 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
39 static thread_local
bool gbIsWorkerThread
;
42 // used to group thread-tasks for waiting in waitUntilDone()
// Counter inspected by isDone() (true when it is 0) and by waitUntilDone()
// (which keeps waiting while it is > 0).
46 sal_Int32 mnTasksWorking
;
// Waited on in waitUntilDone(); onTaskWorkerDone() notifies all waiters once
// mnTasksWorking is 0.
47 std::condition_variable maTasksComplete
;
// Called by ThreadWorker::execute(), ThreadPool::shutdownLocked() and
// ThreadPool::waitUntilDone() on the tag of a task they have popped.
53 void onTaskWorkerDone();
// Worker thread of a ThreadPool. It derives from salhelper::Thread and is
// created with the thread name "thread-pool". execute() takes a unique_lock on
// the pool's maMutex and, as long as the pool's mbTerminate flag is not set,
// pops tasks with popWorkLocked( aGuard, true ). Each popped task is bracketed
// by incBusyWorker() / decBusyWorker(), after which the task's tag is told
// that a worker is done.
58 class ThreadPool::ThreadWorker
: public salhelper::Thread
// pPool: the pool this worker serves.
63 explicit ThreadWorker( ThreadPool
*pPool
) :
64 salhelper::Thread("thread-pool"),
69 virtual void execute() override
// Mark this thread as a pool worker so that ThreadPool::waitUntilDone() can
// assert that it is not being called from inside a task.
71 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
72 gbIsWorkerThread
= true;
74 std::unique_lock
< std::mutex
> aGuard( mpPool
->maMutex
);
76 while( !mpPool
->mbTerminate
)
78 std::unique_ptr
<ThreadTask
> pTask
= mpPool
->popWorkLocked( aGuard
, true );
// Take a separate shared_ptr to the tag: it is still needed below, after
// decBusyWorker(), to report that the worker is done.
81 std::shared_ptr
<ThreadTaskTag
> pTag(pTask
->mpTag
);
82 mpPool
->incBusyWorker();
89 mpPool
->decBusyWorker();
90 pTag
->onTaskWorkerDone();
// Constructs a pool. nWorkers is stored in mnMaxWorkers, which pushTask() uses
// as the upper bound on the number of worker threads it launches.
96 ThreadPool::ThreadPool(std::size_t nWorkers
)
98 , mnMaxWorkers(nWorkers
)
// Destructor. Asserts that no tasks are still queued and that no worker is
// recorded as busy.
103 ThreadPool::~ThreadPool()
105 // note: calling shutdown from global variable dtor blocks forever on Win7
106 // note2: there isn't enough MSVCRT left on exit to call assert() properly
107 // so these asserts just print something to stderr but exit status is
108 // still 0, but hopefully they will be more helpful on non-WNT platforms
110 assert(maTasks
.empty());
111 assert(mnBusyWorkers
== 0);
// Accessor for the function-local static POOL. Its initializer creates a
// ThreadPool sized by ThreadPool::getPreferredConcurrency(). The result is a
// reference to a shared_ptr; getSharedOptimalPool() dereferences it.
116 std::shared_ptr
< ThreadPool
>& GetStaticThreadPool()
118 static std::shared_ptr
< ThreadPool
> POOL
=
121 const std::size_t nThreads
= ThreadPool::getPreferredConcurrency();
122 return std::make_shared
< ThreadPool
>( nThreads
);
// Returns a reference to the pool held by GetStaticThreadPool().
129 ThreadPool
& ThreadPool::getSharedOptimalPool()
131 return *GetStaticThreadPool();
// Computes the preferred number of worker threads. The value comes from a
// lambda that initializes the function-local static ThreadCount. It starts
// from std::thread::hardware_concurrency(), can be replaced by the value of
// the MAX_CONCURRENCY environment variable (parsed as a base-10 integer), is
// capped at the hardware thread count and is never smaller than 1.
134 std::size_t ThreadPool::getPreferredConcurrency()
136 static std::size_t ThreadCount
= []()
// hardware_concurrency() may return 0 when the value cannot be determined,
// hence the lower bound of 1.
138 const std::size_t nHardThreads
= o3tl::clamp_to_unsigned
<std::size_t>(
139 std::max(std::thread::hardware_concurrency(), 1U));
140 std::size_t nThreads
= nHardThreads
;
141 const char *pEnv
= getenv("MAX_CONCURRENCY");
144 // Override with user/admin preference.
145 nThreads
= o3tl::clamp_to_unsigned
<std::size_t>(rtl_str_toInt32(pEnv
, 10));
// Never use more threads than the hardware reports ...
148 nThreads
= std::min(nHardThreads
, nThreads
);
// ... and never fewer than one.
149 return std::max
<std::size_t>(nThreads
, 1);
155 // Used to order shutdown, and to ensure there are no lingering
156 // threads after LibreOfficeKit pre-init.
// Takes maMutex and hands the held lock to shutdownLocked().
157 void ThreadPool::shutdown()
162 std::unique_lock
< std::mutex
> aGuard( maMutex
);
163 shutdownLocked(aGuard
);
// Shuts the pool down; aGuard is the lock passed in by shutdown() and
// joinThreadsIfIdle(), both of which construct it on maMutex.
// When there are no worker threads, pending tasks are popped without waiting
// and their tags are notified. The code also waits on maTasksChanged while
// maTasks is non-empty. Afterwards all waiters on maTasksChanged are woken,
// the worker list is moved into a local variable and the workers are handled
// one by one, starting from the back of that list.
166 void ThreadPool::shutdownLocked(std::unique_lock
<std::mutex
>& aGuard
)
168 if( maWorkers
.empty() )
169 { // no threads at all -> execute the work in-line
170 std::unique_ptr
<ThreadTask
> pTask
;
// popWorkLocked( ..., false ) does not wait; the loop ends on a null result.
171 while ( ( pTask
= popWorkLocked(aGuard
, false) ) )
173 std::shared_ptr
<ThreadTaskTag
> pTag(pTask
->mpTag
);
175 pTag
->onTaskWorkerDone();
180 while( !maTasks
.empty() )
182 maTasksChanged
.wait( aGuard
);
183 // In the (unlikely but possible?) case pushTask() gets called meanwhile,
184 // its notify_one() call is meant to wake up a thread and process the task.
185 // But if this code gets woken up instead, it could lead to a deadlock.
186 // Pass on the notification.
187 maTasksChanged
.notify_one();
190 assert( maTasks
.empty() );
192 // coverity[missing_lock] - on purpose
195 maTasksChanged
.notify_all();
// Swap the worker list into a local, leaving maWorkers empty while the
// workers are processed below.
197 decltype(maWorkers
) aWorkers
;
198 std::swap(maWorkers
, aWorkers
);
201 while (!aWorkers
.empty())
203 rtl::Reference
<ThreadWorker
> xWorker
= aWorkers
.back();
205 assert(std::find(aWorkers
.begin(), aWorkers
.end(), xWorker
)
// Queues pTask for execution. Takes maMutex. A new ThreadWorker is created and
// launched first if the pool has fewer than mnMaxWorkers workers and the
// worker count does not exceed the number of queued tasks plus busy workers.
// The task's tag is informed of the push, the task is inserted at the front
// of maTasks, and one waiter on maTasksChanged is woken.
214 void ThreadPool::pushTask( std::unique_ptr
<ThreadTask
> pTask
)
216 std::scoped_lock
< std::mutex
> aGuard( maMutex
);
220 // Worked on tasks are already removed from maTasks, so include the count of busy workers.
221 if (maWorkers
.size() < mnMaxWorkers
&& maWorkers
.size() <= maTasks
.size() + mnBusyWorkers
)
223 maWorkers
.push_back( new ThreadWorker( this ) );
224 maWorkers
.back()->launch();
227 pTask
->mpTag
->onTaskPushed();
// Insert at the front: popWorkLocked() takes tasks from the back, so the
// oldest queued task is handed out first.
228 maTasks
.insert( maTasks
.begin(), std::move(pTask
) );
230 maTasksChanged
.notify_one();
// Takes the task at the back of maTasks, if there is one.
// rGuard: lock supplied by the caller; it is passed to maTasksChanged.wait().
// bWait: when false, the call does not wait for a task to arrive.
// With an empty queue, the `!bWait || mbTerminate` branch is taken when the
// caller does not want to wait or the pool is terminating. Otherwise the call
// waits on maTasksChanged and retries while mbTerminate is not set.
// NOTE(review): the return statements are not visible in this excerpt; callers
// treat a null result as "no task" (see the loop in shutdownLocked()).
233 std::unique_ptr
<ThreadTask
> ThreadPool::popWorkLocked( std::unique_lock
< std::mutex
> & rGuard
, bool bWait
)
237 if( !maTasks
.empty() )
239 std::unique_ptr
<ThreadTask
> pTask
= std::move(maTasks
.back());
243 else if (!bWait
|| mbTerminate
)
246 maTasksChanged
.wait( rGuard
);
248 } while (!mbTerminate
);
// Called by ThreadWorker::execute() after it has popped a task; it is paired
// with a later decBusyWorker() call.
253 void ThreadPool::incBusyWorker()
// Counterpart of incBusyWorker(). Asserts that at least one busy worker is
// currently recorded in mnBusyWorkers.
258 void ThreadPool::decBusyWorker()
260 assert(mnBusyWorkers
>= 1);
// Waits until the tasks grouped under rTag are done. Takes maMutex. If the
// pool has no worker threads, tasks are popped (without waiting) in the
// calling thread until rTag reports done. rTag->waitUntilDone() is used to
// block until the tag's tasks are complete.
// NOTE(review): the use of bJoin is not visible in this excerpt; presumably it
// controls whether idle threads are joined afterwards (cf. joinThreadsIfIdle())
// - confirm.
264 void ThreadPool::waitUntilDone(const std::shared_ptr
<ThreadTaskTag
>& rTag
, bool bJoin
)
// Worker threads set gbIsWorkerThread in ThreadWorker::execute(), so this
// assertion fires if a task tries to wait from inside the pool.
266 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
267 assert(!gbIsWorkerThread
&& "cannot wait for tasks from inside a task");
270 std::unique_lock
< std::mutex
> aGuard( maMutex
);
272 if( maWorkers
.empty() )
273 { // no threads at all -> execute the work in-line
274 while (!rTag
->isDone())
276 std::unique_ptr
<ThreadTask
> pTask
= popWorkLocked(aGuard
, false);
279 std::shared_ptr
<ThreadTaskTag
> pTag(pTask
->mpTag
);
281 pTag
->onTaskWorkerDone();
286 rTag
->waitUntilDone();
// Takes maMutex and, if isIdle() holds, shuts the pool down through
// shutdownLocked().
292 void ThreadPool::joinThreadsIfIdle()
294 std::unique_lock
< std::mutex
> aGuard( maMutex
);
295 if (isIdle()) // check if there are still tasks from another tag
297 shutdownLocked(aGuard
);
// Creates a new ThreadTaskTag and returns it in a shared_ptr.
301 std::shared_ptr
<ThreadTaskTag
> ThreadPool::createThreadTaskTag()
303 return std::make_shared
<ThreadTaskTag
>();
// Returns the result of pTag->isDone().
306 bool ThreadPool::isTaskTagDone(const std::shared_ptr
<ThreadTaskTag
>& pTag
)
308 return pTag
->isDone();
// Constructs a task belonging to xTag; the tag is moved into mpTag.
311 ThreadTask::ThreadTask(std::shared_ptr
<ThreadTaskTag
> xTag
)
312 : mpTag(std::move(xTag
))
// Runs the task's work (doWork(), according to the log messages below).
// Exceptions of type std::exception and css::uno::Exception, and (per the
// last message) unknown exceptions, are reported with SAL_WARN in the
// "comphelper" log area.
316 void ThreadTask::exec()
321 catch (const std::exception
&e
)
323 SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e
.what());
325 catch (const css::uno::Exception
&e
)
327 SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e
);
331 SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
// A new tag starts with mnTasksWorking at 0, so isDone() reports true until
// the counter changes.
335 ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
// Called by ThreadPool::pushTask() for every task pushed with this tag. Takes
// the tag's maMutex and sanity-checks that mnTasksWorking stays below 65536.
// NOTE(review): the update of mnTasksWorking itself is not visible in this
// excerpt.
339 void ThreadTaskTag::onTaskPushed()
341 std::scoped_lock
< std::mutex
> aGuard( maMutex
);
343 assert( mnTasksWorking
< 65536 ); // sanity checking
// Called when a task carrying this tag has been handled. Takes the tag's
// maMutex, asserts that mnTasksWorking has not become negative, and wakes all
// waiters on maTasksComplete once mnTasksWorking is 0.
// NOTE(review): the update of mnTasksWorking itself is not visible in this
// excerpt.
346 void ThreadTaskTag::onTaskWorkerDone()
348 std::scoped_lock
< std::mutex
> aGuard( maMutex
);
350 assert(mnTasksWorking
>= 0);
351 if (mnTasksWorking
== 0)
352 maTasksComplete
.notify_all();
// Returns true when mnTasksWorking is 0; the counter is read under maMutex.
355 bool ThreadTaskTag::isDone()
357 std::scoped_lock
< std::mutex
> aGuard( maMutex
);
358 return mnTasksWorking
== 0;
// Blocks while mnTasksWorking is greater than 0, waiting on maTasksComplete
// with a timeout.
// With DBG_UTIL defined and NDEBUG not defined, a timed-out wait trips an
// assertion. The timeout is 10 minutes by default, 30 minutes when
// ENABLE_RUNTIME_OPTIMIZATIONS is off or when running on valgrind, and
// 300 minutes when a debugger is attached.
// The other code path uses a fixed 10 minute timeout and throws
// std::runtime_error when it expires.
361 void ThreadTaskTag::waitUntilDone()
363 std::unique_lock
< std::mutex
> aGuard( maMutex
);
364 while( mnTasksWorking
> 0 )
366 #if defined DBG_UTIL && !defined NDEBUG
367 // 10 minute timeout in debug mode, unless the code is built with
368 // sanitizers or debugged in valgrind or gdb, in which case the threads
369 // should not time out in the middle of a debugging session
// maxTimeout is in seconds; it is passed to std::chrono::seconds below.
370 int maxTimeout
= 10 * 60;
371 #if !ENABLE_RUNTIME_OPTIMIZATIONS
372 maxTimeout
= 30 * 60;
374 #if defined HAVE_VALGRIND_HEADERS
375 if( RUNNING_ON_VALGRIND
)
376 maxTimeout
= 30 * 60;
378 if( isDebuggerAttached())
379 maxTimeout
= 300 * 60;
380 std::cv_status result
= maTasksComplete
.wait_for(
381 aGuard
, std::chrono::seconds( maxTimeout
));
382 assert(result
!= std::cv_status::timeout
);
384 // 10 minute timeout in production so the app eventually throws some kind of error
385 if (maTasksComplete
.wait_for(
386 aGuard
, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout
)
387 throw std::runtime_error("timeout waiting for threadpool tasks");
392 } // namespace comphelper
394 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */