Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tao / Dynamic_TP / DTP_Thread_Pool.cpp
blob3e1e90e2eee2bf794597f9f96dee803e58137220
1 #include "tao/Dynamic_TP/DTP_Thread_Pool.h"
3 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
5 #if ! defined (__ACE_INLINE__)
6 #include "tao/Dynamic_TP/DTP_Thread_Pool.inl"
7 #endif /* __ACE_INLINE__ */
9 #include "tao/Exception.h"
10 #include "tao/ORB_Core.h"
11 #include "tao/ORB_Core_TSS_Resources.h"
12 #include "tao/TSS_Resources.h"
13 #include "tao/ORB.h"
14 #include "tao/Acceptor_Registry.h"
15 #include "tao/debug.h"
16 #include "tao/LF_Follower.h"
17 #include "tao/Leader_Follower.h"
18 #include <memory>
20 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
22 TAO_DTP_New_Leader_Generator::TAO_DTP_New_Leader_Generator (
23 TAO_DTP_Thread_Pool &p)
24 : pool_ (p)
28 bool
29 TAO_DTP_New_Leader_Generator::no_leaders_available ()
31 return this->pool_.new_dynamic_thread ();
34 TAO_DTP_Termination_Waiter::TAO_DTP_Termination_Waiter (TAO_DTP_Thread_Pool &p)
35 : ACE_Task_Base (p.manager ().orb_core ().thr_mgr ()),
36 pool_ (p)
// Thread entry point of the termination waiter.
//
// Takes the pool's termination_lock_, blocks on termination_cond_, and keeps
// doing so for as long as the pool's active_count_ is above zero.  The guard
// macro is given -1 as its failure return value; the normal exit returns 0.
40 int
41 TAO_DTP_Termination_Waiter::svc ()
46 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
47 guard,
48 this->pool_.termination_lock_,
49 -1);
// NOTE(review): the wait is entered before active_count_ is examined, so it
// presumably relies on a worker broadcasting termination_cond_ later on
// (TAO_DTP_Thread_Pool_Threads::svc does so on its normal exit path) -- confirm.
50 this->pool_.termination_cond_.wait ();
// NOTE(review): active_count_ is updated under the pool's lock_, not under
// termination_lock_ -- confirm this read is intended to be unsynchronized.
53 while (this->pool_.active_count_ > 0);
54 return 0;
57 TAO_DTP_Thread_Pool_Threads::TAO_DTP_Thread_Pool_Threads (TAO_DTP_Thread_Pool &p)
58 : ACE_Task_Base (p.manager ().orb_core ().thr_mgr ()),
59 pool_ (p)
63 int
64 TAO_DTP_Thread_Pool_Threads::svc ()
66 if (TAO_debug_level > 7)
68 TAOLIB_DEBUG ((LM_DEBUG,
69 ACE_TEXT ("TAO (%P|%t) new DTP thread signaling waiter\n")));
72 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
73 guard,
74 this->pool_.activation_lock_,
75 -1);
76 this->pool_.activation_cond_.broadcast ();
78 if (TAO_debug_level > 7)
80 TAOLIB_DEBUG ((LM_DEBUG,
81 ACE_TEXT ("TAO (%P|%t) new DTP thread signal complete\n")));
84 TAO_ORB_Core &orb_core = this->pool_.manager ().orb_core ();
85 if (orb_core.has_shutdown ())
86 return 0;
88 try
90 // Do the work
91 this->run (orb_core);
93 catch (const ::CORBA::Exception& ex)
95 // No point propagating this exception. Print it out.
96 TAOLIB_ERROR ((LM_ERROR,
97 ACE_TEXT ("orb->run() raised exception for thread %t\n")));
99 ex._tao_print_exception ("");
103 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
104 guard,
105 this->pool_.termination_lock_,
106 -1);
107 this->pool_.termination_cond_.broadcast ();
110 return 0;
114 TAO_DTP_Thread_Pool_Threads::run (TAO_ORB_Core &orb_core)
116 CORBA::ORB_ptr orb = orb_core.orb ();
117 // A timeout is specified, run the ORB in an idle loop, if we
118 // don't handle any operations for the given timeout we just
119 // exit the loop and this thread ends itself.
120 if (TAO_debug_level > 7)
122 TAOLIB_DEBUG ((LM_DEBUG,
123 ACE_TEXT ("TAO (%P|%t) - DTP Pool %d - ")
124 ACE_TEXT ("Starting worker, count = %d; ")
125 ACE_TEXT ("setting timeout for %d sec, %d usec\n"),
126 this->pool_.id (),
127 this->thr_count (),
128 this->pool_.dynamic_thread_time().sec(),
129 this->pool_.dynamic_thread_time().usec()));
132 ACE_Time_Value tv;
133 this->pool_.add_active();
134 bool must_deactivate = true;
135 while (!orb_core.has_shutdown ())
137 tv = this->pool_.dynamic_thread_time ();
138 orb->perform_work (tv);
139 bool timeout = errno == ETIME;
140 if (TAO_debug_level > 7)
142 TAOLIB_DEBUG ((LM_DEBUG,
143 ACE_TEXT ("TAO (%P|%t) - DTP Pool %d ")
144 ACE_TEXT ("run: above_min = %d, timeout = %d\n"),
145 this->pool_.id(), this->pool_.above_minimum(), timeout));
147 if (timeout && this->pool_.remove_active (false))
149 // we've timed out, but the pool is not yet at the minimum
150 must_deactivate = false;
151 break;
154 if (must_deactivate)
156 this->pool_.remove_active (true);
159 if (TAO_debug_level > 7)
161 TAOLIB_DEBUG ((LM_DEBUG,
162 ACE_TEXT ("TAO (%P|%t) - DTP Pool %d ")
163 ACE_TEXT ("Terminating worker, remaining pool thread count = %d\n"),
164 this->pool_.id (),
165 this->thr_count () - 1));
168 return 0;
171 bool
172 TAO_DTP_Thread_Pool::above_minimum ()
174 return this->definition_.min_threads_ > 0 &&
175 (int)this->active_count_ > this->definition_.min_threads_;
178 CORBA::ULong
179 TAO_DTP_Thread_Pool::current_threads () const
181 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
182 mon,
183 this->lock_,
186 return static_cast<CORBA::ULong> (this->threads_.thr_count ());
189 void
190 TAO_DTP_Thread_Pool::add_active ()
192 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
193 ++this->active_count_;
196 bool
197 TAO_DTP_Thread_Pool::remove_active (bool force)
199 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->lock_, false);
200 if (force || this->above_minimum())
202 --this->active_count_;
203 return true;
205 return false;
209 TAO_DTP_Thread_Pool::create_initial_threads ()
211 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
212 mon,
213 this->lock_,
216 // Create initial threads.
217 // first, create the minimum number of threads as static
218 // if the min threads count is -1 that means all threads are static
220 size_t count = (size_t)this->definition_.init_threads_;
222 if (TAO_debug_level > 7)
224 TAOLIB_DEBUG ((LM_DEBUG,
225 ACE_TEXT ("(%P|%t) DTP_Thread_Pool::create_initial_threads ")
226 ACE_TEXT ("Creating %d threads\n"),
227 count));
230 int result = this->create_threads_i (count);
231 if (result == 0)
233 this->waiter_.activate (THR_NEW_LWP | THR_JOINABLE);
235 return result;
238 bool
239 TAO_DTP_Thread_Pool::new_dynamic_thread ()
241 // Note that we are checking this condition below without the lock
242 // held.
243 if (TAO_debug_level > 0)
245 TAOLIB_DEBUG ((LM_DEBUG,
246 ACE_TEXT ("TAO (%P|%t) DTP Pool %d new_dynamic_thread, ")
247 ACE_TEXT ("max = %d, current = %d\n"),
248 this->id_, this->definition_.max_threads_,
249 (int)this->threads_.thr_count ()));
252 if (this->manager_.orb_core ().has_shutdown () || this->shutdown_ ||
253 (this->definition_.max_threads_ > 0 &&
254 (int)this->active_count_ >= this->definition_.max_threads_))
256 return false;
259 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
260 mon,
261 this->lock_,
262 false);
264 if (this->definition_.max_threads_ == -1 ||
265 (int)this->active_count_ < this->definition_.max_threads_)
267 if (TAO_debug_level > 7)
268 TAOLIB_DEBUG ((LM_DEBUG,
269 ACE_TEXT ("TAO (%P|%t) DTP Pool %d new_dynamic_thread, ")
270 ACE_TEXT ("count = %d, creating new thread\n"),
271 this->id_,
272 this->active_count_));
274 if (this->create_threads_i (1))
276 if (TAO_debug_level > 0)
278 TAOLIB_ERROR ((LM_ERROR,
279 ACE_TEXT ("Pool %d Thread %t: ")
280 ACE_TEXT ("cannot create dynamic thread\n"),
281 this->id_));
283 return false;
287 return true;
291 TAO_DTP_Thread_Pool::create_threads_i (size_t count)
293 // Overwritten parameters.
294 int force_active = 1;
295 long thread_flags = THR_NEW_LWP | THR_DETACHED;
297 // Default parameters.
298 int default_grp_id = -1;
299 ACE_Task_Base *default_task = 0;
300 ACE_hthread_t *default_thread_handles = 0;
301 void **default_stack = 0;
303 // Setting stack size.
304 size_t *stack_size_array = 0;
305 ACE_NEW_RETURN (stack_size_array,
306 size_t[count],
307 -1);
308 size_t index;
309 for (index = 0; index != count; ++index)
310 stack_size_array[index] =
311 this->definition_.stack_size_;
313 // Make sure the dynamically created stack size array is properly
314 // deleted.
315 std::unique_ptr<size_t[]> auto_stack_size_array (stack_size_array);
317 TAO_ORB_Core &orb_core = manager_.orb_core ();
319 long flags =
320 thread_flags |
321 orb_core.orb_params ()->thread_creation_flags ();
323 int default_priority = 0;
324 int result = 0;
325 // Activate the threads.
326 if (TAO_debug_level > 7)
328 TAOLIB_DEBUG ((LM_DEBUG,
329 ACE_TEXT ("TAO (%P|%t) new DTP thread requested\n")));
332 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
333 guard,
334 this->activation_lock_,
335 -1);
336 result =
337 this->threads_.activate (flags,
338 static_cast<int> (count),
339 force_active,
340 default_grp_id,
341 default_priority,
342 default_task,
343 default_thread_handles,
344 default_stack,
345 stack_size_array);
346 if (result == 0)
348 if (TAO_debug_level > 7)
350 TAOLIB_DEBUG ((LM_DEBUG,
351 ACE_TEXT ("TAO (%P|%t) new DTP thread requester waiting\n")));
353 this->activation_cond_.wait ();
355 else
357 if (TAO_debug_level > 0)
359 TAOLIB_ERROR ((LM_ERROR,
360 ACE_TEXT ("TAO (%P|%t) new DTP thread failed %p\n"),
361 ACE_TEXT ("ACE_Task_Base::activate")));
365 if (TAO_debug_level > 7)
367 TAOLIB_DEBUG ((LM_DEBUG,
368 ACE_TEXT ("TAO (%P|%t) new DTP thread requester running\n")));
371 return result;
374 TAO_DTP_Thread_Pool::TAO_DTP_Thread_Pool (TAO_DTP_Thread_Pool_Manager &manager,
375 CORBA::ULong id,
376 TAO_DTP_Definition &def)
377 : manager_ (manager),
378 id_ (id),
379 shutdown_ (false),
380 definition_ (def),
381 threads_ (*this),
382 waiter_ (*this),
383 active_count_ (0),
384 new_thread_generator_ (*this),
385 lock_ (),
386 activation_lock_ (),
387 activation_cond_ (activation_lock_),
388 termination_lock_ (),
389 termination_cond_ (termination_lock_)
391 manager_.orb_core ().leader_follower ().set_new_leader_generator (
392 &new_thread_generator_);
395 void
396 TAO_DTP_Thread_Pool::open ()
398 // Nothing to do for now
401 TAO_DTP_Thread_Pool::~TAO_DTP_Thread_Pool ()
405 void
406 TAO_DTP_Thread_Pool::shutting_down ()
408 ACE_GUARD (TAO_SYNCH_MUTEX,
409 mon,
410 this->lock_);
412 // We are shutting down, this way we are not creating any more new dynamic
413 // threads
414 this->shutdown_ = true;
418 void
419 TAO_DTP_Thread_Pool::wait ()
421 this->waiter_.wait ();
// Guards this->lock_ through an object named `mon` for the rest of the
// enclosing scope.  ACE_GUARD_THROW_EX is handed a CORBA::INTERNAL exception
// (minor code built from TAO_GUARD_FAILURE, completion status COMPLETED_NO)
// as the exception to raise.
// The expansion already ends in ';', so a call site that appends its own ';'
// yields an additional empty statement.
424 #define TAO_THREAD_POOL_MANAGER_GUARD \
425 ACE_GUARD_THROW_EX ( \
426 TAO_SYNCH_MUTEX, \
427 mon, \
428 this->lock_, \
429 CORBA::INTERNAL ( \
430 CORBA::SystemException::_tao_minor_code ( \
431 TAO_GUARD_FAILURE, \
432 0), \
433 CORBA::COMPLETED_NO));
435 TAO_DTP_Thread_Pool_Manager::TAO_DTP_Thread_Pool_Manager (TAO_ORB_Core &orb_core)
436 : orb_core_ (orb_core),
437 thread_pools_ (),
438 thread_pool_id_counter_ (1),
439 lock_ ()
443 TAO_DTP_Thread_Pool_Manager::~TAO_DTP_Thread_Pool_Manager ()
445 // Delete all the pools.
446 for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
447 iterator != this->thread_pools_.end ();
448 ++iterator)
449 delete (*iterator).int_id_;
453 void
454 TAO_DTP_Thread_Pool_Manager::wait ()
456 for (THREAD_POOLS::ITERATOR iterator = this->thread_pools_.begin ();
457 iterator != this->thread_pools_.end ();
458 ++iterator)
459 (*iterator).int_id_->wait ();
462 CORBA::ULong
463 TAO_DTP_Thread_Pool_Manager::create_threadpool (TAO_DTP_Definition &def)
465 TAO_THREAD_POOL_MANAGER_GUARD;
467 return this->create_threadpool_i (def);
470 void
471 TAO_DTP_Thread_Pool_Manager::destroy_threadpool (CORBA::ULong threadpool)
473 TAO_DTP_Thread_Pool *tao_thread_pool = 0;
475 // The guard is just for the map, don't do a wait inside the guard, because
476 // during the wait other threads can try to access the thread pool manager
477 // also, this can be one of the threads we are waiting for, which then
478 // results in a deadlock
480 TAO_THREAD_POOL_MANAGER_GUARD;
482 // Unbind the thread pool from the map.
483 int const result = this->thread_pools_.unbind (threadpool, tao_thread_pool);
485 // If the thread pool is not found in our map.
486 if (result != 0)
487 return; //throw RTCORBA::RTORB::InvalidThreadpool ();
490 // Mark the thread pool that we are shutting down.
491 tao_thread_pool->shutting_down ();
493 // Wait for the threads.
494 tao_thread_pool->wait ();
496 // Delete the thread pool.
497 delete tao_thread_pool;
500 CORBA::ULong
501 TAO_DTP_Thread_Pool_Manager::create_threadpool_i (TAO_DTP_Definition &def)
503 // Create the thread pool.
504 TAO_DTP_Thread_Pool *thread_pool = 0;
506 ACE_NEW_THROW_EX (thread_pool,
507 TAO_DTP_Thread_Pool (*this,
508 this->thread_pool_id_counter_,
511 CORBA::NO_MEMORY ());
513 return this->create_threadpool_helper (thread_pool);
516 CORBA::ULong
517 TAO_DTP_Thread_Pool_Manager::create_threadpool_helper (TAO_DTP_Thread_Pool *thread_pool)
519 // Make sure of safe deletion in case of errors.
520 std::unique_ptr<TAO_DTP_Thread_Pool> safe_thread_pool (thread_pool);
522 // Open the pool.
523 thread_pool->open ();
525 // Create the static threads.
526 int result = thread_pool->create_initial_threads ();
528 // Throw exception in case of errors.
529 if (result != 0)
531 throw ::CORBA::INTERNAL
533 CORBA::SystemException::_tao_minor_code
535 0, //TAO_RTCORBA_THREAD_CREATION_LOCATION_CODE,
536 errno),
537 CORBA::COMPLETED_NO);
540 // Bind thread to internal table.
541 result = this->thread_pools_.bind (this->thread_pool_id_counter_, thread_pool);
543 TAO_ORB_Core_TSS_Resources &tss =
544 *this->orb_core_.get_tss_resources ();
545 // Associate the thread pool with the ORB for later retrieval
546 tss.lane_ = thread_pool;
549 // Throw exception in case of errors.
550 if (result != 0)
551 throw ::CORBA::INTERNAL ();
554 // Success.
557 // No need to delete thread pool.
558 safe_thread_pool.release ();
560 // Return current counter and perform post-increment.
561 return this->thread_pool_id_counter_++;
564 TAO_ORB_Core &
565 TAO_DTP_Thread_Pool_Manager::orb_core () const
567 return this->orb_core_;
570 TAO_END_VERSIONED_NAMESPACE_DECL
572 #endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */