1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 * This file incorporates work covered by the following license notice:
11 * Licensed to the Apache Software Foundation (ASF) under one or more
12 * contributor license agreements. See the NOTICE file distributed
13 * with this work for additional information regarding copyright
14 * ownership. The ASF licenses this file to you under the Apache
15 * License, Version 2.0 (the "License"); you may not use this file
16 * except in compliance with the License. You may obtain a copy of
17 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
20 #include "sal/config.h"
23 #include <unordered_map>
25 #include <osl/diagnose.h>
26 #include <osl/mutex.hxx>
27 #include <osl/thread.h>
28 #include <rtl/instance.hxx>
29 #include <sal/log.hxx>
31 #include <uno/threadpool.h>
33 #include "threadpool.hxx"
36 using namespace ::std
;
37 using namespace ::osl
;
38 using namespace ::rtl
;
40 namespace cppu_threadpool
42 struct theDisposedCallerAdmin
:
43 public rtl::StaticWithInit
< DisposedCallerAdminHolder
, theDisposedCallerAdmin
>
45 DisposedCallerAdminHolder
operator () () {
46 return DisposedCallerAdminHolder(new DisposedCallerAdmin());
50 DisposedCallerAdminHolder
DisposedCallerAdmin::getInstance()
52 return theDisposedCallerAdmin::get();
55 DisposedCallerAdmin::~DisposedCallerAdmin()
57 SAL_WARN_IF( !m_lst
.empty(), "cppu.threadpool", "DisposedCallerList : " << m_lst
.size() << " left\n");
60 void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId
)
62 MutexGuard
guard( m_mutex
);
63 m_lst
.push_back( nDisposeId
);
66 void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId
)
68 MutexGuard
guard( m_mutex
);
69 for( DisposedCallerList::iterator ii
= m_lst
.begin() ;
73 if( (*ii
) == nDisposeId
)
81 bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId
)
83 MutexGuard
guard( m_mutex
);
84 for( DisposedCallerList::iterator ii
= m_lst
.begin() ;
88 if( (*ii
) == nDisposeId
)
99 ThreadPool::ThreadPool()
101 m_DisposedCallerAdmin
= DisposedCallerAdmin::getInstance();
104 ThreadPool::~ThreadPool()
106 SAL_WARN_IF( m_mapQueue
.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue
.size() << " left\n");
109 void ThreadPool::dispose( sal_Int64 nDisposeId
)
111 m_DisposedCallerAdmin
->dispose( nDisposeId
);
113 MutexGuard
guard( m_mutex
);
114 for( ThreadIdHashMap::iterator ii
= m_mapQueue
.begin() ;
115 ii
!= m_mapQueue
.end();
118 if( (*ii
).second
.first
)
120 (*ii
).second
.first
->dispose( nDisposeId
);
122 if( (*ii
).second
.second
)
124 (*ii
).second
.second
->dispose( nDisposeId
);
129 void ThreadPool::destroy( sal_Int64 nDisposeId
)
131 m_DisposedCallerAdmin
->destroy( nDisposeId
);
135 * This methods lets the thread wait a certain amount of time. If within this timespan
136 * a new request comes in, this thread is reused. This is done only to improve performance,
137 * it is not required for threadpool functionality.
139 void ThreadPool::waitInPool( rtl::Reference
< ORequestThread
> const & pThread
)
141 struct WaitingThread waitingThread
;
142 waitingThread
.condition
= osl_createCondition();
143 waitingThread
.thread
= pThread
;
145 MutexGuard
guard( m_mutexWaitingThreadList
);
146 m_lstThreads
.push_front( &waitingThread
);
149 // let the thread wait 2 seconds
150 TimeValue time
= { 2 , 0 };
151 osl_waitCondition( waitingThread
.condition
, &time
);
154 MutexGuard
guard ( m_mutexWaitingThreadList
);
155 if( waitingThread
.thread
.is() )
157 // thread wasn't reused, remove it from the list
158 WaitingThreadList::iterator ii
= find(
159 m_lstThreads
.begin(), m_lstThreads
.end(), &waitingThread
);
160 OSL_ASSERT( ii
!= m_lstThreads
.end() );
161 m_lstThreads
.erase( ii
);
165 osl_destroyCondition( waitingThread
.condition
);
168 void ThreadPool::joinWorkers()
171 MutexGuard
guard( m_mutexWaitingThreadList
);
172 for( WaitingThreadList::iterator ii
= m_lstThreads
.begin() ;
173 ii
!= m_lstThreads
.end() ;
176 // wake the threads up
177 osl_setCondition( (*ii
)->condition
);
180 m_aThreadAdmin
.join();
183 bool ThreadPool::createThread( JobQueue
*pQueue
,
184 const ByteSequence
&aThreadId
,
188 // Can a thread be reused ?
189 MutexGuard
guard( m_mutexWaitingThreadList
);
190 if( ! m_lstThreads
.empty() )
192 // inform the thread and let it go
193 struct WaitingThread
*pWaitingThread
= m_lstThreads
.back();
194 pWaitingThread
->thread
->setTask( pQueue
, aThreadId
, bAsynchron
);
195 pWaitingThread
->thread
= 0;
198 m_lstThreads
.pop_back();
201 osl_setCondition( pWaitingThread
->condition
);
206 rtl::Reference
< ORequestThread
> pThread(
207 new ORequestThread( this, pQueue
, aThreadId
, bAsynchron
) );
208 return pThread
->launch();
211 bool ThreadPool::revokeQueue( const ByteSequence
&aThreadId
, bool bAsynchron
)
213 MutexGuard
guard( m_mutex
);
215 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
216 OSL_ASSERT( ii
!= m_mapQueue
.end() );
220 if( ! (*ii
).second
.second
->isEmpty() )
222 // another thread has put something into the queue
226 (*ii
).second
.second
= 0;
227 if( (*ii
).second
.first
)
229 // all oneway request have been processed, now
230 // synchronus requests may go on
231 (*ii
).second
.first
->resume();
236 if( ! (*ii
).second
.first
->isEmpty() )
238 // another thread has put something into the queue
241 (*ii
).second
.first
= 0;
244 if( 0 == (*ii
).second
.first
&& 0 == (*ii
).second
.second
)
246 m_mapQueue
.erase( ii
);
253 bool ThreadPool::addJob(
254 const ByteSequence
&aThreadId
,
256 void *pThreadSpecificData
,
257 RequestFun
* doRequest
)
259 bool bCreateThread
= false;
260 JobQueue
*pQueue
= 0;
262 MutexGuard
guard( m_mutex
);
264 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
266 if( ii
== m_mapQueue
.end() )
268 m_mapQueue
[ aThreadId
] = pair
< JobQueue
* , JobQueue
* > ( (JobQueue
*)0 , (JobQueue
*)0 );
269 ii
= m_mapQueue
.find( aThreadId
);
270 OSL_ASSERT( ii
!= m_mapQueue
.end() );
275 if( ! (*ii
).second
.second
)
277 (*ii
).second
.second
= new JobQueue();
278 bCreateThread
= true;
280 pQueue
= (*ii
).second
.second
;
284 if( ! (*ii
).second
.first
)
286 (*ii
).second
.first
= new JobQueue();
287 bCreateThread
= true;
289 pQueue
= (*ii
).second
.first
;
291 if( (*ii
).second
.second
&& ( (*ii
).second
.second
->isBusy() ) )
296 pQueue
->add( pThreadSpecificData
, doRequest
);
299 return !bCreateThread
|| createThread( pQueue
, aThreadId
, bAsynchron
);
302 void ThreadPool::prepare( const ByteSequence
&aThreadId
)
304 MutexGuard
guard( m_mutex
);
306 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
308 if( ii
== m_mapQueue
.end() )
310 JobQueue
*p
= new JobQueue();
311 m_mapQueue
[ aThreadId
] = pair
< JobQueue
* , JobQueue
* > ( p
, (JobQueue
*)0 );
313 else if( 0 == (*ii
).second
.first
)
315 (*ii
).second
.first
= new JobQueue();
319 void * ThreadPool::enter( const ByteSequence
& aThreadId
, sal_Int64 nDisposeId
)
321 JobQueue
*pQueue
= 0;
323 MutexGuard
guard( m_mutex
);
325 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
327 OSL_ASSERT( ii
!= m_mapQueue
.end() );
328 pQueue
= (*ii
).second
.first
;
331 OSL_ASSERT( pQueue
);
332 void *pReturn
= pQueue
->enter( nDisposeId
);
334 if( pQueue
->isCallstackEmpty() )
336 if( revokeQueue( aThreadId
, false) )
346 // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
347 // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
348 // (within the last uno_threadpool_destroy) all worker threads spawned by that
349 // ThreadPool instance are joined (which implies that uno_threadpool_destroy
350 // must never be called from a worker thread); afterwards, the next call to
351 // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
353 using namespace cppu_threadpool
;
355 struct uno_ThreadPool_Equal
357 bool operator () ( const uno_ThreadPool
&a
, const uno_ThreadPool
&b
) const
363 struct uno_ThreadPool_Hash
365 sal_Size
operator () ( const uno_ThreadPool
&a
) const
367 return reinterpret_cast<sal_Size
>( a
);
373 typedef std::unordered_map
< uno_ThreadPool
, ThreadPoolHolder
, uno_ThreadPool_Hash
, uno_ThreadPool_Equal
> ThreadpoolHashSet
;
375 static ThreadpoolHashSet
*g_pThreadpoolHashSet
;
377 struct _uno_ThreadPool
384 ThreadPoolHolder
getThreadPool( uno_ThreadPool hPool
)
386 MutexGuard
guard( Mutex::getGlobalMutex() );
387 assert( g_pThreadpoolHashSet
!= 0 );
388 ThreadpoolHashSet::iterator
i( g_pThreadpoolHashSet
->find(hPool
) );
389 assert( i
!= g_pThreadpoolHashSet
->end() );
395 extern "C" uno_ThreadPool SAL_CALL
396 uno_threadpool_create() SAL_THROW_EXTERN_C()
398 MutexGuard
guard( Mutex::getGlobalMutex() );
400 if( ! g_pThreadpoolHashSet
)
402 g_pThreadpoolHashSet
= new ThreadpoolHashSet();
407 assert( !g_pThreadpoolHashSet
->empty() );
408 p
= g_pThreadpoolHashSet
->begin()->second
;
411 // Just ensure that the handle is unique in the process (via heap)
412 uno_ThreadPool h
= new struct _uno_ThreadPool
;
413 g_pThreadpoolHashSet
->insert( ThreadpoolHashSet::value_type(h
, p
) );
417 extern "C" void SAL_CALL
418 uno_threadpool_attach( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
420 sal_Sequence
*pThreadId
= 0;
421 uno_getIdOfCurrentThread( &pThreadId
);
422 getThreadPool( hPool
)->prepare( pThreadId
);
423 rtl_byte_sequence_release( pThreadId
);
424 uno_releaseIdFromCurrentThread();
427 extern "C" void SAL_CALL
428 uno_threadpool_enter( uno_ThreadPool hPool
, void **ppJob
)
431 sal_Sequence
*pThreadId
= 0;
432 uno_getIdOfCurrentThread( &pThreadId
);
434 getThreadPool( hPool
)->enter(
436 sal::static_int_cast
< sal_Int64
>(
437 reinterpret_cast< sal_IntPtr
>(hPool
)) );
438 rtl_byte_sequence_release( pThreadId
);
439 uno_releaseIdFromCurrentThread();
442 extern "C" void SAL_CALL
443 uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool
) SAL_THROW_EXTERN_C()
445 // we might do here some tiding up in case a thread called attach but never detach
448 extern "C" void SAL_CALL
449 uno_threadpool_putJob(
450 uno_ThreadPool hPool
,
451 sal_Sequence
*pThreadId
,
453 void ( SAL_CALL
* doRequest
) ( void *pThreadSpecificData
),
454 sal_Bool bIsOneway
) SAL_THROW_EXTERN_C()
456 if (!getThreadPool(hPool
)->addJob( pThreadId
, bIsOneway
, pJob
,doRequest
))
460 "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
464 extern "C" void SAL_CALL
465 uno_threadpool_dispose( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
467 getThreadPool(hPool
)->dispose(
468 sal::static_int_cast
< sal_Int64
>(
469 reinterpret_cast< sal_IntPtr
>(hPool
)) );
472 extern "C" void SAL_CALL
473 uno_threadpool_destroy( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
475 ThreadPoolHolder
p( getThreadPool(hPool
) );
477 sal::static_int_cast
< sal_Int64
>(
478 reinterpret_cast< sal_IntPtr
>(hPool
)) );
482 OSL_ASSERT( g_pThreadpoolHashSet
);
484 MutexGuard
guard( Mutex::getGlobalMutex() );
486 ThreadpoolHashSet::iterator ii
= g_pThreadpoolHashSet
->find( hPool
);
487 OSL_ASSERT( ii
!= g_pThreadpoolHashSet
->end() );
488 g_pThreadpoolHashSet
->erase( ii
);
491 empty
= g_pThreadpoolHashSet
->empty();
494 delete g_pThreadpoolHashSet
;
495 g_pThreadpoolHashSet
= 0;
505 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */