Version 7.1.7.1, tag libreoffice-7.1.7.1
[LibreOffice.git] / cppu / source / threadpool / threadpool.cxx
blobc5783dc19989f22804f6684b1782b86a358e5b16
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 * This file incorporates work covered by the following license notice:
11 * Licensed to the Apache Software Foundation (ASF) under one or more
12 * contributor license agreements. See the NOTICE file distributed
13 * with this work for additional information regarding copyright
14 * ownership. The ASF licenses this file to you under the Apache
15 * License, Version 2.0 (the "License"); you may not use this file
16 * except in compliance with the License. You may obtain a copy of
17 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
20 #include <sal/config.h>
22 #include <cassert>
23 #include <chrono>
24 #include <algorithm>
25 #include <unordered_map>
27 #include <osl/diagnose.h>
28 #include <osl/mutex.hxx>
29 #include <rtl/instance.hxx>
30 #include <sal/log.hxx>
32 #include <uno/threadpool.h>
34 #include "threadpool.hxx"
35 #include "thread.hxx"
37 using namespace ::std;
38 using namespace ::osl;
39 using namespace ::rtl;
41 namespace cppu_threadpool
43 WaitingThread::WaitingThread(
44 rtl::Reference<ORequestThread> const & theThread): thread(theThread)
47 namespace {
49 struct theDisposedCallerAdmin :
50 public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
52 DisposedCallerAdminHolder operator () () {
53 return std::make_shared<DisposedCallerAdmin>();
59 DisposedCallerAdminHolder const & DisposedCallerAdmin::getInstance()
61 return theDisposedCallerAdmin::get();
64 DisposedCallerAdmin::~DisposedCallerAdmin()
66 SAL_WARN_IF( !m_vector.empty(), "cppu.threadpool", "DisposedCallerList : " << m_vector.size() << " left");
69 void DisposedCallerAdmin::dispose( void const * nDisposeId )
71 MutexGuard guard( m_mutex );
72 m_vector.push_back( nDisposeId );
75 void DisposedCallerAdmin::destroy( void const * nDisposeId )
77 MutexGuard guard( m_mutex );
78 m_vector.erase(std::remove(m_vector.begin(), m_vector.end(), nDisposeId), m_vector.end());
81 bool DisposedCallerAdmin::isDisposed( void const * nDisposeId )
83 MutexGuard guard( m_mutex );
84 return (std::find(m_vector.begin(), m_vector.end(), nDisposeId) != m_vector.end());
88 ThreadPool::ThreadPool() :
89 m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() )
93 ThreadPool::~ThreadPool()
95 SAL_WARN_IF( m_mapQueue.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue.size() << " left");
98 void ThreadPool::dispose( void const * nDisposeId )
100 m_DisposedCallerAdmin->dispose( nDisposeId );
102 MutexGuard guard( m_mutex );
103 for (auto const& item : m_mapQueue)
105 if( item.second.first )
107 item.second.first->dispose( nDisposeId );
109 if( item.second.second )
111 item.second.second->dispose( nDisposeId );
116 void ThreadPool::destroy( void const * nDisposeId )
118 m_DisposedCallerAdmin->destroy( nDisposeId );
121 /******************
122 * This methods lets the thread wait a certain amount of time. If within this timespan
123 * a new request comes in, this thread is reused. This is done only to improve performance,
124 * it is not required for threadpool functionality.
125 ******************/
126 void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
128 WaitingThread waitingThread(pThread);
130 MutexGuard guard( m_mutexWaitingThreadList );
131 m_dequeThreads.push_front( &waitingThread );
134 // let the thread wait 2 seconds
135 waitingThread.condition.wait( std::chrono::seconds(2) );
138 MutexGuard guard ( m_mutexWaitingThreadList );
139 if( waitingThread.thread.is() )
141 // thread wasn't reused, remove it from the list
142 WaitingThreadDeque::iterator ii = find(
143 m_dequeThreads.begin(), m_dequeThreads.end(), &waitingThread );
144 OSL_ASSERT( ii != m_dequeThreads.end() );
145 m_dequeThreads.erase( ii );
150 void ThreadPool::joinWorkers()
153 MutexGuard guard( m_mutexWaitingThreadList );
154 for (auto const& thread : m_dequeThreads)
156 // wake the threads up
157 thread->condition.set();
160 m_aThreadAdmin.join();
163 bool ThreadPool::createThread( JobQueue *pQueue ,
164 const ByteSequence &aThreadId,
165 bool bAsynchron )
168 // Can a thread be reused ?
169 MutexGuard guard( m_mutexWaitingThreadList );
170 if( ! m_dequeThreads.empty() )
172 // inform the thread and let it go
173 struct WaitingThread *pWaitingThread = m_dequeThreads.back();
174 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
175 pWaitingThread->thread = nullptr;
177 // remove from list
178 m_dequeThreads.pop_back();
180 // let the thread go
181 pWaitingThread->condition.set();
182 return true;
186 rtl::Reference pThread(
187 new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
188 return pThread->launch();
191 bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
193 MutexGuard guard( m_mutex );
195 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
196 OSL_ASSERT( ii != m_mapQueue.end() );
198 if( bAsynchron )
200 if( ! (*ii).second.second->isEmpty() )
202 // another thread has put something into the queue
203 return false;
206 (*ii).second.second = nullptr;
207 if( (*ii).second.first )
209 // all oneway request have been processed, now
210 // synchronous requests may go on
211 (*ii).second.first->resume();
214 else
216 if( ! (*ii).second.first->isEmpty() )
218 // another thread has put something into the queue
219 return false;
221 (*ii).second.first = nullptr;
224 if( nullptr == (*ii).second.first && nullptr == (*ii).second.second )
226 m_mapQueue.erase( ii );
229 return true;
233 bool ThreadPool::addJob(
234 const ByteSequence &aThreadId ,
235 bool bAsynchron,
236 void *pThreadSpecificData,
237 RequestFun * doRequest,
238 void const * disposeId )
240 bool bCreateThread = false;
241 JobQueue *pQueue = nullptr;
243 MutexGuard guard( m_mutex );
244 if (m_DisposedCallerAdmin->isDisposed(disposeId)) {
245 return true;
248 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
250 if( ii == m_mapQueue.end() )
252 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( nullptr , nullptr );
253 ii = m_mapQueue.find( aThreadId );
254 OSL_ASSERT( ii != m_mapQueue.end() );
257 if( bAsynchron )
259 if( ! (*ii).second.second )
261 (*ii).second.second = new JobQueue();
262 bCreateThread = true;
264 pQueue = (*ii).second.second;
266 else
268 if( ! (*ii).second.first )
270 (*ii).second.first = new JobQueue();
271 bCreateThread = true;
273 pQueue = (*ii).second.first;
275 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
277 pQueue->suspend();
280 pQueue->add( pThreadSpecificData , doRequest );
283 return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron);
286 void ThreadPool::prepare( const ByteSequence &aThreadId )
288 MutexGuard guard( m_mutex );
290 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
292 if( ii == m_mapQueue.end() )
294 JobQueue *p = new JobQueue();
295 m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , nullptr );
297 else if( nullptr == (*ii).second.first )
299 (*ii).second.first = new JobQueue();
303 void * ThreadPool::enter( const ByteSequence & aThreadId , void const * nDisposeId )
305 JobQueue *pQueue = nullptr;
307 MutexGuard guard( m_mutex );
309 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
311 OSL_ASSERT( ii != m_mapQueue.end() );
312 pQueue = (*ii).second.first;
315 OSL_ASSERT( pQueue );
316 void *pReturn = pQueue->enter( nDisposeId );
318 if( pQueue->isCallstackEmpty() )
320 if( revokeQueue( aThreadId , false) )
322 // remove queue
323 delete pQueue;
326 return pReturn;
330 // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
331 // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
332 // (within the last uno_threadpool_destroy) all worker threads spawned by that
333 // ThreadPool instance are joined (which implies that uno_threadpool_destroy
334 // must never be called from a worker thread); afterwards, the next call to
335 // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
337 using namespace cppu_threadpool;
339 namespace {
341 struct uno_ThreadPool_Equal
343 bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
345 return a == b;
349 struct uno_ThreadPool_Hash
351 std::size_t operator () ( const uno_ThreadPool &a ) const
353 return reinterpret_cast<std::size_t>( a );
359 typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
361 static ThreadpoolHashSet *g_pThreadpoolHashSet;
363 struct _uno_ThreadPool
365 sal_Int32 dummy;
368 namespace {
370 ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
372 MutexGuard guard( Mutex::getGlobalMutex() );
373 assert( g_pThreadpoolHashSet != nullptr );
374 ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
375 assert( i != g_pThreadpoolHashSet->end() );
376 return i->second;
381 extern "C" uno_ThreadPool SAL_CALL
382 uno_threadpool_create() SAL_THROW_EXTERN_C()
384 MutexGuard guard( Mutex::getGlobalMutex() );
385 ThreadPoolHolder p;
386 if( ! g_pThreadpoolHashSet )
388 g_pThreadpoolHashSet = new ThreadpoolHashSet;
389 p = new ThreadPool;
391 else
393 assert( !g_pThreadpoolHashSet->empty() );
394 p = g_pThreadpoolHashSet->begin()->second;
397 // Just ensure that the handle is unique in the process (via heap)
398 uno_ThreadPool h = new struct _uno_ThreadPool;
399 g_pThreadpoolHashSet->emplace( h, p );
400 return h;
403 extern "C" void SAL_CALL
404 uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
406 sal_Sequence *pThreadId = nullptr;
407 uno_getIdOfCurrentThread( &pThreadId );
408 getThreadPool( hPool )->prepare( pThreadId );
409 rtl_byte_sequence_release( pThreadId );
410 uno_releaseIdFromCurrentThread();
413 extern "C" void SAL_CALL
414 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
415 SAL_THROW_EXTERN_C()
417 sal_Sequence *pThreadId = nullptr;
418 uno_getIdOfCurrentThread( &pThreadId );
419 *ppJob =
420 getThreadPool( hPool )->enter(
421 pThreadId,
422 hPool );
423 rtl_byte_sequence_release( pThreadId );
424 uno_releaseIdFromCurrentThread();
427 extern "C" void SAL_CALL
428 uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
430 // we might do here some tidying up in case a thread called attach but never detach
433 extern "C" void SAL_CALL
434 uno_threadpool_putJob(
435 uno_ThreadPool hPool,
436 sal_Sequence *pThreadId,
437 void *pJob,
438 void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
439 sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
441 if (!getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest, hPool ))
443 SAL_WARN(
444 "cppu.threadpool",
445 "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
449 extern "C" void SAL_CALL
450 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
452 getThreadPool(hPool)->dispose(
453 hPool );
456 extern "C" void SAL_CALL
457 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
459 ThreadPoolHolder p( getThreadPool(hPool) );
460 p->destroy(
461 hPool );
463 bool empty;
465 OSL_ASSERT( g_pThreadpoolHashSet );
467 MutexGuard guard( Mutex::getGlobalMutex() );
469 ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
470 OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
471 g_pThreadpoolHashSet->erase( ii );
472 delete hPool;
474 empty = g_pThreadpoolHashSet->empty();
475 if( empty )
477 delete g_pThreadpoolHashSet;
478 g_pThreadpoolHashSet = nullptr;
482 if( empty )
484 p->joinWorkers();
488 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */