1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 * This file incorporates work covered by the following license notice:
11 * Licensed to the Apache Software Foundation (ASF) under one or more
12 * contributor license agreements. See the NOTICE file distributed
13 * with this work for additional information regarding copyright
14 * ownership. The ASF licenses this file to you under the Apache
15 * License, Version 2.0 (the "License"); you may not use this file
16 * except in compliance with the License. You may obtain a copy of
17 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
20 #include <sal/config.h>
26 #include <unordered_map>
28 #include <osl/diagnose.h>
29 #include <sal/log.hxx>
31 #include <uno/threadpool.h>
33 #include "threadpool.hxx"
36 using namespace ::osl
;
37 using namespace ::rtl
;
39 namespace cppu_threadpool
41 WaitingThread::WaitingThread(
42 rtl::Reference
<ORequestThread
> theThread
): thread(std::move(theThread
))
45 DisposedCallerAdminHolder
const & DisposedCallerAdmin::getInstance()
47 static DisposedCallerAdminHolder theDisposedCallerAdmin
= std::make_shared
<DisposedCallerAdmin
>();
48 return theDisposedCallerAdmin
;
51 DisposedCallerAdmin::~DisposedCallerAdmin()
53 SAL_WARN_IF( !m_vector
.empty(), "cppu.threadpool", "DisposedCallerList : " << m_vector
.size() << " left");
56 void DisposedCallerAdmin::dispose( void const * nDisposeId
)
58 std::scoped_lock
guard( m_mutex
);
59 m_vector
.push_back( nDisposeId
);
62 void DisposedCallerAdmin::destroy( void const * nDisposeId
)
64 std::scoped_lock
guard( m_mutex
);
65 std::erase(m_vector
, nDisposeId
);
68 bool DisposedCallerAdmin::isDisposed( void const * nDisposeId
)
70 std::scoped_lock
guard( m_mutex
);
71 return (std::find(m_vector
.begin(), m_vector
.end(), nDisposeId
) != m_vector
.end());
75 ThreadPool::ThreadPool() :
76 m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() )
80 ThreadPool::~ThreadPool()
82 SAL_WARN_IF( m_mapQueue
.size(), "cppu.threadpool", "ThreadIdHashMap: " << m_mapQueue
.size() << " left");
85 void ThreadPool::dispose( void const * nDisposeId
)
87 m_DisposedCallerAdmin
->dispose( nDisposeId
);
89 std::scoped_lock
guard( m_mutex
);
90 for (auto const& item
: m_mapQueue
)
92 if( item
.second
.first
)
94 item
.second
.first
->dispose( nDisposeId
);
96 if( item
.second
.second
)
98 item
.second
.second
->dispose( nDisposeId
);
103 void ThreadPool::destroy( void const * nDisposeId
)
105 m_DisposedCallerAdmin
->destroy( nDisposeId
);
109 * This method lets the thread wait a certain amount of time. If within this timespan
110 * a new request comes in, this thread is reused. This is done only to improve performance,
111 * it is not required for threadpool functionality.
113 void ThreadPool::waitInPool( rtl::Reference
< ORequestThread
> const & pThread
)
115 WaitingThread
waitingThread(pThread
);
117 std::scoped_lock
guard( m_mutexWaitingThreadList
);
118 m_dequeThreads
.push_front( &waitingThread
);
121 // let the thread wait 2 seconds
122 waitingThread
.condition
.wait( std::chrono::seconds(2) );
125 std::scoped_lock
guard ( m_mutexWaitingThreadList
);
126 if( waitingThread
.thread
.is() )
128 // thread wasn't reused, remove it from the list
129 WaitingThreadDeque::iterator ii
= find(
130 m_dequeThreads
.begin(), m_dequeThreads
.end(), &waitingThread
);
131 OSL_ASSERT( ii
!= m_dequeThreads
.end() );
132 m_dequeThreads
.erase( ii
);
137 void ThreadPool::joinWorkers()
140 std::scoped_lock
guard( m_mutexWaitingThreadList
);
141 for (auto const& thread
: m_dequeThreads
)
143 // wake the threads up
144 thread
->condition
.set();
147 m_aThreadAdmin
.join();
// Provides a thread that works on pQueue for the thread id aThreadId.
// A thread parked in m_dequeThreads is reused when one is available;
// otherwise a new ORequestThread is constructed and the result of its
// launch() is returned.
// NOTE(review): bAsynchron is used below, but its declaration and whatever
// ends the reuse path (presumably a return) are not visible here - confirm.
150 bool ThreadPool::createThread( JobQueue
*pQueue
,
151 const ByteSequence
&aThreadId
,
155 // Can a thread be reused ?
156 std::scoped_lock
guard( m_mutexWaitingThreadList
);
157 if( ! m_dequeThreads
.empty() )
159 // inform the thread and let it go
160 struct WaitingThread
*pWaitingThread
= m_dequeThreads
.back();
161 pWaitingThread
->thread
->setTask( pQueue
, aThreadId
, bAsynchron
);
// Clearing the reference is what waitInPool() tests (thread.is()) to tell a
// reused thread from one whose wait merely ended.
162 pWaitingThread
->thread
= nullptr;
// The waiter is taken out of the list here, so waitInPool() does not have to
// remove it again.
165 m_dequeThreads
.pop_back();
// Wake the parked thread.
168 pWaitingThread
->condition
.set();
// No parked thread was used: start a fresh request thread.
173 rtl::Reference
pThread(
174 new ORequestThread( this, pQueue
, aThreadId
, bAsynchron
) );
175 return pThread
->launch();
// Detaches a job queue of thread aThreadId from its m_mapQueue entry while
// holding m_mutex. A slot's queue is tested with isEmpty() before the slot is
// reset to nullptr, and the whole map entry is erased once both slots are
// nullptr.
// NOTE(review): the branching on bAsynchron and the return statements are not
// visible here. Presumably `.second` is the oneway (asynchronous) queue and
// `.first` the synchronous one (enter() uses `.first` and passes false for
// bAsynchron) - confirm against threadpool.hxx.
178 bool ThreadPool::revokeQueue( const ByteSequence
&aThreadId
, bool bAsynchron
)
180 std::scoped_lock
guard( m_mutex
);
// The entry is expected to exist; this is only asserted, not handled.
182 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
183 OSL_ASSERT( ii
!= m_mapQueue
.end() );
187 if( ! (*ii
).second
.second
->isEmpty() )
189 // another thread has put something into the queue
193 (*ii
).second
.second
= nullptr;
194 if( (*ii
).second
.first
)
196 // all oneway requests have been processed, now
197 // synchronous requests may go on
198 (*ii
).second
.first
->resume();
203 if( ! (*ii
).second
.first
->isEmpty() )
205 // another thread has put something into the queue
208 (*ii
).second
.first
= nullptr;
// Drop the map entry as soon as neither queue slot is in use any more.
211 if( nullptr == (*ii
).second
.first
&& nullptr == (*ii
).second
.second
)
213 m_mapQueue
.erase( ii
);
// Queues a request (pThreadSpecificData, doRequest) for thread aThreadId.
// The entry for aThreadId in m_mapQueue is created on demand, and a missing
// JobQueue is allocated in the slot that is used. The final return yields
// true when no queue had to be created, otherwise the result of
// createThread() for the new queue.
// NOTE(review): bAsynchron is used below but is not declared in the visible
// parameter list, and the branch structure (which slot is chosen, what
// happens for a disposed caller or a busy `.second` queue) is not visible
// here - confirm before relying on this summary.
220 bool ThreadPool::addJob(
221 const ByteSequence
&aThreadId
,
223 void *pThreadSpecificData
,
224 RequestFun
* doRequest
,
225 void const * disposeId
)
227 bool bCreateThread
= false;
228 JobQueue
*pQueue
= nullptr;
230 std::scoped_lock
guard( m_mutex
);
// Checked under m_mutex: has disposeId been recorded as disposed?
231 if (m_DisposedCallerAdmin
->isDisposed(disposeId
)) {
235 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
237 if( ii
== m_mapQueue
.end() )
// No entry for this thread id yet: insert one with both queue slots empty
// and look it up again to obtain an iterator.
239 m_mapQueue
[ aThreadId
] = std::pair
< JobQueue
* , JobQueue
* > ( nullptr , nullptr );
240 ii
= m_mapQueue
.find( aThreadId
);
241 OSL_ASSERT( ii
!= m_mapQueue
.end() );
// A queue that has to be allocated here also needs a thread to serve it,
// which is what bCreateThread records.
246 if( ! (*ii
).second
.second
)
248 (*ii
).second
.second
= new JobQueue();
249 bCreateThread
= true;
251 pQueue
= (*ii
).second
.second
;
255 if( ! (*ii
).second
.first
)
257 (*ii
).second
.first
= new JobQueue();
258 bCreateThread
= true;
260 pQueue
= (*ii
).second
.first
;
262 if( (*ii
).second
.second
&& ( (*ii
).second
.second
->isBusy() ) )
267 pQueue
->add( pThreadSpecificData
, doRequest
);
270 return !bCreateThread
|| createThread( pQueue
, aThreadId
, bAsynchron
);
273 void ThreadPool::prepare( const ByteSequence
&aThreadId
)
275 std::scoped_lock
guard( m_mutex
);
277 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
279 if( ii
== m_mapQueue
.end() )
281 JobQueue
*p
= new JobQueue();
282 m_mapQueue
[ aThreadId
] = std::pair
< JobQueue
* , JobQueue
* > ( p
, nullptr );
284 else if( nullptr == (*ii
).second
.first
)
286 (*ii
).second
.first
= new JobQueue();
// Lets the calling thread (aThreadId) enter the `first` job queue registered
// for it and, once that queue's call stack is empty, tries to revoke the
// queue through revokeQueue(aThreadId, false).
// The map entry must already exist (asserted).
// NOTE(review): the end of this definition (what happens when revokeQueue()
// returns true, and the return statement) is not visible here; presumably
// pReturn is returned - confirm.
290 void * ThreadPool::enter( const ByteSequence
& aThreadId
, void const * nDisposeId
)
292 JobQueue
*pQueue
= nullptr;
// NOTE(review): presumably this guard is scoped to the lookup only, since
// revokeQueue() below locks m_mutex itself - braces not visible, confirm.
294 std::scoped_lock
guard( m_mutex
);
296 ThreadIdHashMap::iterator ii
= m_mapQueue
.find( aThreadId
);
298 assert(ii
!= m_mapQueue
.end());
299 pQueue
= (*ii
).second
.first
;
303 void *pReturn
= pQueue
->enter( nDisposeId
);
305 if( pQueue
->isCallstackEmpty() )
307 if( revokeQueue( aThreadId
, false) )
317 // All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
318 // spans share one ThreadPool instance. When g_pThreadpoolHashSet becomes empty
319 // (within the last uno_threadpool_destroy) all worker threads spawned by that
320 // ThreadPool instance are joined (which implies that uno_threadpool_destroy
321 // must never be called from a worker thread); afterwards, the next call to
322 // uno_threadpool_create (if any) will lead to a new ThreadPool instance.
324 using namespace cppu_threadpool
;
// Equality predicate for uno_ThreadPool handles; used as the key-equality
// functor of ThreadpoolHashSet.
// NOTE(review): the body of operator() is not visible here; presumably it
// compares the two handles directly - confirm.
328 struct uno_ThreadPool_Equal
330 bool operator () ( const uno_ThreadPool
&a
, const uno_ThreadPool
&b
) const
336 struct uno_ThreadPool_Hash
338 std::size_t operator () ( const uno_ThreadPool
&a
) const
340 return reinterpret_cast<std::size_t>( a
);
346 typedef std::unordered_map
< uno_ThreadPool
, ThreadPoolHolder
, uno_ThreadPool_Hash
, uno_ThreadPool_Equal
> ThreadpoolHashSet
;
348 static ThreadpoolHashSet
*g_pThreadpoolHashSet
;
// Type behind the uno_ThreadPool handle. uno_threadpool_create() allocates
// one instance per handle on the heap so that each handle has a unique
// address within the process.
// NOTE(review): the members of this struct are not visible here.
350 struct _uno_ThreadPool
// Looks up hPool in g_pThreadpoolHashSet while holding the global mutex.
// Both the existence of the set and the presence of the handle in it are
// only asserted, not handled.
// NOTE(review): the return statement is not visible here; presumably the
// holder stored for hPool (i->second) is returned - confirm.
357 ThreadPoolHolder
getThreadPool( uno_ThreadPool hPool
)
359 MutexGuard
guard( Mutex::getGlobalMutex() );
360 assert( g_pThreadpoolHashSet
!= nullptr );
361 ThreadpoolHashSet::iterator
i( g_pThreadpoolHashSet
->find(hPool
) );
362 assert( i
!= g_pThreadpoolHashSet
->end() );
// Creates a new uno_ThreadPool handle and registers it, together with a
// ThreadPool holder p, in g_pThreadpoolHashSet. Everything happens under the
// global mutex, and the set itself is allocated on demand.
// NOTE(review): the declaration of p, the branch that decides between a new
// ThreadPool and the one taken from the first existing entry, and the return
// statement are not visible here; presumably h is returned - confirm.
368 extern "C" uno_ThreadPool SAL_CALL
369 uno_threadpool_create() noexcept
371 MutexGuard
guard( Mutex::getGlobalMutex() );
373 if( ! g_pThreadpoolHashSet
)
375 g_pThreadpoolHashSet
= new ThreadpoolHashSet
;
// On this path the set already has entries, and the holder of the first one
// is taken over for the new handle.
380 assert( !g_pThreadpoolHashSet
->empty() );
381 p
= g_pThreadpoolHashSet
->begin()->second
;
384 // Just ensure that the handle is unique in the process (via heap)
385 uno_ThreadPool h
= new struct _uno_ThreadPool
;
386 g_pThreadpoolHashSet
->emplace( h
, p
);
390 extern "C" void SAL_CALL
391 uno_threadpool_attach( uno_ThreadPool hPool
) noexcept
393 sal_Sequence
*pThreadId
= nullptr;
394 uno_getIdOfCurrentThread( &pThreadId
);
395 getThreadPool( hPool
)->prepare( pThreadId
);
396 rtl_byte_sequence_release( pThreadId
);
397 uno_releaseIdFromCurrentThread();
// Fetches the id of the calling thread, calls ThreadPool::enter() on the pool
// behind hPool and releases the thread id again afterwards.
// NOTE(review): the arguments of the enter() call and the use of its result
// are not visible here; presumably the result is stored in *ppJob - confirm.
400 extern "C" void SAL_CALL
401 uno_threadpool_enter( uno_ThreadPool hPool
, void **ppJob
) noexcept
403 sal_Sequence
*pThreadId
= nullptr;
404 uno_getIdOfCurrentThread( &pThreadId
);
406 getThreadPool( hPool
)->enter(
// Give back what uno_getIdOfCurrentThread() handed out.
409 rtl_byte_sequence_release( pThreadId
);
410 uno_releaseIdFromCurrentThread();
413 extern "C" void SAL_CALL
414 uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool
) noexcept
416 // we might do here some tidying up in case a thread called attach but never detach
// Hands a job over to ThreadPool::addJob() of the pool behind hPool. The
// handle hPool itself is passed as the last argument, which addJob() takes
// as its dispose id.
// NOTE(review): the declaration of pJob and the statement that reacts to a
// failing addJob() are only partly visible here; the message below
// attributes such a failure to a concurrent uno_threadpool_destroy().
419 extern "C" void SAL_CALL
420 uno_threadpool_putJob(
421 uno_ThreadPool hPool
,
422 sal_Sequence
*pThreadId
,
424 void ( SAL_CALL
* doRequest
) ( void *pThreadSpecificData
),
425 sal_Bool bIsOneway
) noexcept
427 if (!getThreadPool(hPool
)->addJob( pThreadId
, bIsOneway
, pJob
,doRequest
, hPool
))
431 "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
// Calls ThreadPool::dispose() on the pool behind hPool.
// NOTE(review): the argument of the call is not visible here; presumably it
// is hPool, matching the dispose id passed by uno_threadpool_putJob() -
// confirm.
435 extern "C" void SAL_CALL
436 uno_threadpool_dispose( uno_ThreadPool hPool
) noexcept
438 getThreadPool(hPool
)->dispose(
// Unregisters the handle hPool: its entry is erased from
// g_pThreadpoolHashSet under the global mutex, and the set itself can be
// deleted, with the pointer reset to nullptr.
// A holder p of the pool is obtained before the entry is erased.
// NOTE(review): the declaration of `empty`, the condition guarding the
// deletion of the set and everything done with p are not visible here.
// Per the file comment on handle life spans, the set is deleted when it
// becomes empty and the worker threads are then joined - confirm.
442 extern "C" void SAL_CALL
443 uno_threadpool_destroy( uno_ThreadPool hPool
) noexcept
445 ThreadPoolHolder
p( getThreadPool(hPool
) );
// NOTE(review): this assert appears before the MutexGuard below, i.e. it
// seems to read the global without holding the mutex - confirm intent.
451 assert(g_pThreadpoolHashSet
);
453 MutexGuard
guard( Mutex::getGlobalMutex() );
// The handle is expected to be registered; this is only asserted.
455 ThreadpoolHashSet::iterator ii
= g_pThreadpoolHashSet
->find( hPool
);
456 OSL_ASSERT( ii
!= g_pThreadpoolHashSet
->end() );
457 g_pThreadpoolHashSet
->erase( ii
);
460 empty
= g_pThreadpoolHashSet
->empty();
463 delete g_pThreadpoolHashSet
;
464 g_pThreadpoolHashSet
= nullptr;
474 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */