1 /*************************************************************************
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 * Copyright 2008 by Sun Microsystems, Inc.
7 * OpenOffice.org - a multi-platform office productivity suite
9 * $RCSfile: threadpool.cxx,v $
12 * This file is part of OpenOffice.org.
14 * OpenOffice.org is free software: you can redistribute it and/or modify
15 * it under the terms of the GNU Lesser General Public License version 3
16 * only, as published by the Free Software Foundation.
18 * OpenOffice.org is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU Lesser General Public License version 3 for more details
22 * (a copy is included in the LICENSE file that accompanied this code).
24 * You should have received a copy of the GNU Lesser General Public License
25 * version 3 along with OpenOffice.org. If not, see
26 * <http://www.openoffice.org/license.html>
27 * for a copy of the LGPLv3 License.
29 ************************************************************************/
31 // MARKER(update_precomp.py): autogen include statement, do not remove
32 #include "precompiled_cppu.hxx"
36 #include <osl/diagnose.h>
37 #include <osl/mutex.hxx>
38 #include <osl/thread.h>
40 #include <uno/threadpool.h>
42 #include "threadpool.hxx"
45 using namespace ::std
;
46 using namespace ::osl
;
48 namespace cppu_threadpool
// Returns a pointer to the DisposedCallerAdmin singleton.
// The pointer is cached in a function-local static. While it is still null,
// the global mutex is taken, the pointer is checked a second time, and it is
// then set to the address of a function-local static instance.
50 DisposedCallerAdmin
*DisposedCallerAdmin::getInstance()
52 static DisposedCallerAdmin
*pDisposedCallerAdmin
= 0;
53 if( ! pDisposedCallerAdmin
)
55 MutexGuard
guard( Mutex::getGlobalMutex() );
// second check, now with the global mutex held
56 if( ! pDisposedCallerAdmin
)
58 static DisposedCallerAdmin admin
;
59 pDisposedCallerAdmin
= &admin
;
62 return pDisposedCallerAdmin
;
// Destructor. When OSL_DEBUG_LEVEL > 1 it contains a diagnostic printf that
// reports how many ids are still stored in m_lst.
// NOTE(review): any condition guarding the printf is not visible in this view.
65 DisposedCallerAdmin::~DisposedCallerAdmin()
67 #if OSL_DEBUG_LEVEL > 1
70 printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst
.size( )));
// Appends nDisposeId to m_lst while holding m_mutex.
// isDisposed() later scans the same list for this id.
75 void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId
)
77 MutexGuard
guard( m_mutex
);
78 m_lst
.push_back( nDisposeId
);
// With m_mutex held, walks m_lst from its beginning looking for an entry
// equal to nDisposeId.
// NOTE(review): the rest of the loop header and the action taken on a match
// are not visible in this view; presumably the matching entry is removed from
// the list - confirm against the full file.
81 void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId
)
83 MutexGuard
guard( m_mutex
);
84 for( DisposedCallerList::iterator ii
= m_lst
.begin() ;
88 if( (*ii
) == nDisposeId
)
// With m_mutex held, walks m_lst from its beginning and compares each entry
// with nDisposeId. The result type is sal_Bool.
// NOTE(review): the return statements are not visible in this view;
// presumably sal_True is returned on a match and sal_False otherwise - confirm.
96 sal_Bool
DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId
)
98 MutexGuard
guard( m_mutex
);
99 for( DisposedCallerList::iterator ii
= m_lst
.begin() ;
103 if( (*ii
) == nDisposeId
)
112 //-------------------------------------------------------------------------------
// Destructor. When OSL_DEBUG_LEVEL > 1 and m_mapQueue is not empty, it prints
// the number of entries that are still registered in m_mapQueue.
113 ThreadPool::~ThreadPool()
115 #if OSL_DEBUG_LEVEL > 1
116 if( m_mapQueue
.size() )
118 printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue
.size()) );
// Accessor for the ThreadPool instance.
// Visible here: a function-local static pointer initialised to 0, a guard on
// the global mutex, and a function-local static ThreadPool object.
// NOTE(review): the null checks, the assignment to pThreadPool and the return
// statement are not visible in this view; the shape looks the same as
// DisposedCallerAdmin::getInstance() - confirm against the full file.
122 ThreadPool
*ThreadPool::getInstance()
124 static ThreadPool
*pThreadPool
= 0;
127 MutexGuard
guard( Mutex::getGlobalMutex() );
130 static ThreadPool pool
;
// Disposes everything that belongs to nDisposeId. Visible steps, in order:
//  1. the id is registered with DisposedCallerAdmin,
//  2. with m_mutex held, dispose( nDisposeId ) is called on every non-null
//     queue (both pair members) of every entry in m_mapQueue,
//  3. with m_mutexWaitingThreadList held, the condition of every entry in
//     m_lstThreads is set,
//  4. ThreadAdmin::getInstance()->join() is called.
// NOTE(review): the scopes that limit the lifetime of the two guards are not
// visible in this view.
138 void ThreadPool::dispose( sal_Int64 nDisposeId
)
142 DisposedCallerAdmin::getInstance()->dispose( nDisposeId
);
144 MutexGuard
guard( m_mutex
);
145 for( ThreadIdHashMap::iterator ii
= m_mapQueue
.begin() ;
146 ii
!= m_mapQueue
.end();
// first queue of the pair, if present
149 if( (*ii
).second
.first
)
151 (*ii
).second
.first
->dispose( nDisposeId
);
// second queue of the pair, if present
153 if( (*ii
).second
.second
)
155 (*ii
).second
.second
->dispose( nDisposeId
);
162 MutexGuard
guard( m_mutexWaitingThreadList
);
163 for( WaitingThreadList::iterator ii
= m_lstThreads
.begin() ;
164 ii
!= m_lstThreads
.end() ;
167 // wake the threads up
168 osl_setCondition( (*ii
)->condition
);
171 ThreadAdmin::getInstance()->join();
// Forwards nDisposeId to DisposedCallerAdmin::stopDisposing() on the
// DisposedCallerAdmin singleton.
175 void ThreadPool::stopDisposing( sal_Int64 nDisposeId
)
177 DisposedCallerAdmin::getInstance()->stopDisposing( nDisposeId
);
181 * This method lets the thread wait a certain amount of time. If within this timespan
182 * a new request comes in, this thread is reused. This is done only to improve performance,
183 * it is not required for threadpool functionality.
// Parks pThread in the list of waiting threads for a limited time.
// A WaitingThread record local to this call gets a fresh condition and the
// thread pointer; its address is pushed to the front of m_lstThreads under
// m_mutexWaitingThreadList. The call then waits on the condition with a
// timeout of { 2 , 0 }. Afterwards, again under the list mutex, a record whose
// thread pointer is still non-null is looked up in m_lstThreads and erased.
// createThread() sets that pointer to 0 when it reuses a waiting thread.
// The condition is destroyed before returning.
// NOTE(review): the scopes that limit the lifetime of the two guards are not
// visible in this view.
185 void ThreadPool::waitInPool( ORequestThread
* pThread
)
187 struct WaitingThread waitingThread
;
188 waitingThread
.condition
= osl_createCondition();
189 waitingThread
.thread
= pThread
;
191 MutexGuard
guard( m_mutexWaitingThreadList
);
// the list stores the address of the local record above
192 m_lstThreads
.push_front( &waitingThread
);
195 // let the thread wait 2 seconds
196 TimeValue time
= { 2 , 0 };
197 osl_waitCondition( waitingThread
.condition
, &time
);
200 MutexGuard
guard ( m_mutexWaitingThreadList
);
201 if( waitingThread
.thread
)
203 // thread wasn't reused, remove it from the list
204 WaitingThreadList::iterator ii
= find(
205 m_lstThreads
.begin(), m_lstThreads
.end(), &waitingThread
);
206 OSL_ASSERT( ii
!= m_lstThreads
.end() );
207 m_lstThreads
.erase( ii
);
211 osl_destroyCondition( waitingThread
.condition
);
// Hands the given queue to a thread.
// With m_mutexWaitingThreadList held, if m_lstThreads is not empty, the record
// at the back of the list is given the task via setTask(), its thread pointer
// is set to 0, it is popped from the list, and its condition is set.
// A new ORequestThread is allocated with the same three arguments.
// NOTE(review): where bCreate is cleared and the condition around the `new`
// are not visible in this view; presumably a new thread is created only when
// no waiting thread could be reused - confirm against the full file.
214 void ThreadPool::createThread( JobQueue
*pQueue
,
215 const ByteSequence
&aThreadId
,
216 sal_Bool bAsynchron
)
218 sal_Bool bCreate
= sal_True
;
220 // Can a thread be reused ?
221 MutexGuard
guard( m_mutexWaitingThreadList
);
222 if( ! m_lstThreads
.empty() )
224 // inform the thread and let it go
225 struct WaitingThread
*pWaitingThread
= m_lstThreads
.back();
226 pWaitingThread
->thread
->setTask( pQueue
, aThreadId
, bAsynchron
);
// a null thread pointer tells waitInPool() that the record was reused
227 pWaitingThread
->thread
= 0;
230 m_lstThreads
.pop_back();
233 osl_setCondition( pWaitingThread
->condition
);
240 ORequestThread
*pThread
=
241 new ORequestThread( pQueue
, aThreadId
, bAsynchron
);
// Detaches a queue from the m_mapQueue entry of aThreadId; returns sal_Bool.
// With m_mutex held, the entry is looked up (asserted to exist). The visible
// code handles both pair members: it checks whether the queue is empty, sets
// the member pointer to 0, and - after clearing .second - calls resume() on
// .first if that is non-null. When both members are 0 the entry is erased.
// NOTE(review): the branching on bAsynchron and all return statements are not
// visible in this view. The existing comments suggest that .second holds the
// oneway requests and .first the synchronous ones, and that a non-empty queue
// is not revoked - confirm against the full file.
247 sal_Bool
ThreadPool::revokeQueue( const ByteSequence
&aThreadId
, sal_Bool bAsynchron
)
249 MutexGuard
guard( m_mutex
);
251 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
252 OSL_ASSERT( ii
!= m_mapQueue
.end() );
256 if( ! (*ii
).second
.second
->isEmpty() )
258 // another thread has put something into the queue
262 (*ii
).second
.second
= 0;
263 if( (*ii
).second
.first
)
265 // all oneway requests have been processed, now
266 // synchronous requests may go on
267 (*ii
).second
.first
->resume();
272 if( ! (*ii
).second
.first
->isEmpty() )
274 // another thread has put something into the queue
277 (*ii
).second
.first
= 0;
// drop the map entry once neither queue is registered any more
280 if( 0 == (*ii
).second
.first
&& 0 == (*ii
).second
.second
)
282 m_mapQueue
.erase( ii
);
// Queues a job (pThreadSpecificData plus the doRequest callback) for the
// thread id aThreadId.
// With m_mutex held, the entry for aThreadId is looked up in m_mapQueue and,
// if absent, inserted with both queue pointers 0. The visible code then
// allocates the .second or the .first JobQueue when the pointer is null
// (setting bCreateThread), selects that queue as pQueue, and tests whether the
// .second queue exists and isBusy(). The job is added to pQueue and
// createThread() is called with pQueue, aThreadId and bAsynchron.
// NOTE(review): the bAsynchron parameter line, the branch that chooses between
// .second and .first, the action taken when .second is busy, and the condition
// around createThread() are not visible in this view; presumably bAsynchron
// selects .second and createThread() runs only when bCreateThread is set -
// confirm against the full file.
289 void ThreadPool::addJob(
290 const ByteSequence
&aThreadId
,
292 void *pThreadSpecificData
,
293 RequestFun
* doRequest
)
295 sal_Bool bCreateThread
= sal_False
;
296 JobQueue
*pQueue
= 0;
298 MutexGuard
guard( m_mutex
);
300 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
302 if( ii
== m_mapQueue
.end() )
// no entry yet: insert one with both queue pointers null, then look it up again
304 m_mapQueue
[ aThreadId
] = pair
< JobQueue
* , JobQueue
* > ( 0 , 0 );
305 ii
= m_mapQueue
.find( aThreadId
);
306 OSL_ASSERT( ii
!= m_mapQueue
.end() );
311 if( ! (*ii
).second
.second
)
313 (*ii
).second
.second
= new JobQueue();
314 bCreateThread
= sal_True
;
316 pQueue
= (*ii
).second
.second
;
320 if( ! (*ii
).second
.first
)
322 (*ii
).second
.first
= new JobQueue();
323 bCreateThread
= sal_True
;
325 pQueue
= (*ii
).second
.first
;
327 if( (*ii
).second
.second
&& ( (*ii
).second
.second
->isBusy() ) )
332 pQueue
->add( pThreadSpecificData
, doRequest
);
337 createThread( pQueue
, aThreadId
, bAsynchron
);
// Makes sure the m_mapQueue entry for aThreadId has a .first JobQueue.
// With m_mutex held: if no entry exists, one is inserted whose .first is a
// newly allocated JobQueue and whose .second is 0; if an entry exists but its
// .first is 0, a new JobQueue is allocated for .first.
341 void ThreadPool::prepare( const ByteSequence
&aThreadId
)
343 MutexGuard
guard( m_mutex
);
345 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
347 if( ii
== m_mapQueue
.end() )
349 JobQueue
*p
= new JobQueue();
350 m_mapQueue
[ aThreadId
] = pair
< JobQueue
* , JobQueue
* > ( p
, 0 );
352 else if( 0 == (*ii
).second
.first
)
354 (*ii
).second
.first
= new JobQueue();
// Enters the .first queue registered for aThreadId.
// With m_mutex held, the entry is looked up in m_mapQueue (asserted to exist)
// and its .first queue is taken; the queue is asserted to be non-null.
// pQueue->enter( nDisposeId ) supplies the value stored in pReturn. If the
// queue then reports isCallstackEmpty(), revokeQueue( aThreadId , sal_False )
// is called and its result tested.
// NOTE(review): the scope of the guard, the action taken when revokeQueue()
// succeeds, and the return statement are not visible in this view; presumably
// the queue is freed and pReturn is returned - confirm against the full file.
358 void * ThreadPool::enter( const ByteSequence
& aThreadId
, sal_Int64 nDisposeId
)
360 JobQueue
*pQueue
= 0;
362 MutexGuard
guard( m_mutex
);
364 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
366 OSL_ASSERT( ii
!= m_mapQueue
.end() );
367 pQueue
= (*ii
).second
.first
;
370 OSL_ASSERT( pQueue
);
371 void *pReturn
= pQueue
->enter( nDisposeId
);
373 if( pQueue
->isCallstackEmpty() )
375 if( revokeQueue( aThreadId
, sal_False
) )
386 using namespace cppu_threadpool
;
// Equality functor for uno_ThreadPool handles; it is the third template
// argument of the ThreadpoolHashSet typedef.
// NOTE(review): the body of operator() is not visible in this view.
388 struct uno_ThreadPool_Equal
390 sal_Bool
operator () ( const uno_ThreadPool
&a
, const uno_ThreadPool
&b
) const
// Hash functor for uno_ThreadPool handles; it is the second template argument
// of the ThreadpoolHashSet typedef.
// NOTE(review): the body of operator() is not visible in this view.
396 struct uno_ThreadPool_Hash
398 sal_Size
operator () ( const uno_ThreadPool
&a
) const
// Set of uno_ThreadPool handles, hashed and compared with the two functors
// above.
406 typedef ::std::hash_set
< uno_ThreadPool
, uno_ThreadPool_Hash
, uno_ThreadPool_Equal
> ThreadpoolHashSet
;
// File-local pointer to the handle set. uno_threadpool_create() allocates the
// set on first use; uno_threadpool_destroy() deletes it and resets the pointer
// to 0 once the set is empty.
408 static ThreadpoolHashSet
*g_pThreadpoolHashSet
;
// Handle type allocated by uno_threadpool_create(); each handle is a separate
// heap object, so its address is unique in the process.
// NOTE(review): the members of the struct, if any, are not visible in this view.
410 struct _uno_ThreadPool
// Creates a new thread pool handle.
// With the global mutex held, g_pThreadpoolHashSet is allocated if it is still
// null, a new _uno_ThreadPool is allocated, and the handle is inserted into
// the set.
// NOTE(review): the return statement is not visible in this view; presumably
// the handle h is returned - confirm.
415 extern "C" uno_ThreadPool SAL_CALL
416 uno_threadpool_create() SAL_THROW_EXTERN_C()
418 MutexGuard
guard( Mutex::getGlobalMutex() );
419 if( ! g_pThreadpoolHashSet
)
421 g_pThreadpoolHashSet
= new ThreadpoolHashSet();
424 // Just ensure that the handle is unique in the process (via heap)
425 uno_ThreadPool h
= new struct _uno_ThreadPool
;
426 g_pThreadpoolHashSet
->insert( h
);
// Prepares the ThreadPool for the calling thread.
// The id of the current thread is fetched, passed to ThreadPool::prepare(),
// and the byte sequence is released again. uno_releaseIdFromCurrentThread()
// is called last. The handle parameter is unnamed and not used.
430 extern "C" void SAL_CALL
431 uno_threadpool_attach( uno_ThreadPool
) SAL_THROW_EXTERN_C()
433 sal_Sequence
*pThreadId
= 0;
434 uno_getIdOfCurrentThread( &pThreadId
);
435 ThreadPool::getInstance()->prepare( pThreadId
);
436 rtl_byte_sequence_release( pThreadId
);
437 uno_releaseIdFromCurrentThread();
// Lets the calling thread enter the pool.
// The id of the current thread is fetched and ThreadPool::enter() is called;
// the handle hPool, converted to sal_Int64, is passed as the dispose id.
// Afterwards the byte sequence is released and
// uno_releaseIdFromCurrentThread() is called.
// NOTE(review): the first argument of enter() and the store into *ppJob are
// not visible in this view; presumably the thread id is passed and the result
// of enter() is written to *ppJob - confirm against the full file.
440 extern "C" void SAL_CALL
441 uno_threadpool_enter( uno_ThreadPool hPool
, void **ppJob
)
444 sal_Sequence
*pThreadId
= 0;
445 uno_getIdOfCurrentThread( &pThreadId
);
447 ThreadPool::getInstance()->enter(
449 sal::static_int_cast
< sal_Int64
>(
450 reinterpret_cast< sal_IntPtr
>(hPool
)) );
451 rtl_byte_sequence_release( pThreadId
);
452 uno_releaseIdFromCurrentThread();
// Counterpart of uno_threadpool_attach(). No statements are visible in the
// body; the handle parameter is unnamed.
455 extern "C" void SAL_CALL
456 uno_threadpool_detach( uno_ThreadPool
) SAL_THROW_EXTERN_C()
458 // we might do some tidying up here in case a thread called attach but never detach
// Forwards a job to ThreadPool::addJob() with the arguments pThreadId,
// bIsOneway, pJob and doRequest, in that order.
// NOTE(review): not all parameter lines are visible in this view; pJob is used
// in the call but its declaration is not shown.
461 extern "C" void SAL_CALL
462 uno_threadpool_putJob(
464 sal_Sequence
*pThreadId
,
466 void ( SAL_CALL
* doRequest
) ( void *pThreadSpecificData
),
467 sal_Bool bIsOneway
) SAL_THROW_EXTERN_C()
469 ThreadPool::getInstance()->addJob( pThreadId
, bIsOneway
, pJob
,doRequest
);
// Calls ThreadPool::dispose() with the handle hPool converted to sal_Int64 as
// the dispose id.
472 extern "C" void SAL_CALL
473 uno_threadpool_dispose( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
475 ThreadPool::getInstance()->dispose(
476 sal::static_int_cast
< sal_Int64
>(
477 reinterpret_cast< sal_IntPtr
>(hPool
)) );
// Destroys a thread pool handle.
// ThreadPool::stopDisposing() is called with hPool converted to sal_Int64.
// Then, with the global mutex held, hPool is looked up in g_pThreadpoolHashSet
// (asserted to be present) and erased. When the set becomes empty it is
// deleted and the pointer reset to 0.
// NOTE(review): the existing "special treatment for 0" comment suggests a
// guard on hPool, and the handle itself has to be freed somewhere; neither is
// visible in this view - confirm against the full file.
480 extern "C" void SAL_CALL
481 uno_threadpool_destroy( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
483 ThreadPool::getInstance()->stopDisposing(
484 sal::static_int_cast
< sal_Int64
>(
485 reinterpret_cast< sal_IntPtr
>(hPool
)) );
489 // special treatment for 0 !
490 OSL_ASSERT( g_pThreadpoolHashSet
);
492 MutexGuard
guard( Mutex::getGlobalMutex() );
494 ThreadpoolHashSet::iterator ii
= g_pThreadpoolHashSet
->find( hPool
);
495 OSL_ASSERT( ii
!= g_pThreadpoolHashSet
->end() );
496 g_pThreadpoolHashSet
->erase( ii
);
// free the set itself once the last handle is gone
499 if( g_pThreadpoolHashSet
->empty() )
501 delete g_pThreadpoolHashSet
;
502 g_pThreadpoolHashSet
= 0;