Branch libreoffice-5-0-4
[LibreOffice.git] / cppu / source / threadpool / threadpool.cxx
blob5cfb7e63e8b14c882c7272640db94289e5182b19
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 * This file incorporates work covered by the following license notice:
11 * Licensed to the Apache Software Foundation (ASF) under one or more
12 * contributor license agreements. See the NOTICE file distributed
13 * with this work for additional information regarding copyright
14 * ownership. The ASF licenses this file to you under the Apache
15 * License, Version 2.0 (the "License"); you may not use this file
16 * except in compliance with the License. You may obtain a copy of
17 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
20 #include "sal/config.h"
22 #include <cassert>
23 #include <unordered_map>
25 #include <osl/diagnose.h>
26 #include <osl/mutex.hxx>
27 #include <osl/thread.h>
28 #include <rtl/instance.hxx>
29 #include <sal/log.hxx>
31 #include <uno/threadpool.h>
33 #include "threadpool.hxx"
34 #include "thread.hxx"
36 using namespace ::std;
37 using namespace ::osl;
38 using namespace ::rtl;
40 namespace cppu_threadpool
42 struct theDisposedCallerAdmin :
43 public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
45 DisposedCallerAdminHolder operator () () {
46 return DisposedCallerAdminHolder(new DisposedCallerAdmin());
50 DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
52 return theDisposedCallerAdmin::get();
55 DisposedCallerAdmin::~DisposedCallerAdmin()
57 SAL_WARN_IF( !m_lst.empty(), "cppu.threadpool", "DisposedCallerList : " << m_lst.size() << " left\n");
60 void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
62 MutexGuard guard( m_mutex );
63 m_lst.push_back( nDisposeId );
66 void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
68 MutexGuard guard( m_mutex );
69 for( DisposedCallerList::iterator ii = m_lst.begin() ;
70 ii != m_lst.end() ;
71 ++ ii )
73 if( (*ii) == nDisposeId )
75 m_lst.erase( ii );
76 break;
81 bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
83 MutexGuard guard( m_mutex );
84 for( DisposedCallerList::iterator ii = m_lst.begin() ;
85 ii != m_lst.end() ;
86 ++ ii )
88 if( (*ii) == nDisposeId )
90 return true;
93 return false;
99 ThreadPool::ThreadPool()
101 m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
104 ThreadPool::~ThreadPool()
106 SAL_WARN_IF( m_mapQueue.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue.size() << " left\n");
109 void ThreadPool::dispose( sal_Int64 nDisposeId )
111 m_DisposedCallerAdmin->dispose( nDisposeId );
113 MutexGuard guard( m_mutex );
114 for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
115 ii != m_mapQueue.end();
116 ++ii)
118 if( (*ii).second.first )
120 (*ii).second.first->dispose( nDisposeId );
122 if( (*ii).second.second )
124 (*ii).second.second->dispose( nDisposeId );
129 void ThreadPool::destroy( sal_Int64 nDisposeId )
131 m_DisposedCallerAdmin->destroy( nDisposeId );
134 /******************
135 * This methods lets the thread wait a certain amount of time. If within this timespan
136 * a new request comes in, this thread is reused. This is done only to improve performance,
137 * it is not required for threadpool functionality.
138 ******************/
139 void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
141 struct WaitingThread waitingThread;
142 waitingThread.condition = osl_createCondition();
143 waitingThread.thread = pThread;
145 MutexGuard guard( m_mutexWaitingThreadList );
146 m_lstThreads.push_front( &waitingThread );
149 // let the thread wait 2 seconds
150 TimeValue time = { 2 , 0 };
151 osl_waitCondition( waitingThread.condition , &time );
154 MutexGuard guard ( m_mutexWaitingThreadList );
155 if( waitingThread.thread.is() )
157 // thread wasn't reused, remove it from the list
158 WaitingThreadList::iterator ii = find(
159 m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
160 OSL_ASSERT( ii != m_lstThreads.end() );
161 m_lstThreads.erase( ii );
165 osl_destroyCondition( waitingThread.condition );
168 void ThreadPool::joinWorkers()
171 MutexGuard guard( m_mutexWaitingThreadList );
172 for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
173 ii != m_lstThreads.end() ;
174 ++ ii )
176 // wake the threads up
177 osl_setCondition( (*ii)->condition );
180 m_aThreadAdmin.join();
183 bool ThreadPool::createThread( JobQueue *pQueue ,
184 const ByteSequence &aThreadId,
185 bool bAsynchron )
188 // Can a thread be reused ?
189 MutexGuard guard( m_mutexWaitingThreadList );
190 if( ! m_lstThreads.empty() )
192 // inform the thread and let it go
193 struct WaitingThread *pWaitingThread = m_lstThreads.back();
194 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
195 pWaitingThread->thread = 0;
197 // remove from list
198 m_lstThreads.pop_back();
200 // let the thread go
201 osl_setCondition( pWaitingThread->condition );
202 return true;
206 rtl::Reference< ORequestThread > pThread(
207 new ORequestThread( this, pQueue , aThreadId, bAsynchron) );
208 return pThread->launch();
211 bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
213 MutexGuard guard( m_mutex );
215 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
216 OSL_ASSERT( ii != m_mapQueue.end() );
218 if( bAsynchron )
220 if( ! (*ii).second.second->isEmpty() )
222 // another thread has put something into the queue
223 return false;
226 (*ii).second.second = 0;
227 if( (*ii).second.first )
229 // all oneway request have been processed, now
230 // synchronus requests may go on
231 (*ii).second.first->resume();
234 else
236 if( ! (*ii).second.first->isEmpty() )
238 // another thread has put something into the queue
239 return false;
241 (*ii).second.first = 0;
244 if( 0 == (*ii).second.first && 0 == (*ii).second.second )
246 m_mapQueue.erase( ii );
249 return true;
253 bool ThreadPool::addJob(
254 const ByteSequence &aThreadId ,
255 bool bAsynchron,
256 void *pThreadSpecificData,
257 RequestFun * doRequest )
259 bool bCreateThread = false;
260 JobQueue *pQueue = 0;
262 MutexGuard guard( m_mutex );
264 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
266 if( ii == m_mapQueue.end() )
268 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( (JobQueue *)0 , (JobQueue*)0 );
269 ii = m_mapQueue.find( aThreadId );
270 OSL_ASSERT( ii != m_mapQueue.end() );
273 if( bAsynchron )
275 if( ! (*ii).second.second )
277 (*ii).second.second = new JobQueue();
278 bCreateThread = true;
280 pQueue = (*ii).second.second;
282 else
284 if( ! (*ii).second.first )
286 (*ii).second.first = new JobQueue();
287 bCreateThread = true;
289 pQueue = (*ii).second.first;
291 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
293 pQueue->suspend();
296 pQueue->add( pThreadSpecificData , doRequest );
299 return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron);
302 void ThreadPool::prepare( const ByteSequence &aThreadId )
304 MutexGuard guard( m_mutex );
306 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
308 if( ii == m_mapQueue.end() )
310 JobQueue *p = new JobQueue();
311 m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , (JobQueue*)0 );
313 else if( 0 == (*ii).second.first )
315 (*ii).second.first = new JobQueue();
319 void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
321 JobQueue *pQueue = 0;
323 MutexGuard guard( m_mutex );
325 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
327 OSL_ASSERT( ii != m_mapQueue.end() );
328 pQueue = (*ii).second.first;
331 OSL_ASSERT( pQueue );
332 void *pReturn = pQueue->enter( nDisposeId );
334 if( pQueue->isCallstackEmpty() )
336 if( revokeQueue( aThreadId , false) )
338 // remove queue
339 delete pQueue;
342 return pReturn;
346 // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
347 // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
348 // (within the last uno_threadpool_destroy) all worker threads spawned by that
349 // ThreadPool instance are joined (which implies that uno_threadpool_destroy
350 // must never be called from a worker thread); afterwards, the next call to
351 // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
353 using namespace cppu_threadpool;
355 struct uno_ThreadPool_Equal
357 bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
359 return a == b;
363 struct uno_ThreadPool_Hash
365 sal_Size operator () ( const uno_ThreadPool &a ) const
367 return reinterpret_cast<sal_Size>( a );
373 typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
375 static ThreadpoolHashSet *g_pThreadpoolHashSet;
377 struct _uno_ThreadPool
379 sal_Int32 dummy;
382 namespace {
384 ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
386 MutexGuard guard( Mutex::getGlobalMutex() );
387 assert( g_pThreadpoolHashSet != 0 );
388 ThreadpoolHashSet::iterator i( g_pThreadpoolHashSet->find(hPool) );
389 assert( i != g_pThreadpoolHashSet->end() );
390 return i->second;
395 extern "C" uno_ThreadPool SAL_CALL
396 uno_threadpool_create() SAL_THROW_EXTERN_C()
398 MutexGuard guard( Mutex::getGlobalMutex() );
399 ThreadPoolHolder p;
400 if( ! g_pThreadpoolHashSet )
402 g_pThreadpoolHashSet = new ThreadpoolHashSet();
403 p = new ThreadPool;
405 else
407 assert( !g_pThreadpoolHashSet->empty() );
408 p = g_pThreadpoolHashSet->begin()->second;
411 // Just ensure that the handle is unique in the process (via heap)
412 uno_ThreadPool h = new struct _uno_ThreadPool;
413 g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, p) );
414 return h;
417 extern "C" void SAL_CALL
418 uno_threadpool_attach( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
420 sal_Sequence *pThreadId = 0;
421 uno_getIdOfCurrentThread( &pThreadId );
422 getThreadPool( hPool )->prepare( pThreadId );
423 rtl_byte_sequence_release( pThreadId );
424 uno_releaseIdFromCurrentThread();
427 extern "C" void SAL_CALL
428 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
429 SAL_THROW_EXTERN_C()
431 sal_Sequence *pThreadId = 0;
432 uno_getIdOfCurrentThread( &pThreadId );
433 *ppJob =
434 getThreadPool( hPool )->enter(
435 pThreadId,
436 sal::static_int_cast< sal_Int64 >(
437 reinterpret_cast< sal_IntPtr >(hPool)) );
438 rtl_byte_sequence_release( pThreadId );
439 uno_releaseIdFromCurrentThread();
442 extern "C" void SAL_CALL
443 uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) SAL_THROW_EXTERN_C()
445 // we might do here some tiding up in case a thread called attach but never detach
448 extern "C" void SAL_CALL
449 uno_threadpool_putJob(
450 uno_ThreadPool hPool,
451 sal_Sequence *pThreadId,
452 void *pJob,
453 void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
454 sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
456 if (!getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest ))
458 SAL_WARN(
459 "cppu",
460 "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
464 extern "C" void SAL_CALL
465 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
467 getThreadPool(hPool)->dispose(
468 sal::static_int_cast< sal_Int64 >(
469 reinterpret_cast< sal_IntPtr >(hPool)) );
472 extern "C" void SAL_CALL
473 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
475 ThreadPoolHolder p( getThreadPool(hPool) );
476 p->destroy(
477 sal::static_int_cast< sal_Int64 >(
478 reinterpret_cast< sal_IntPtr >(hPool)) );
480 bool empty;
482 OSL_ASSERT( g_pThreadpoolHashSet );
484 MutexGuard guard( Mutex::getGlobalMutex() );
486 ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
487 OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
488 g_pThreadpoolHashSet->erase( ii );
489 delete hPool;
491 empty = g_pThreadpoolHashSet->empty();
492 if( empty )
494 delete g_pThreadpoolHashSet;
495 g_pThreadpoolHashSet = 0;
499 if( empty )
501 p->joinWorkers();
505 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */