1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 * This file incorporates work covered by the following license notice:
11 * Licensed to the Apache Software Foundation (ASF) under one or more
12 * contributor license agreements. See the NOTICE file distributed
13 * with this work for additional information regarding copyright
14 * ownership. The ASF licenses this file to you under the Apache
15 * License, Version 2.0 (the "License"); you may not use this file
16 * except in compliance with the License. You may obtain a copy of
17 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
20 #include <sal/config.h>
25 #include <unordered_map>
27 #include <osl/diagnose.h>
28 #include <osl/mutex.hxx>
29 #include <rtl/instance.hxx>
30 #include <sal/log.hxx>
32 #include <uno/threadpool.h>
34 #include "threadpool.hxx"
37 using namespace ::std
;
38 using namespace ::osl
;
39 using namespace ::rtl
;
41 namespace cppu_threadpool
43 WaitingThread::WaitingThread(
44 rtl::Reference
<ORequestThread
> const & theThread
): thread(theThread
)
49 struct theDisposedCallerAdmin
:
50 public rtl::StaticWithInit
< DisposedCallerAdminHolder
, theDisposedCallerAdmin
>
52 DisposedCallerAdminHolder
operator () () {
53 return std::make_shared
<DisposedCallerAdmin
>();
59 DisposedCallerAdminHolder
const & DisposedCallerAdmin::getInstance()
61 return theDisposedCallerAdmin::get();
64 DisposedCallerAdmin::~DisposedCallerAdmin()
66 SAL_WARN_IF( !m_vector
.empty(), "cppu.threadpool", "DisposedCallerList : " << m_vector
.size() << " left");
69 void DisposedCallerAdmin::dispose( void const * nDisposeId
)
71 MutexGuard
guard( m_mutex
);
72 m_vector
.push_back( nDisposeId
);
75 void DisposedCallerAdmin::destroy( void const * nDisposeId
)
77 MutexGuard
guard( m_mutex
);
78 m_vector
.erase(std::remove(m_vector
.begin(), m_vector
.end(), nDisposeId
), m_vector
.end());
81 bool DisposedCallerAdmin::isDisposed( void const * nDisposeId
)
83 MutexGuard
guard( m_mutex
);
84 return (std::find(m_vector
.begin(), m_vector
.end(), nDisposeId
) != m_vector
.end());
88 ThreadPool::ThreadPool() :
89 m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() )
93 ThreadPool::~ThreadPool()
95 SAL_WARN_IF( m_mapQueue
.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue
.size() << " left");
98 void ThreadPool::dispose( void const * nDisposeId
)
100 m_DisposedCallerAdmin
->dispose( nDisposeId
);
102 MutexGuard
guard( m_mutex
);
103 for (auto const& item
: m_mapQueue
)
105 if( item
.second
.first
)
107 item
.second
.first
->dispose( nDisposeId
);
109 if( item
.second
.second
)
111 item
.second
.second
->dispose( nDisposeId
);
116 void ThreadPool::destroy( void const * nDisposeId
)
118 m_DisposedCallerAdmin
->destroy( nDisposeId
);
122 * This methods lets the thread wait a certain amount of time. If within this timespan
123 * a new request comes in, this thread is reused. This is done only to improve performance,
124 * it is not required for threadpool functionality.
126 void ThreadPool::waitInPool( rtl::Reference
< ORequestThread
> const & pThread
)
128 WaitingThread
waitingThread(pThread
);
130 MutexGuard
guard( m_mutexWaitingThreadList
);
131 m_dequeThreads
.push_front( &waitingThread
);
134 // let the thread wait 2 seconds
135 waitingThread
.condition
.wait( std::chrono::seconds(2) );
138 MutexGuard
guard ( m_mutexWaitingThreadList
);
139 if( waitingThread
.thread
.is() )
141 // thread wasn't reused, remove it from the list
142 WaitingThreadDeque::iterator ii
= find(
143 m_dequeThreads
.begin(), m_dequeThreads
.end(), &waitingThread
);
144 OSL_ASSERT( ii
!= m_dequeThreads
.end() );
145 m_dequeThreads
.erase( ii
);
150 void ThreadPool::joinWorkers()
153 MutexGuard
guard( m_mutexWaitingThreadList
);
154 for (auto const& thread
: m_dequeThreads
)
156 // wake the threads up
157 thread
->condition
.set();
160 m_aThreadAdmin
.join();
163 bool ThreadPool::createThread( JobQueue
*pQueue
,
164 const ByteSequence
&aThreadId
,
168 // Can a thread be reused ?
169 MutexGuard
guard( m_mutexWaitingThreadList
);
170 if( ! m_dequeThreads
.empty() )
172 // inform the thread and let it go
173 struct WaitingThread
*pWaitingThread
= m_dequeThreads
.back();
174 pWaitingThread
->thread
->setTask( pQueue
, aThreadId
, bAsynchron
);
175 pWaitingThread
->thread
= nullptr;
178 m_dequeThreads
.pop_back();
181 pWaitingThread
->condition
.set();
186 rtl::Reference
pThread(
187 new ORequestThread( this, pQueue
, aThreadId
, bAsynchron
) );
188 return pThread
->launch();
191 bool ThreadPool::revokeQueue( const ByteSequence
&aThreadId
, bool bAsynchron
)
193 MutexGuard
guard( m_mutex
);
195 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
196 OSL_ASSERT( ii
!= m_mapQueue
.end() );
200 if( ! (*ii
).second
.second
->isEmpty() )
202 // another thread has put something into the queue
206 (*ii
).second
.second
= nullptr;
207 if( (*ii
).second
.first
)
209 // all oneway request have been processed, now
210 // synchronous requests may go on
211 (*ii
).second
.first
->resume();
216 if( ! (*ii
).second
.first
->isEmpty() )
218 // another thread has put something into the queue
221 (*ii
).second
.first
= nullptr;
224 if( nullptr == (*ii
).second
.first
&& nullptr == (*ii
).second
.second
)
226 m_mapQueue
.erase( ii
);
233 bool ThreadPool::addJob(
234 const ByteSequence
&aThreadId
,
236 void *pThreadSpecificData
,
237 RequestFun
* doRequest
,
238 void const * disposeId
)
240 bool bCreateThread
= false;
241 JobQueue
*pQueue
= nullptr;
243 MutexGuard
guard( m_mutex
);
244 if (m_DisposedCallerAdmin
->isDisposed(disposeId
)) {
248 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
250 if( ii
== m_mapQueue
.end() )
252 m_mapQueue
[ aThreadId
] = pair
< JobQueue
* , JobQueue
* > ( nullptr , nullptr );
253 ii
= m_mapQueue
.find( aThreadId
);
254 OSL_ASSERT( ii
!= m_mapQueue
.end() );
259 if( ! (*ii
).second
.second
)
261 (*ii
).second
.second
= new JobQueue();
262 bCreateThread
= true;
264 pQueue
= (*ii
).second
.second
;
268 if( ! (*ii
).second
.first
)
270 (*ii
).second
.first
= new JobQueue();
271 bCreateThread
= true;
273 pQueue
= (*ii
).second
.first
;
275 if( (*ii
).second
.second
&& ( (*ii
).second
.second
->isBusy() ) )
280 pQueue
->add( pThreadSpecificData
, doRequest
);
283 return !bCreateThread
|| createThread( pQueue
, aThreadId
, bAsynchron
);
286 void ThreadPool::prepare( const ByteSequence
&aThreadId
)
288 MutexGuard
guard( m_mutex
);
290 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
292 if( ii
== m_mapQueue
.end() )
294 JobQueue
*p
= new JobQueue();
295 m_mapQueue
[ aThreadId
] = pair
< JobQueue
* , JobQueue
* > ( p
, nullptr );
297 else if( nullptr == (*ii
).second
.first
)
299 (*ii
).second
.first
= new JobQueue();
303 void * ThreadPool::enter( const ByteSequence
& aThreadId
, void const * nDisposeId
)
305 JobQueue
*pQueue
= nullptr;
307 MutexGuard
guard( m_mutex
);
309 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
311 OSL_ASSERT( ii
!= m_mapQueue
.end() );
312 pQueue
= (*ii
).second
.first
;
315 OSL_ASSERT( pQueue
);
316 void *pReturn
= pQueue
->enter( nDisposeId
);
318 if( pQueue
->isCallstackEmpty() )
320 if( revokeQueue( aThreadId
, false) )
330 // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
331 // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
332 // (within the last uno_threadpool_destroy) all worker threads spawned by that
333 // ThreadPool instance are joined (which implies that uno_threadpool_destroy
334 // must never be called from a worker thread); afterwards, the next call to
335 // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
337 using namespace cppu_threadpool
;
341 struct uno_ThreadPool_Equal
343 bool operator () ( const uno_ThreadPool
&a
, const uno_ThreadPool
&b
) const
349 struct uno_ThreadPool_Hash
351 std::size_t operator () ( const uno_ThreadPool
&a
) const
353 return reinterpret_cast<std::size_t>( a
);
359 typedef std::unordered_map
< uno_ThreadPool
, ThreadPoolHolder
, uno_ThreadPool_Hash
, uno_ThreadPool_Equal
> ThreadpoolHashSet
;
361 static ThreadpoolHashSet
*g_pThreadpoolHashSet
;
363 struct _uno_ThreadPool
370 ThreadPoolHolder
getThreadPool( uno_ThreadPool hPool
)
372 MutexGuard
guard( Mutex::getGlobalMutex() );
373 assert( g_pThreadpoolHashSet
!= nullptr );
374 ThreadpoolHashSet::iterator
i( g_pThreadpoolHashSet
->find(hPool
) );
375 assert( i
!= g_pThreadpoolHashSet
->end() );
381 extern "C" uno_ThreadPool SAL_CALL
382 uno_threadpool_create() SAL_THROW_EXTERN_C()
384 MutexGuard
guard( Mutex::getGlobalMutex() );
386 if( ! g_pThreadpoolHashSet
)
388 g_pThreadpoolHashSet
= new ThreadpoolHashSet
;
393 assert( !g_pThreadpoolHashSet
->empty() );
394 p
= g_pThreadpoolHashSet
->begin()->second
;
397 // Just ensure that the handle is unique in the process (via heap)
398 uno_ThreadPool h
= new struct _uno_ThreadPool
;
399 g_pThreadpoolHashSet
->emplace( h
, p
);
403 extern "C" void SAL_CALL
404 uno_threadpool_attach( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
406 sal_Sequence
*pThreadId
= nullptr;
407 uno_getIdOfCurrentThread( &pThreadId
);
408 getThreadPool( hPool
)->prepare( pThreadId
);
409 rtl_byte_sequence_release( pThreadId
);
410 uno_releaseIdFromCurrentThread();
413 extern "C" void SAL_CALL
414 uno_threadpool_enter( uno_ThreadPool hPool
, void **ppJob
)
417 sal_Sequence
*pThreadId
= nullptr;
418 uno_getIdOfCurrentThread( &pThreadId
);
420 getThreadPool( hPool
)->enter(
423 rtl_byte_sequence_release( pThreadId
);
424 uno_releaseIdFromCurrentThread();
427 extern "C" void SAL_CALL
428 uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool
) SAL_THROW_EXTERN_C()
430 // we might do here some tidying up in case a thread called attach but never detach
433 extern "C" void SAL_CALL
434 uno_threadpool_putJob(
435 uno_ThreadPool hPool
,
436 sal_Sequence
*pThreadId
,
438 void ( SAL_CALL
* doRequest
) ( void *pThreadSpecificData
),
439 sal_Bool bIsOneway
) SAL_THROW_EXTERN_C()
441 if (!getThreadPool(hPool
)->addJob( pThreadId
, bIsOneway
, pJob
,doRequest
, hPool
))
445 "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
449 extern "C" void SAL_CALL
450 uno_threadpool_dispose( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
452 getThreadPool(hPool
)->dispose(
456 extern "C" void SAL_CALL
457 uno_threadpool_destroy( uno_ThreadPool hPool
) SAL_THROW_EXTERN_C()
459 ThreadPoolHolder
p( getThreadPool(hPool
) );
465 OSL_ASSERT( g_pThreadpoolHashSet
);
467 MutexGuard
guard( Mutex::getGlobalMutex() );
469 ThreadpoolHashSet::iterator ii
= g_pThreadpoolHashSet
->find( hPool
);
470 OSL_ASSERT( ii
!= g_pThreadpoolHashSet
->end() );
471 g_pThreadpoolHashSet
->erase( ii
);
474 empty
= g_pThreadpoolHashSet
->empty();
477 delete g_pThreadpoolHashSet
;
478 g_pThreadpoolHashSet
= nullptr;
488 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */