1 #include "tao/Dynamic_TP/DTP_Thread_Pool.h"
3 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
5 #if ! defined (__ACE_INLINE__)
6 #include "tao/Dynamic_TP/DTP_Thread_Pool.inl"
7 #endif /* __ACE_INLINE__ */
9 #include "tao/Exception.h"
10 #include "tao/ORB_Core.h"
11 #include "tao/ORB_Core_TSS_Resources.h"
12 #include "tao/TSS_Resources.h"
14 #include "tao/Acceptor_Registry.h"
15 #include "tao/debug.h"
16 #include "tao/LF_Follower.h"
17 #include "tao/Leader_Follower.h"
20 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
// Constructs the leader generator for thread pool `p`.
// NOTE(review): the member-initialiser list and body are not visible in this
// excerpt; presumably `p` is kept as pool_, which no_leaders_available ()
// uses -- confirm.
22 TAO_DTP_New_Leader_Generator::TAO_DTP_New_Leader_Generator (
23 TAO_DTP_Thread_Pool
&p
)
// Delegates to the owning pool's new_dynamic_thread () and returns whatever
// that call returns.
// NOTE(review): the TAO_DTP_Thread_Pool constructor hands a generator member
// to the leader/follower via set_new_leader_generator (); presumably that is
// how this hook gets invoked -- confirm against Leader_Follower.
29 TAO_DTP_New_Leader_Generator::no_leaders_available ()
31 return this->pool_
.new_dynamic_thread ();
// Constructs the termination-waiter task for pool `p`. The ACE_Task_Base
// base is initialised with the thread manager reached through
// p.manager ().orb_core ().thr_mgr ().
// NOTE(review): the initialiser list continues past the visible lines;
// presumably it also stores `p` as pool_ (svc () reads this->pool_) -- confirm.
34 TAO_DTP_Termination_Waiter::TAO_DTP_Termination_Waiter (TAO_DTP_Thread_Pool
&p
)
35 : ACE_Task_Base (p
.manager ().orb_core ().thr_mgr ()),
// Task body of the termination waiter. It takes the pool's
// termination_lock_ through ACE_GUARD_RETURN, waits on the pool's
// termination_cond_, and repeats while the pool's active_count_ is still
// greater than zero.
// NOTE(review): the trailing `while (...);` suggests a do/while loop, so the
// wait would happen at least once; the `do`, the braces, the remaining guard
// arguments and the return statement are not visible here -- confirm.
41 TAO_DTP_Termination_Waiter::svc ()
46 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
,
48 this->pool_
.termination_lock_
,
50 this->pool_
.termination_cond_
.wait ();
53 while (this->pool_
.active_count_
> 0);
// Constructs the worker-thread task for pool `p`. The ACE_Task_Base base is
// initialised with the thread manager reached through
// p.manager ().orb_core ().thr_mgr ().
// NOTE(review): the initialiser list continues past the visible lines;
// presumably it also stores `p` as pool_ (svc () and run () read
// this->pool_) -- confirm.
57 TAO_DTP_Thread_Pool_Threads::TAO_DTP_Thread_Pool_Threads (TAO_DTP_Thread_Pool
&p
)
58 : ACE_Task_Base (p
.manager ().orb_core ().thr_mgr ()),
// Entry point executed by each pool thread. Visible steps, in order:
//   1. broadcast the pool's activation_cond_ while holding activation_lock_;
//   2. fetch the ORB core and test has_shutdown ();
//   3. a handler for ::CORBA::Exception that logs and prints the exception
//      instead of propagating it;
//   4. broadcast the pool's termination_cond_ while holding
//      termination_lock_.
// NOTE(review): the `try` block itself is not visible in this excerpt;
// presumably it calls run (orb_core) -- confirm. The branch taken when the
// ORB has already shut down and the return statements are not visible either.
64 TAO_DTP_Thread_Pool_Threads::svc ()
66 if (TAO_debug_level
> 7)
68 TAOLIB_DEBUG ((LM_DEBUG
,
69 ACE_TEXT ("TAO (%P|%t) new DTP thread signaling waiter\n")));
// Signal activation: create_threads_i () waits on activation_cond_ after it
// has activated new threads.
72 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
,
74 this->pool_
.activation_lock_
,
76 this->pool_
.activation_cond_
.broadcast ();
78 if (TAO_debug_level
> 7)
80 TAOLIB_DEBUG ((LM_DEBUG
,
81 ACE_TEXT ("TAO (%P|%t) new DTP thread signal complete\n")));
84 TAO_ORB_Core
&orb_core
= this->pool_
.manager ().orb_core ();
85 if (orb_core
.has_shutdown ())
93 catch (const ::CORBA::Exception
& ex
)
95 // No point propagating this exception. Print it out.
96 TAOLIB_ERROR ((LM_ERROR
,
97 ACE_TEXT ("orb->run() raised exception for thread %t\n")));
99 ex
._tao_print_exception ("");
// Signal termination: TAO_DTP_Termination_Waiter::svc () waits on
// termination_cond_ and re-checks the pool's active_count_.
103 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
,
105 this->pool_
.termination_lock_
,
107 this->pool_
.termination_cond_
.broadcast ();
// Worker loop of a pool thread.
//
// The thread registers itself with the pool through add_active () and then,
// as long as the ORB core has not shut down, repeatedly calls
// orb->perform_work (tv) with tv reloaded from the pool's
// dynamic_thread_time () on every iteration. After each call a timeout is
// detected by comparing errno with ETIME. When a timeout occurred and
// remove_active (false) returns true, must_deactivate is cleared.
// remove_active (true) and a final debug message follow the loop.
//
// NOTE(review): braces, `break`/`return` statements and the declaration of
// `tv` are not visible in this excerpt. Presumably the loop is left once
// must_deactivate has been cleared, and remove_active (true) only runs while
// must_deactivate is still true -- confirm.
114 TAO_DTP_Thread_Pool_Threads::run (TAO_ORB_Core
&orb_core
)
116 CORBA::ORB_ptr orb
= orb_core
.orb ();
117 // A timeout is specified, run the ORB in an idle loop, if we
118 // don't handle any operations for the given timeout we just
119 // exit the loop and this thread ends itself.
120 if (TAO_debug_level
> 7)
122 TAOLIB_DEBUG ((LM_DEBUG
,
123 ACE_TEXT ("TAO (%P|%t) - DTP Pool %d - ")
124 ACE_TEXT ("Starting worker, count = %d; ")
125 ACE_TEXT ("setting timeout for %d sec, %d usec\n"),
128 this->pool_
.dynamic_thread_time().sec(),
129 this->pool_
.dynamic_thread_time().usec()));
// Count this thread as active in the pool before entering the work loop.
133 this->pool_
.add_active();
134 bool must_deactivate
= true;
135 while (!orb_core
.has_shutdown ())
// tv is refreshed from the pool's configured idle time on every pass.
137 tv
= this->pool_
.dynamic_thread_time ();
138 orb
->perform_work (tv
);
// NOTE(review): the timeout is inferred from errno right after
// perform_work (); no reset of errno is visible before the call, so a stale
// ETIME from earlier work might be observed -- confirm whether perform_work ()
// guarantees errno on return.
139 bool timeout
= errno
== ETIME
;
140 if (TAO_debug_level
> 7)
142 TAOLIB_DEBUG ((LM_DEBUG
,
143 ACE_TEXT ("TAO (%P|%t) - DTP Pool %d ")
144 ACE_TEXT ("run: above_min = %d, timeout = %d\n"),
145 this->pool_
.id(), this->pool_
.above_minimum(), timeout
));
// Non-forced removal: the pool itself decides (see remove_active ()) whether
// this idle thread may leave.
147 if (timeout
&& this->pool_
.remove_active (false))
149 // we've timed out, but the pool is not yet at the minimum
150 must_deactivate
= false;
// Forced removal of this thread from the active count.
156 this->pool_
.remove_active (true);
159 if (TAO_debug_level
> 7)
161 TAOLIB_DEBUG ((LM_DEBUG
,
162 ACE_TEXT ("TAO (%P|%t) - DTP Pool %d ")
163 ACE_TEXT ("Terminating worker, remaining pool thread count = %d\n"),
165 this->thr_count () - 1));
// Reports whether the pool currently has more active threads than its
// configured minimum. The result is true only when definition_.min_threads_
// is positive AND active_count_ (cast to int) exceeds it; with a minimum of
// zero or less the result is always false.
// The function takes no lock itself; the visible caller remove_active ()
// holds lock_ while calling it, whereas run () calls it unlocked for logging.
172 TAO_DTP_Thread_Pool::above_minimum ()
174 return this->definition_
.min_threads_
> 0 &&
175 (int)this->active_count_
> this->definition_
.min_threads_
;
// Returns the thread count reported by the threads_ task (thr_count ()),
// converted to CORBA::ULong. The value is read inside an ACE_GUARD_RETURN
// mutex guard.
// NOTE(review): the guard's lock and failure-return arguments are not visible
// in this excerpt -- presumably lock_ and 0; confirm.
179 TAO_DTP_Thread_Pool::current_threads () const
181 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
,
186 return static_cast<CORBA::ULong
> (this->threads_
.thr_count ());
// Increments active_count_ while holding lock_ (ACE_GUARD). Worker threads
// call this from TAO_DTP_Thread_Pool_Threads::run () before they start
// performing ORB work.
190 TAO_DTP_Thread_Pool::add_active ()
192 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
193 ++this->active_count_
;
// Decrements active_count_ under lock_ when either `force` is true or the
// pool is above its minimum (above_minimum ()). If the guard cannot be
// acquired, ACE_GUARD_RETURN makes the function return false.
// NOTE(review): the explicit return statements are not visible in this
// excerpt. run () treats a true result as "this thread was removed", so
// presumably true is returned when the decrement happened and false
// otherwise -- confirm.
197 TAO_DTP_Thread_Pool::remove_active (bool force
)
199 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, mon
, this->lock_
, false);
200 if (force
|| this->above_minimum())
202 --this->active_count_
;
// Starts the pool's initial set of threads. The count is taken from
// definition_.init_threads_, the threads are created through
// create_threads_i (), and the waiter_ task is then activated with
// THR_NEW_LWP | THR_JOINABLE (wait () later waits on waiter_).
// NOTE(review): the guard arguments, the handling of `result` and the return
// statement are not visible; presumably waiter_ is only activated when thread
// creation succeeded -- confirm.
// NOTE(review): the comments below talk about the *minimum* thread count,
// but the visible code uses init_threads_; the C-style cast to size_t would
// wrap if that value were negative -- confirm the field's type and range.
209 TAO_DTP_Thread_Pool::create_initial_threads ()
211 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
,
216 // Create initial threads.
217 // first, create the minimum number of threads as static
218 // if the min threads count is -1 that means all threads are static
220 size_t count
= (size_t)this->definition_
.init_threads_
;
222 if (TAO_debug_level
> 7)
224 TAOLIB_DEBUG ((LM_DEBUG
,
225 ACE_TEXT ("(%P|%t) DTP_Thread_Pool::create_initial_threads ")
226 ACE_TEXT ("Creating %d threads\n"),
230 int result
= this->create_threads_i (count
);
233 this->waiter_
.activate (THR_NEW_LWP
| THR_JOINABLE
);
// Tries to add one more thread to the pool.
//
// Visible logic:
//   * an unlocked pre-check rejects the request when the ORB core has shut
//     down, when shutdown_ is set (see shutting_down ()), or when a positive
//     max_threads_ has already been reached by active_count_;
//   * inside a mutex guard the limit is checked again (max_threads_ == -1 or
//     active_count_ < max_threads_) and create_threads_i (1) is called;
//   * a non-zero result from create_threads_i () is logged as an error when
//     TAO_debug_level > 0.
// NOTE(review): the return statements and the guard's lock argument are not
// visible in this excerpt. The -1 check reads as "no upper limit" -- confirm
// against TAO_DTP_Definition.
239 TAO_DTP_Thread_Pool::new_dynamic_thread ()
241 // Note that we are checking this condition below without the lock
243 if (TAO_debug_level
> 0)
245 TAOLIB_DEBUG ((LM_DEBUG
,
246 ACE_TEXT ("TAO (%P|%t) DTP Pool %d new_dynamic_thread, ")
247 ACE_TEXT ("max = %d, current = %d\n"),
248 this->id_
, this->definition_
.max_threads_
,
249 (int)this->threads_
.thr_count ()));
// Cheap rejection path; active_count_ and shutdown_ are read here without
// holding a lock (see the note above).
252 if (this->manager_
.orb_core ().has_shutdown () || this->shutdown_
||
253 (this->definition_
.max_threads_
> 0 &&
254 (int)this->active_count_
>= this->definition_
.max_threads_
))
259 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
,
// Re-check the limit now that the guard is held.
264 if (this->definition_
.max_threads_
== -1 ||
265 (int)this->active_count_
< this->definition_
.max_threads_
)
267 if (TAO_debug_level
> 7)
268 TAOLIB_DEBUG ((LM_DEBUG
,
269 ACE_TEXT ("TAO (%P|%t) DTP Pool %d new_dynamic_thread, ")
270 ACE_TEXT ("count = %d, creating new thread\n"),
272 this->active_count_
));
// create_threads_i () result is used as a failure flag: non-zero is logged.
274 if (this->create_threads_i (1))
276 if (TAO_debug_level
> 0)
278 TAOLIB_ERROR ((LM_ERROR
,
279 ACE_TEXT ("Pool %d Thread %t: ")
280 ACE_TEXT ("cannot create dynamic thread\n"),
// Activates `count` new threads on the threads_ task.
//
// Visible steps:
//   * base thread flags are THR_NEW_LWP | THR_DETACHED; the ORB parameter
//     thread_creation_flags () is also read (the line combining them into
//     `flags` is not visible in this excerpt);
//   * a heap array of `count` stack sizes is allocated with ACE_NEW_RETURN,
//     filled with definition_.stack_size_ and handed to a std::unique_ptr;
//   * with activation_lock_ held, threads_.activate () is called and the
//     caller then waits on activation_cond_, which a new thread broadcasts
//     at the start of TAO_DTP_Thread_Pool_Threads::svc ();
//   * an activation failure is logged when TAO_debug_level > 0.
// NOTE(review): several activate () arguments, the result checks and the
// return statement are not visible. Only a single wait () on
// activation_cond_ is visible; for count > 1 that would wake after the first
// thread's broadcast -- confirm whether that is intended.
291 TAO_DTP_Thread_Pool::create_threads_i (size_t count
)
293 // Overwritten parameters.
294 int force_active
= 1;
295 long thread_flags
= THR_NEW_LWP
| THR_DETACHED
;
297 // Default parameters.
298 int default_grp_id
= -1;
299 ACE_Task_Base
*default_task
= 0;
300 ACE_hthread_t
*default_thread_handles
= 0;
301 void **default_stack
= 0;
303 // Setting stack size.
304 size_t *stack_size_array
= 0;
305 ACE_NEW_RETURN (stack_size_array
,
// Every new thread gets the same stack size from the pool definition.
309 for (index
= 0; index
!= count
; ++index
)
310 stack_size_array
[index
] =
311 this->definition_
.stack_size_
;
313 // Make sure the dynamically created stack size array is properly
// (the remainder of the comment above is not visible in this excerpt; the
// unique_ptr declared next takes ownership of the array)
315 std::unique_ptr
<size_t[]> auto_stack_size_array (stack_size_array
);
317 TAO_ORB_Core
&orb_core
= manager_
.orb_core ();
321 orb_core
.orb_params ()->thread_creation_flags ();
323 int default_priority
= 0;
325 // Activate the threads.
326 if (TAO_debug_level
> 7)
328 TAOLIB_DEBUG ((LM_DEBUG
,
329 ACE_TEXT ("TAO (%P|%t) new DTP thread requested\n")));
// activation_lock_ is held across activate () and the wait below.
332 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
,
334 this->activation_lock_
,
337 this->threads_
.activate (flags
,
338 static_cast<int> (count
),
343 default_thread_handles
,
348 if (TAO_debug_level
> 7)
350 TAOLIB_DEBUG ((LM_DEBUG
,
351 ACE_TEXT ("TAO (%P|%t) new DTP thread requester waiting\n")));
// Block until a newly started thread broadcasts activation_cond_.
353 this->activation_cond_
.wait ();
357 if (TAO_debug_level
> 0)
359 TAOLIB_ERROR ((LM_ERROR
,
360 ACE_TEXT ("TAO (%P|%t) new DTP thread failed %p\n"),
361 ACE_TEXT ("ACE_Task_Base::activate")));
365 if (TAO_debug_level
> 7)
367 TAOLIB_DEBUG ((LM_DEBUG
,
368 ACE_TEXT ("TAO (%P|%t) new DTP thread requester running\n")));
// Constructs a pool owned by `manager` and configured by `def`.
// Visible initialisers: manager_ is bound to `manager`, new_thread_generator_
// is constructed from *this, and each condition variable is bound to its own
// mutex (activation_cond_ / activation_lock_, termination_cond_ /
// termination_lock_). The body registers &new_thread_generator_ with the ORB
// core's leader/follower through set_new_leader_generator ().
// NOTE(review): further parameters and initialisers exist but are not
// visible in this excerpt.
374 TAO_DTP_Thread_Pool::TAO_DTP_Thread_Pool (TAO_DTP_Thread_Pool_Manager
&manager
,
376 TAO_DTP_Definition
&def
)
377 : manager_ (manager
),
384 new_thread_generator_ (*this),
387 activation_cond_ (activation_lock_
),
388 termination_lock_ (),
389 termination_cond_ (termination_lock_
)
391 manager_
.orb_core ().leader_follower ().set_new_leader_generator (
392 &new_thread_generator_
);
// Hook called by the manager's create_threadpool_helper () before the
// initial threads are created. Per the comment below it currently does no
// work; no statements are visible in this excerpt.
396 TAO_DTP_Thread_Pool::open ()
398 // Nothing to do for now
// Destructor.
// NOTE(review): the body is not visible in this excerpt; presumably empty,
// with shutdown and joining handled by shutting_down () and wait () before
// the pool is deleted (see destroy_threadpool ()) -- confirm.
401 TAO_DTP_Thread_Pool::~TAO_DTP_Thread_Pool ()
// Marks the pool as shutting down by setting shutdown_ to true inside an
// ACE_GUARD. new_dynamic_thread () checks shutdown_ and refuses to create
// further threads once it is set.
// NOTE(review): the guard's lock argument is not visible in this excerpt.
406 TAO_DTP_Thread_Pool::shutting_down ()
408 ACE_GUARD (TAO_SYNCH_MUTEX
,
412 // We are shutting down, this way we are not creating any more new dynamic
414 this->shutdown_
= true;
// Waits for the pool to finish by calling wait () on the waiter_ task, which
// create_initial_threads () activates as THR_JOINABLE.
419 TAO_DTP_Thread_Pool::wait ()
421 this->waiter_
.wait ();
// Helper macro used at the top of the manager's public operations. It expands
// to ACE_GUARD_THROW_EX with a CORBA system exception built from
// CORBA::SystemException::_tao_minor_code (...) and CORBA::COMPLETED_NO.
// NOTE(review): the mutex type, guard name, lock and exception class
// arguments are not visible in this excerpt; presumably the manager's own
// lock is used -- confirm.
424 #define TAO_THREAD_POOL_MANAGER_GUARD \
425 ACE_GUARD_THROW_EX ( \
430 CORBA::SystemException::_tao_minor_code ( \
433 CORBA::COMPLETED_NO));
// Constructs the manager for the given ORB core. orb_core_ is bound to
// `orb_core` and thread_pool_id_counter_ starts at 1, so the first pool
// created receives id 1 (create_threadpool_helper () returns the counter and
// post-increments it).
// NOTE(review): other initialisers are not visible in this excerpt.
435 TAO_DTP_Thread_Pool_Manager::TAO_DTP_Thread_Pool_Manager (TAO_ORB_Core
&orb_core
)
436 : orb_core_ (orb_core
),
438 thread_pool_id_counter_ (1),
// Destroys the manager: walks every entry of thread_pools_ and deletes the
// pool stored in the entry's int_id_. The visible code does not call
// shutting_down () or wait () on the pools before deleting them.
443 TAO_DTP_Thread_Pool_Manager::~TAO_DTP_Thread_Pool_Manager ()
445 // Delete all the pools.
446 for (THREAD_POOLS::ITERATOR iterator
= this->thread_pools_
.begin ();
447 iterator
!= this->thread_pools_
.end ();
449 delete (*iterator
).int_id_
;
// Waits for all pools: iterates over thread_pools_ and calls wait () on the
// pool stored in each entry's int_id_, one after another. No manager guard
// is visible around the iteration.
454 TAO_DTP_Thread_Pool_Manager::wait ()
456 for (THREAD_POOLS::ITERATOR iterator
= this->thread_pools_
.begin ();
457 iterator
!= this->thread_pools_
.end ();
459 (*iterator
).int_id_
->wait ();
// Public entry point for creating a pool from definition `def`. It takes the
// manager guard (TAO_THREAD_POOL_MANAGER_GUARD, which throws if the guard
// cannot be acquired) and returns the result of create_threadpool_i (def).
463 TAO_DTP_Thread_Pool_Manager::create_threadpool (TAO_DTP_Definition
&def
)
465 TAO_THREAD_POOL_MANAGER_GUARD
;
467 return this->create_threadpool_i (def
);
// Removes the pool registered under id `threadpool` and destroys it.
//
// Under the manager guard the pool is unbound from thread_pools_. Then the
// pool is told it is shutting down, its threads are waited for, and the pool
// object is deleted. An unknown id makes the function return silently; the
// trailing comment on that return shows that throwing InvalidThreadpool was
// considered but is not done.
// NOTE(review): the braces that limit the guard's scope and the condition on
// `result` are not visible in this excerpt; per the comment below, the wait
// is meant to happen outside the guard -- confirm.
471 TAO_DTP_Thread_Pool_Manager::destroy_threadpool (CORBA::ULong threadpool
)
473 TAO_DTP_Thread_Pool
*tao_thread_pool
= 0;
475 // The guard is just for the map, don't do a wait inside the guard, because
476 // during the wait other threads can try to access the thread pool manager
477 // also, this can be one of the threads we are waiting for, which then
478 // results in a deadlock
480 TAO_THREAD_POOL_MANAGER_GUARD
;
482 // Unbind the thread pool from the map.
483 int const result
= this->thread_pools_
.unbind (threadpool
, tao_thread_pool
);
485 // If the thread pool is not found in our map.
487 return; //throw RTCORBA::RTORB::InvalidThreadpool ();
490 // Mark the thread pool that we are shutting down.
491 tao_thread_pool
->shutting_down ();
493 // Wait for the threads.
494 tao_thread_pool
->wait ();
496 // Delete the thread pool.
497 delete tao_thread_pool
;
// Allocates a new TAO_DTP_Thread_Pool with ACE_NEW_THROW_EX (raising
// CORBA::NO_MEMORY if the allocation fails), passing *this and the current
// thread_pool_id_counter_ to its constructor, and then returns the result of
// create_threadpool_helper () for that pool.
// NOTE(review): the remaining constructor arguments (presumably including
// `def`) are not visible in this excerpt.
501 TAO_DTP_Thread_Pool_Manager::create_threadpool_i (TAO_DTP_Definition
&def
)
503 // Create the thread pool.
504 TAO_DTP_Thread_Pool
*thread_pool
= 0;
506 ACE_NEW_THROW_EX (thread_pool
,
507 TAO_DTP_Thread_Pool (*this,
508 this->thread_pool_id_counter_
,
511 CORBA::NO_MEMORY ());
513 return this->create_threadpool_helper (thread_pool
);
// Finishes setting up a freshly allocated pool and registers it.
//
// The pool is held by a std::unique_ptr so it is deleted if an exception
// leaves this function. Steps: open () the pool, create its initial threads
// (a ::CORBA::INTERNAL exception with COMPLETED_NO is thrown on failure),
// bind it into thread_pools_ under the current thread_pool_id_counter_,
// store it in the calling thread's ORB-core TSS resources (lane_), throw
// ::CORBA::INTERNAL if binding failed, release ownership from the
// unique_ptr, and return the id while post-incrementing the counter.
// NOTE(review): the `if` conditions guarding the two throws are not visible
// in this excerpt. tss.lane_ is assigned before the second throw; if that
// throw is taken, the unique_ptr deletes the pool and lane_ would be left
// pointing at it -- confirm and consider assigning lane_ only after the
// check.
517 TAO_DTP_Thread_Pool_Manager::create_threadpool_helper (TAO_DTP_Thread_Pool
*thread_pool
)
519 // Make sure of safe deletion in case of errors.
520 std::unique_ptr
<TAO_DTP_Thread_Pool
> safe_thread_pool (thread_pool
);
523 thread_pool
->open ();
525 // Create the static threads.
526 int result
= thread_pool
->create_initial_threads ();
528 // Throw exception in case of errors.
531 throw ::CORBA::INTERNAL
533 CORBA::SystemException::_tao_minor_code
535 0, //TAO_RTCORBA_THREAD_CREATION_LOCATION_CODE,
537 CORBA::COMPLETED_NO
);
540 // Bind thread to internal table.
541 result
= this->thread_pools_
.bind (this->thread_pool_id_counter_
, thread_pool
);
543 TAO_ORB_Core_TSS_Resources
&tss
=
544 *this->orb_core_
.get_tss_resources ();
545 // Associate the thread pool with the ORB for later retrieval
546 tss
.lane_
= thread_pool
;
549 // Throw exception in case of errors.
551 throw ::CORBA::INTERNAL ();
557 // No need to delete thread pool.
558 safe_thread_pool
.release ();
560 // Return current counter and perform post-increment.
561 return this->thread_pool_id_counter_
++;
// Accessor: returns the manager's orb_core_ member. Pools reach the ORB core
// through this (e.g. manager ().orb_core ()).
565 TAO_DTP_Thread_Pool_Manager::orb_core () const
567 return this->orb_core_
;
570 TAO_END_VERSIONED_NAMESPACE_DECL
572 #endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */