Version 7.6.3.2-android, tag libreoffice-7.6.3.2-android
[LibreOffice.git] / comphelper / source / misc / threadpool.cxx
blobf0a71eb05168ff9bd638cc219141423de58c720f
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 */
10 #include <comphelper/threadpool.hxx>
12 #include <com/sun/star/uno/Exception.hpp>
13 #include <config_options.h>
14 #include <o3tl/safeint.hxx>
15 #include <sal/config.h>
16 #include <sal/log.hxx>
17 #include <salhelper/thread.hxx>
18 #include <algorithm>
19 #include <memory>
20 #include <thread>
21 #include <chrono>
22 #include <cstddef>
23 #include <comphelper/debuggerinfo.hxx>
24 #include <utility>
26 #if defined HAVE_VALGRIND_HEADERS
27 #include <valgrind/memcheck.h>
28 #endif
30 #if defined(_WIN32)
31 #define WIN32_LEAN_AND_MEAN
32 #include <windows.h>
33 #endif
35 namespace comphelper {
37 /** prevent waiting for a task from inside a task */
38 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
39 static thread_local bool gbIsWorkerThread;
40 #endif
42 // used to group thread-tasks for waiting in waitTillDone()
43 class ThreadTaskTag
45 std::mutex maMutex;
46 sal_Int32 mnTasksWorking;
47 std::condition_variable maTasksComplete;
49 public:
50 ThreadTaskTag();
51 bool isDone();
52 void waitUntilDone();
53 void onTaskWorkerDone();
54 void onTaskPushed();
58 class ThreadPool::ThreadWorker : public salhelper::Thread
60 ThreadPool *mpPool;
61 public:
63 explicit ThreadWorker( ThreadPool *pPool ) :
64 salhelper::Thread("thread-pool"),
65 mpPool( pPool )
69 virtual void execute() override
71 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
72 gbIsWorkerThread = true;
73 #endif
74 std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
76 while( !mpPool->mbTerminate )
78 std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
79 if( pTask )
81 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
82 mpPool->incBusyWorker();
83 aGuard.unlock();
85 pTask->exec();
86 pTask.reset();
88 aGuard.lock();
89 mpPool->decBusyWorker();
90 pTag->onTaskWorkerDone();
96 ThreadPool::ThreadPool(std::size_t nWorkers)
97 : mbTerminate(true)
98 , mnMaxWorkers(nWorkers)
99 , mnBusyWorkers(0)
103 ThreadPool::~ThreadPool()
105 // note: calling shutdown from global variable dtor blocks forever on Win7
106 // note2: there isn't enough MSVCRT left on exit to call assert() properly
107 // so these asserts just print something to stderr but exit status is
108 // still 0, but hopefully they will be more helpful on non-WNT platforms
109 assert(mbTerminate);
110 assert(maTasks.empty());
111 assert(mnBusyWorkers == 0);
114 namespace {
116 std::shared_ptr< ThreadPool >& GetStaticThreadPool()
118 static std::shared_ptr< ThreadPool > POOL =
119 []()
121 const std::size_t nThreads = ThreadPool::getPreferredConcurrency();
122 return std::make_shared< ThreadPool >( nThreads );
123 }();
124 return POOL;
129 ThreadPool& ThreadPool::getSharedOptimalPool()
131 return *GetStaticThreadPool();
134 std::size_t ThreadPool::getPreferredConcurrency()
136 static std::size_t ThreadCount = []()
138 const std::size_t nHardThreads = o3tl::clamp_to_unsigned<std::size_t>(
139 std::max(std::thread::hardware_concurrency(), 1U));
140 std::size_t nThreads = nHardThreads;
141 const char *pEnv = getenv("MAX_CONCURRENCY");
142 if (pEnv != nullptr)
144 // Override with user/admin preference.
145 nThreads = o3tl::clamp_to_unsigned<std::size_t>(rtl_str_toInt32(pEnv, 10));
148 nThreads = std::min(nHardThreads, nThreads);
149 return std::max<std::size_t>(nThreads, 1);
150 }();
152 return ThreadCount;
155 // Used to order shutdown, and to ensure there are no lingering
156 // threads after LibreOfficeKit pre-init.
157 void ThreadPool::shutdown()
159 // if (mbTerminate)
160 // return;
162 std::unique_lock< std::mutex > aGuard( maMutex );
163 shutdownLocked(aGuard);
166 void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
168 if( maWorkers.empty() )
169 { // no threads at all -> execute the work in-line
170 std::unique_ptr<ThreadTask> pTask;
171 while ( ( pTask = popWorkLocked(aGuard, false) ) )
173 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
174 pTask->exec();
175 pTag->onTaskWorkerDone();
178 else
180 while( !maTasks.empty() )
182 maTasksChanged.wait( aGuard );
183 // In the (unlikely but possible?) case pushTask() gets called meanwhile,
184 // its notify_one() call is meant to wake a up a thread and process the task.
185 // But if this code gets woken up instead, it could lead to a deadlock.
186 // Pass on the notification.
187 maTasksChanged.notify_one();
190 assert( maTasks.empty() );
192 // coverity[missing_lock] - on purpose
193 mbTerminate = true;
195 maTasksChanged.notify_all();
197 decltype(maWorkers) aWorkers;
198 std::swap(maWorkers, aWorkers);
199 aGuard.unlock();
201 while (!aWorkers.empty())
203 rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
204 aWorkers.pop_back();
205 assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
206 == aWorkers.end());
208 xWorker->join();
209 xWorker.clear();
214 void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
216 std::scoped_lock< std::mutex > aGuard( maMutex );
218 mbTerminate = false;
220 // Worked on tasks are already removed from maTasks, so include the count of busy workers.
221 if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
223 maWorkers.push_back( new ThreadWorker( this ) );
224 maWorkers.back()->launch();
227 pTask->mpTag->onTaskPushed();
228 maTasks.insert( maTasks.begin(), std::move(pTask) );
230 maTasksChanged.notify_one();
233 std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
237 if( !maTasks.empty() )
239 std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
240 maTasks.pop_back();
241 return pTask;
243 else if (!bWait || mbTerminate)
244 return nullptr;
246 maTasksChanged.wait( rGuard );
248 } while (!mbTerminate);
250 return nullptr;
253 void ThreadPool::incBusyWorker()
255 ++mnBusyWorkers;
258 void ThreadPool::decBusyWorker()
260 assert(mnBusyWorkers >= 1);
261 --mnBusyWorkers;
264 void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin)
266 #if defined DBG_UTIL && (defined LINUX || defined _WIN32)
267 assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
268 #endif
270 std::unique_lock< std::mutex > aGuard( maMutex );
272 if( maWorkers.empty() )
273 { // no threads at all -> execute the work in-line
274 while (!rTag->isDone())
276 std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false);
277 if (!pTask)
278 break;
279 std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
280 pTask->exec();
281 pTag->onTaskWorkerDone();
286 rTag->waitUntilDone();
288 if (bJoin)
289 joinThreadsIfIdle();
292 void ThreadPool::joinThreadsIfIdle()
294 std::unique_lock< std::mutex > aGuard( maMutex );
295 if (isIdle()) // check if there are still tasks from another tag
297 shutdownLocked(aGuard);
301 std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
303 return std::make_shared<ThreadTaskTag>();
306 bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
308 return pTag->isDone();
311 ThreadTask::ThreadTask(std::shared_ptr<ThreadTaskTag> xTag)
312 : mpTag(std::move(xTag))
316 void ThreadTask::exec()
318 try {
319 doWork();
321 catch (const std::exception &e)
323 SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
325 catch (const css::uno::Exception &e)
327 SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
329 catch (...)
331 SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
335 ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
339 void ThreadTaskTag::onTaskPushed()
341 std::scoped_lock< std::mutex > aGuard( maMutex );
342 mnTasksWorking++;
343 assert( mnTasksWorking < 65536 ); // sanity checking
346 void ThreadTaskTag::onTaskWorkerDone()
348 std::scoped_lock< std::mutex > aGuard( maMutex );
349 mnTasksWorking--;
350 assert(mnTasksWorking >= 0);
351 if (mnTasksWorking == 0)
352 maTasksComplete.notify_all();
355 bool ThreadTaskTag::isDone()
357 std::scoped_lock< std::mutex > aGuard( maMutex );
358 return mnTasksWorking == 0;
361 void ThreadTaskTag::waitUntilDone()
363 std::unique_lock< std::mutex > aGuard( maMutex );
364 while( mnTasksWorking > 0 )
366 #if defined DBG_UTIL && !defined NDEBUG
367 // 10 minute timeout in debug mode, unless the code is built with
368 // sanitizers or debugged in valgrind or gdb, in which case the threads
369 // should not time out in the middle of a debugging session
370 int maxTimeout = 10 * 60;
371 #if !ENABLE_RUNTIME_OPTIMIZATIONS
372 maxTimeout = 30 * 60;
373 #endif
374 #if defined HAVE_VALGRIND_HEADERS
375 if( RUNNING_ON_VALGRIND )
376 maxTimeout = 30 * 60;
377 #endif
378 if( isDebuggerAttached())
379 maxTimeout = 300 * 60;
380 std::cv_status result = maTasksComplete.wait_for(
381 aGuard, std::chrono::seconds( maxTimeout ));
382 assert(result != std::cv_status::timeout);
383 #else
384 // 10 minute timeout in production so the app eventually throws some kind of error
385 if (maTasksComplete.wait_for(
386 aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
387 throw std::runtime_error("timeout waiting for threadpool tasks");
388 #endif
392 } // namespace comphelper
394 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */