1 /*************************************************************************
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 * Copyright 2000, 2010 Oracle and/or its affiliates.
7 * OpenOffice.org - a multi-platform office productivity suite
9 * This file is part of OpenOffice.org.
11 * OpenOffice.org is free software: you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License version 3
13 * only, as published by the Free Software Foundation.
15 * OpenOffice.org is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License version 3 for more details
19 * (a copy is included in the LICENSE file that accompanied this code).
21 * You should have received a copy of the GNU Lesser General Public License
22 * version 3 along with OpenOffice.org. If not, see
23 * <http://www.openoffice.org/license.html>
24 * for a copy of the LGPLv3 License.
26 ************************************************************************/
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_cppu.hxx"
33 #include <osl/diagnose.h>
34 #include <osl/mutex.hxx>
35 #include <osl/thread.h>
36 #include <rtl/instance.hxx>
38 #include <uno/threadpool.h>
40 #include "threadpool.hxx"
43 using namespace ::std
;
44 using namespace ::osl
;
46 namespace cppu_threadpool
// Static accessor type for the DisposedCallerAdminHolder, built on
// rtl::StaticWithInit. DisposedCallerAdmin::getInstance() reads it through
// theDisposedCallerAdmin::get().
48 struct theDisposedCallerAdmin
:
49 public rtl::StaticWithInit
< DisposedCallerAdminHolder
, theDisposedCallerAdmin
>
// Initializer functor: produces the value stored by the static, a holder
// wrapping a newly allocated DisposedCallerAdmin.
51 DisposedCallerAdminHolder
operator () () {
52 return DisposedCallerAdminHolder(new DisposedCallerAdmin());
// Returns the DisposedCallerAdminHolder obtained from
// theDisposedCallerAdmin::get().
56 DisposedCallerAdminHolder
DisposedCallerAdmin::getInstance()
58 return theDisposedCallerAdmin::get();
// Destructor. The only work shown is a diagnostic compiled in when
// OSL_DEBUG_LEVEL > 1: it prints how many dispose ids are still in m_lst.
61 DisposedCallerAdmin::~DisposedCallerAdmin()
63 #if OSL_DEBUG_LEVEL > 1
// size() is cast to unsigned long so that it matches the %lu conversion.
66 printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst
.size( )));
// Records nDisposeId by appending it to m_lst.
// m_mutex is held (via MutexGuard) while the list is modified.
71 void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId
)
73 MutexGuard
guard( m_mutex
);
74 m_lst
.push_back( nDisposeId
);
// Searches m_lst, with m_mutex held, for an entry equal to nDisposeId.
// NOTE(review): the statement executed on a match is not shown here;
// presumably the entry is removed from the list - confirm.
77 void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId
)
79 MutexGuard
guard( m_mutex
);
// Linear scan starting at the front of the list.
80 for( DisposedCallerList::iterator ii
= m_lst
.begin() ;
84 if( (*ii
) == nDisposeId
)
// Looks for nDisposeId in m_lst while holding m_mutex.
// NOTE(review): the return statements are not shown here; presumably the result
// is sal_True when a matching entry exists and sal_False otherwise - confirm.
92 sal_Bool
DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId
)
94 MutexGuard
guard( m_mutex
);
// Linear scan starting at the front of the list.
95 for( DisposedCallerList::iterator ii
= m_lst
.begin() ;
99 if( (*ii
) == nDisposeId
)
108 //-------------------------------------------------------------------------------
// Static accessor type for the ThreadPoolHolder, built on rtl::StaticWithInit.
// ThreadPool::getInstance() reads it through theThreadPool::get().
110 struct theThreadPool
:
111 public rtl::StaticWithInit
< ThreadPoolHolder
, theThreadPool
>
// Initializer functor: wraps a newly allocated ThreadPool in a ThreadPoolHolder.
113 ThreadPoolHolder
operator () () {
114 ThreadPoolHolder
aRet(new ThreadPool());
// Constructor: stores the holder returned by DisposedCallerAdmin::getInstance()
// in m_DisposedCallerAdmin; dispose() and stopDisposing() forward to it.
119 ThreadPool::ThreadPool()
121 m_DisposedCallerAdmin
= DisposedCallerAdmin::getInstance();
// Destructor. When OSL_DEBUG_LEVEL > 1 and m_mapQueue is not empty, prints the
// number of entries that are still registered in the map.
124 ThreadPool::~ThreadPool()
126 #if OSL_DEBUG_LEVEL > 1
127 if( m_mapQueue
.size() )
// size() is cast to unsigned long so that it matches the %lu conversion.
129 printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue
.size()) );
// Returns the ThreadPoolHolder obtained from theThreadPool::get().
133 ThreadPoolHolder
ThreadPool::getInstance()
135 return theThreadPool::get();
// Disposes the caller identified by nDisposeId. Visible steps, in order:
//   1. register the id with the DisposedCallerAdmin,
//   2. forward dispose( nDisposeId ) to every non-null JobQueue in m_mapQueue,
//   3. set the condition of every entry in m_lstThreads,
//   4. call ThreadAdmin::getInstance()->join().
139 void ThreadPool::dispose( sal_Int64 nDisposeId
)
143 m_DisposedCallerAdmin
->dispose( nDisposeId
);
// m_mutex is held while the queues stored in m_mapQueue are visited.
145 MutexGuard
guard( m_mutex
);
146 for( ThreadIdHashMap::iterator ii
= m_mapQueue
.begin() ;
147 ii
!= m_mapQueue
.end();
// Each map value is a pair of JobQueue pointers; either may be null.
150 if( (*ii
).second
.first
)
152 (*ii
).second
.first
->dispose( nDisposeId
);
154 if( (*ii
).second
.second
)
156 (*ii
).second
.second
->dispose( nDisposeId
);
// The waiting-thread list has its own mutex, separate from m_mutex.
163 MutexGuard
guard( m_mutexWaitingThreadList
);
164 for( WaitingThreadList::iterator ii
= m_lstThreads
.begin() ;
165 ii
!= m_lstThreads
.end() ;
168 // wake the threads up
169 osl_setCondition( (*ii
)->condition
);
172 ThreadAdmin::getInstance()->join();
// Forwards nDisposeId to DisposedCallerAdmin::stopDisposing() through the
// holder cached in m_DisposedCallerAdmin.
176 void ThreadPool::stopDisposing( sal_Int64 nDisposeId
)
178 m_DisposedCallerAdmin
->stopDisposing( nDisposeId
);
182 * This method lets the thread wait a certain amount of time. If within this timespan
183 * a new request comes in, this thread is reused. This is done only to improve performance,
184 * it is not required for threadpool functionality.
// Parks pThread in the pool: registers a WaitingThread record for it in
// m_lstThreads, waits on the record's condition with a 2 second timeout, and
// unregisters the record again if no task was handed over in the meantime.
186 void ThreadPool::waitInPool( ORequestThread
* pThread
)
// The record is a local object; m_lstThreads only stores its address.
188 struct WaitingThread waitingThread
;
189 waitingThread
.condition
= osl_createCondition();
190 waitingThread
.thread
= pThread
;
// Publish the record under the list mutex.
192 MutexGuard
guard( m_mutexWaitingThreadList
);
193 m_lstThreads
.push_front( &waitingThread
);
196 // let the thread wait 2 seconds
197 TimeValue time
= { 2 , 0 };
198 osl_waitCondition( waitingThread
.condition
, &time
);
// Re-take the list mutex before inspecting the record.
201 MutexGuard
guard ( m_mutexWaitingThreadList
);
// createThread() sets .thread to 0 and pops the record when it reuses this
// thread, so a non-null .thread means the record is still in the list.
202 if( waitingThread
.thread
)
204 // thread wasn't reused, remove it from the list
205 WaitingThreadList::iterator ii
= find(
206 m_lstThreads
.begin(), m_lstThreads
.end(), &waitingThread
);
207 OSL_ASSERT( ii
!= m_lstThreads
.end() );
208 m_lstThreads
.erase( ii
);
// Release the condition created at the top of the function.
212 osl_destroyCondition( waitingThread
.condition
);
// Gets a thread to work on pQueue for aThreadId: a thread parked in
// m_lstThreads is reused when one is available, and a new ORequestThread is
// allocated as well.
// NOTE(review): the statements that connect bCreate to the two paths are not
// shown here; presumably a new thread is only created when no parked thread
// could be reused - confirm.
215 void ThreadPool::createThread( JobQueue
*pQueue
,
216 const ByteSequence
&aThreadId
,
217 sal_Bool bAsynchron
)
219 sal_Bool bCreate
= sal_True
;
221 // Can a thread be reused ?
222 MutexGuard
guard( m_mutexWaitingThreadList
);
223 if( ! m_lstThreads
.empty() )
225 // inform the thread and let it go
// waitInPool() pushes records at the front; the reused one is taken from the back.
226 struct WaitingThread
*pWaitingThread
= m_lstThreads
.back();
227 pWaitingThread
->thread
->setTask( pQueue
, aThreadId
, bAsynchron
);
// A null thread pointer tells waitInPool() that the record was consumed here.
228 pWaitingThread
->thread
= 0;
231 m_lstThreads
.pop_back();
// Wake the parked thread so that it leaves osl_waitCondition().
234 osl_setCondition( pWaitingThread
->condition
);
241 ORequestThread
*pThread
=
242 new ORequestThread( pQueue
, aThreadId
, bAsynchron
);
// Detaches a JobQueue from the m_mapQueue entry of aThreadId and erases the
// entry once both of its queue pointers are 0. m_mutex is held throughout.
// The entry's .second member is the queue for oneway requests (see the comment
// before resume() below); .first is the queue that enter() waits on.
// NOTE(review): the branch on bAsynchron and the return statements are not
// shown here. Presumably bAsynchron selects the .second (oneway) path, and the
// function reports failure when the chosen queue is not empty - confirm.
248 sal_Bool
ThreadPool::revokeQueue( const ByteSequence
&aThreadId
, sal_Bool bAsynchron
)
250 MutexGuard
guard( m_mutex
);
252 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
// The caller must only revoke a queue for a thread id that is registered.
253 OSL_ASSERT( ii
!= m_mapQueue
.end() );
257 if( ! (*ii
).second
.second
->isEmpty() )
259 // another thread has put something into the queue
263 (*ii
).second
.second
= 0;
264 if( (*ii
).second
.first
)
266 // all oneway requests have been processed, now
267 // synchronous requests may go on
268 (*ii
).second
.first
->resume();
273 if( ! (*ii
).second
.first
->isEmpty() )
275 // another thread has put something into the queue
278 (*ii
).second
.first
= 0;
// Drop the map entry when neither queue is registered any more.
281 if( 0 == (*ii
).second
.first
&& 0 == (*ii
).second
.second
)
283 m_mapQueue
.erase( ii
);
// Queues a job (pThreadSpecificData + doRequest) for the thread id aThreadId.
// With m_mutex held, the m_mapQueue entry for the id is looked up (and created
// with two null queues if absent), the missing JobQueue is allocated on demand,
// and the job is added to the selected queue. createThread() is called for that
// queue at the end.
// NOTE(review): the branch that chooses between the .second and .first queue,
// the statement guarded by the isBusy() check and the condition guarding
// createThread() are not shown here. Presumably the choice depends on
// bAsynchron and createThread() only runs when bCreateThread is set - confirm.
290 void ThreadPool::addJob(
291 const ByteSequence
&aThreadId
,
293 void *pThreadSpecificData
,
294 RequestFun
* doRequest
)
296 sal_Bool bCreateThread
= sal_False
;
297 JobQueue
*pQueue
= 0;
299 MutexGuard
guard( m_mutex
);
301 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
303 if( ii
== m_mapQueue
.end() )
// Unknown thread id: register an entry with no queues, then look it up again.
305 m_mapQueue
[ aThreadId
] = pair
< JobQueue
* , JobQueue
* > ( 0 , 0 );
306 ii
= m_mapQueue
.find( aThreadId
);
307 OSL_ASSERT( ii
!= m_mapQueue
.end() );
// Path using the .second queue: allocate it if it does not exist yet.
312 if( ! (*ii
).second
.second
)
314 (*ii
).second
.second
= new JobQueue();
315 bCreateThread
= sal_True
;
317 pQueue
= (*ii
).second
.second
;
// Path using the .first queue: allocate it if it does not exist yet.
321 if( ! (*ii
).second
.first
)
323 (*ii
).second
.first
= new JobQueue();
324 bCreateThread
= sal_True
;
326 pQueue
= (*ii
).second
.first
;
// Checks whether a .second queue exists and reports itself as busy.
328 if( (*ii
).second
.second
&& ( (*ii
).second
.second
->isBusy() ) )
333 pQueue
->add( pThreadSpecificData
, doRequest
);
338 createThread( pQueue
, aThreadId
, bAsynchron
);
// Ensures that the m_mapQueue entry for aThreadId has a non-null .first queue.
// If the id is unknown, an entry ( new JobQueue, 0 ) is inserted; if the entry
// exists but its .first queue is 0, a new JobQueue is stored there.
// m_mutex is held while the map is accessed.
342 void ThreadPool::prepare( const ByteSequence
&aThreadId
)
344 MutexGuard
guard( m_mutex
);
346 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
348 if( ii
== m_mapQueue
.end() )
350 JobQueue
*p
= new JobQueue();
351 m_mapQueue
[ aThreadId
] = pair
< JobQueue
* , JobQueue
* > ( p
, 0 );
353 else if( 0 == (*ii
).second
.first
)
355 (*ii
).second
.first
= new JobQueue();
// Enters the .first JobQueue registered for aThreadId: calls
// JobQueue::enter( nDisposeId ) on it and keeps the result in pReturn. When the
// queue reports an empty call stack afterwards, revokeQueue( aThreadId,
// sal_False ) is attempted.
// The entry and its .first queue must already exist (both are asserted).
// NOTE(review): what happens after a successful revokeQueue() and the return
// statement are not shown here; presumably pReturn is returned - confirm.
359 void * ThreadPool::enter( const ByteSequence
& aThreadId
, sal_Int64 nDisposeId
)
361 JobQueue
*pQueue
= 0;
// The map lookup is done under m_mutex.
363 MutexGuard
guard( m_mutex
);
365 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
367 OSL_ASSERT( ii
!= m_mapQueue
.end() );
368 pQueue
= (*ii
).second
.first
;
371 OSL_ASSERT( pQueue
);
372 void *pReturn
= pQueue
->enter( nDisposeId
);
374 if( pQueue
->isCallstackEmpty() )
376 if( revokeQueue( aThreadId
, sal_False
) )
387 using namespace cppu_threadpool
;
// Equality functor over two uno_ThreadPool handles; used as the key-equality
// argument of the ThreadpoolHashSet typedef.
// NOTE(review): the comparison itself is not shown here - confirm how handles
// are compared.
389 struct uno_ThreadPool_Equal
391 sal_Bool
operator () ( const uno_ThreadPool
&a
, const uno_ThreadPool
&b
) const
// Hash functor over a uno_ThreadPool handle; used as the hasher argument of the
// ThreadpoolHashSet typedef.
// NOTE(review): the hash computation is not shown here - confirm what it is
// derived from.
397 struct uno_ThreadPool_Hash
399 sal_Size
operator () ( const uno_ThreadPool
&a
) const
// Despite the "Set" in its name this is a hash map: it maps each uno_ThreadPool
// handle to a ThreadPoolHolder, hashed with uno_ThreadPool_Hash and compared
// with uno_ThreadPool_Equal.
407 typedef ::std::hash_map
< uno_ThreadPool
, ThreadPoolHolder
, uno_ThreadPool_Hash
, uno_ThreadPool_Equal
> ThreadpoolHashSet
;
// Registry of live handles. uno_threadpool_create() allocates it on first use;
// uno_threadpool_destroy() deletes it and resets it to 0 once it is empty.
409 static ThreadpoolHashSet
*g_pThreadpoolHashSet
;
// Type behind the handles created by uno_threadpool_create(), which allocates
// one object per handle with new so that the handle is unique in the process.
// NOTE(review): the members of this struct are not shown here.
411 struct _uno_ThreadPool
// Creates a new thread pool handle. Under the global mutex, the handle registry
// is allocated if it does not exist yet, a fresh _uno_ThreadPool object is
// allocated to serve as the handle, and the handle is registered together with
// the holder returned by ThreadPool::getInstance().
// NOTE(review): the return statement is not shown here; presumably the new
// handle h is returned - confirm.
416 extern "C" uno_ThreadPool SAL_CALL
417 uno_threadpool_create() SAL_THROW_EXTERN_C()
419 MutexGuard
guard( Mutex::getGlobalMutex() );
// Lazily create the registry on first use.
420 if( ! g_pThreadpoolHashSet
)
422 g_pThreadpoolHashSet
= new ThreadpoolHashSet();
425 // Just ensure that the handle is unique in the process (via heap)
426 uno_ThreadPool h
= new struct _uno_ThreadPool
;
427 g_pThreadpoolHashSet
->insert( ThreadpoolHashSet::value_type(h
, ThreadPool::getInstance()) );
// Attaches the calling thread to the pool: obtains the id of the current
// thread, passes it to ThreadPool::prepare(), then releases the id sequence and
// calls uno_releaseIdFromCurrentThread(). The handle parameter is unnamed and
// not used.
431 extern "C" void SAL_CALL
432 uno_threadpool_attach( uno_ThreadPool
) SAL_THROW_EXTERN_C()
434 sal_Sequence
*pThreadId
= 0;
435 uno_getIdOfCurrentThread( &pThreadId
);
436 ThreadPool::getInstance()->prepare( pThreadId
);
// Give back what uno_getIdOfCurrentThread() handed out above.
437 rtl_byte_sequence_release( pThreadId
);
438 uno_releaseIdFromCurrentThread();
// Lets the calling thread enter the pool: obtains the id of the current thread,
// calls ThreadPool::enter() with the handle value as dispose id, then releases
// the thread id again.
// NOTE(review): the statement that stores the result is not shown here;
// presumably *ppJob receives the value returned by ThreadPool::enter() -
// confirm.
441 extern "C" void SAL_CALL
442 uno_threadpool_enter( uno_ThreadPool hPool
, void **ppJob
)
445 sal_Sequence
*pThreadId
= 0;
446 uno_getIdOfCurrentThread( &pThreadId
);
448 ThreadPool::getInstance()->enter(
// The dispose id is the handle's pointer value converted to sal_Int64.
450 sal::static_int_cast
< sal_Int64
>(
451 reinterpret_cast< sal_IntPtr
>(hPool
)) );
// Give back what uno_getIdOfCurrentThread() handed out above.
452 rtl_byte_sequence_release( pThreadId
);
453 uno_releaseIdFromCurrentThread();
// Counterpart of uno_threadpool_attach(). No statements are shown for this
// function; the handle parameter is unnamed and not used.
456 extern "C" void SAL_CALL
457 uno_threadpool_detach( uno_ThreadPool
) SAL_THROW_EXTERN_C()
459 // we might do some tidying up here in case a thread called attach but never detach
// Hands a job to the pool by forwarding to ThreadPool::addJob() with the thread
// id, the oneway flag, the job pointer and the request callback.
//   pThreadId  id of the thread the job is queued for
//   doRequest  callback that receives the thread specific data pointer
//   bIsOneway  passed as addJob()'s second argument
462 extern "C" void SAL_CALL
463 uno_threadpool_putJob(
465 sal_Sequence
*pThreadId
,
467 void ( SAL_CALL
* doRequest
) ( void *pThreadSpecificData
),
468 sal_Bool bIsOneway
) SAL_THROW_EXTERN_C()
470 ThreadPool::getInstance()->addJob( pThreadId
, bIsOneway
, pJob
,doRequest
);
// Disposes the pool handle hPool by calling ThreadPool::dispose() with the
// handle's pointer value converted to sal_Int64 as dispose id.
473 extern "C" void SAL_CALL
474 uno_threadpool_dispose( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
476 ThreadPool::getInstance()->dispose(
477 sal::static_int_cast
< sal_Int64
>(
478 reinterpret_cast< sal_IntPtr
>(hPool
)) );
// Destroys the pool handle hPool: ends the disposing state for the handle's
// dispose id, removes the handle from the registry under the global mutex, and
// deletes the registry once it is empty.
// NOTE(review): the condition announced by the "special treatment for 0"
// comment and any release of the handle object itself are not shown here -
// confirm.
481 extern "C" void SAL_CALL
482 uno_threadpool_destroy( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
// Uses the same dispose id (the handle's pointer value) as uno_threadpool_dispose().
484 ThreadPool::getInstance()->stopDisposing(
485 sal::static_int_cast
< sal_Int64
>(
486 reinterpret_cast< sal_IntPtr
>(hPool
)) );
490 // special treatment for 0 !
491 OSL_ASSERT( g_pThreadpoolHashSet
);
493 MutexGuard
guard( Mutex::getGlobalMutex() );
495 ThreadpoolHashSet::iterator ii
= g_pThreadpoolHashSet
->find( hPool
);
// The handle is expected to have been registered by uno_threadpool_create().
496 OSL_ASSERT( ii
!= g_pThreadpoolHashSet
->end() );
497 g_pThreadpoolHashSet
->erase( ii
);
// Last handle gone: free the registry; the next create allocates a new one.
500 if( g_pThreadpoolHashSet
->empty() )
502 delete g_pThreadpoolHashSet
;
503 g_pThreadpoolHashSet
= 0;