Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tao / Dynamic_TP / DTP_Task.cpp
blob99d6ca3609ce1e5132bdb40c1c1912b9a56669b8
1 #include "tao/Dynamic_TP/DTP_Task.h"
3 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
5 #include "tao/CSD_ThreadPool/CSD_TP_Request.h"
6 #include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
7 #include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h"
9 #if !defined (__ACE_INLINE__)
10 # include "tao/Dynamic_TP/DTP_Task.inl"
11 #endif /* ! __ACE_INLINE__ */
13 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
15 TAO_DTP_Task::TAO_DTP_Task ()
16 : aw_lock_ (),
17 queue_lock_ (),
18 work_lock_ (),
19 work_available_ (this->work_lock_),
20 active_workers_ (this->aw_lock_),
21 active_count_ (0),
22 accepting_requests_ (false),
23 shutdown_ (false),
24 check_queue_ (false),
25 opened_ (false),
26 num_queue_requests_ ((size_t)0),
27 init_pool_threads_ ((size_t)0),
28 min_pool_threads_ ((size_t)0),
29 max_pool_threads_ ((size_t)0),
30 max_request_queue_depth_ ((size_t)0),
31 thread_stack_size_ ((size_t)0)
35 TAO_DTP_Task::~TAO_DTP_Task()
39 bool
40 TAO_DTP_Task::add_request (TAO::CSD::TP_Request* request)
43 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->queue_lock_, false);
44 ++this->num_queue_requests_;
45 if ((this->num_queue_requests_ > this->max_request_queue_depth_) &&
46 (this->max_request_queue_depth_ != 0))
48 this->accepting_requests_ = false;
51 if (!this->accepting_requests_)
53 if (TAO_debug_level > 4)
55 TAOLIB_DEBUG ((LM_DEBUG,
56 ACE_TEXT ("TAO (%P|%t) - DTP_Task::add_request() ")
57 ACE_TEXT ("not accepting requests.\n")
58 ACE_TEXT ("TAO (%P|%t) - DTP_Task::add_request() ")
59 ACE_TEXT ("num_queue_requests_ : [%d]\n")
60 ACE_TEXT ("TAO (%P|%t) - DTP_Task::add_request() ")
61 ACE_TEXT ("max_request_queue_depth_ : [%d]\n"),
62 this->num_queue_requests_,
63 this->max_request_queue_depth_));
65 --this->num_queue_requests_;
66 return false;
69 // We have made the decision that the request is going to be placed upon
70 // the queue_. Inform the request that it is about to be placed into
71 // a request queue. Some requests may not need to do anything in
72 // preparation of being placed into a queue. Others, however, may need
73 // to perfom a "clone" operation on some underlying request data before
74 // the request can be properly placed into a queue.
75 request->prepare_for_queue();
76 this->queue_.put(request);
79 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->work_lock_, false);
80 this->check_queue_ = true;
81 this->work_available_.signal ();
82 if (TAO_debug_level > 4 )
84 TAOLIB_DEBUG((LM_DEBUG,
85 ACE_TEXT ("TAO (%P|%t) - DTP_Task::add_request() - ")
86 ACE_TEXT ("work available\n")));
90 return true;
93 size_t
94 TAO_DTP_Task::get_init_pool_threads ()
96 return this->init_pool_threads_;
99 size_t
100 TAO_DTP_Task::get_min_pool_threads ()
102 return this->min_pool_threads_;
105 size_t TAO_DTP_Task::get_max_pool_threads ()
107 return this->max_pool_threads_;
110 size_t
111 TAO_DTP_Task::get_max_request_queue_depth ()
113 return this->max_request_queue_depth_;
116 size_t
117 TAO_DTP_Task::get_thread_stack_size ()
119 return this->thread_stack_size_;
122 time_t
123 TAO_DTP_Task::get_thread_idle_time ()
125 return this->thread_idle_time_.sec();
129 TAO_DTP_Task::open (void* /* args */)
131 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->aw_lock_, -1);
132 // We can assume that we are in the proper state to handle this open()
133 // call as long as we haven't been opened before.
134 if (this->opened_)
136 return 0;
139 int num = static_cast<int> (this->init_pool_threads_);
141 if (TAO_debug_level > 4)
143 TAOLIB_DEBUG ((LM_DEBUG,
144 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() initialized with:\n")
145 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() init_threads_ \t\t: [%d]\n")
146 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() min_pool_threads_ \t\t: [%d]\n")
147 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() max_pool_threads_ \t\t: [%d]\n")
148 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() max_request_queue_depth_ \t: [%d]\n")
149 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() thread_stack_size_ \t\t: [%d]\n")
150 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() thread_idle_time_ \t\t: [%d]\n"),
151 this->init_pool_threads_,
152 this->min_pool_threads_,
153 this->max_pool_threads_,
154 this->max_request_queue_depth_,
155 this->thread_stack_size_,
156 this->thread_idle_time_.sec ()));
159 // We can't activate 0 threads. Make sure this isn't the case.
160 if (num < 1)
162 if (TAO_debug_level > 0)
164 TAOLIB_ERROR ((LM_ERROR,
165 ACE_TEXT ("TAO (%P|%t) DTP_Task::open() failed to open. ")
166 ACE_TEXT ("num_threads (%u) is less-than 1.\n"),
167 num));
169 return -1;
172 // Set the busy_threads_ to the number of init_threads
173 // now. When they startup they will decrement themselves
174 // as they go into a wait state.
176 this->busy_threads_ = 0;
178 // Create the stack size arrays if the stack size is set > 0.
180 // Activate this task object with 'num' worker threads.
181 if (this->thread_stack_size_ == 0)
183 if (this->activate (THR_NEW_LWP | THR_DETACHED, num, 1) != 0)
185 TAOLIB_ERROR_RETURN ((LM_ERROR,
186 ACE_TEXT ("(%P|%t) DTP_Task::open() failed to activate ")
187 ACE_TEXT ("(%d) worker threads.\n"),
188 num),
189 -1);
192 else
194 size_t * stack_sz_arr = new size_t[num];
195 for (int z = 0; z < num; z++)
197 stack_sz_arr[z] = this->thread_stack_size_;
200 if (this->activate (THR_NEW_LWP | THR_DETACHED,
201 num,
203 ACE_DEFAULT_THREAD_PRIORITY,
208 stack_sz_arr) != 0)
210 TAOLIB_ERROR_RETURN ((LM_ERROR,
211 ACE_TEXT ("(%P|%t) DTP_Task::open() failed to activate ")
212 ACE_TEXT ("(%d) worker threads.\n"),
213 num),
214 -1);
217 delete[] stack_sz_arr;
220 if (TAO_debug_level > 4)
222 TAOLIB_DEBUG ((LM_DEBUG,
223 ACE_TEXT ("(%P|%t) DTP_Task::open() activated %d initial threads\n"),
224 num));
227 this->active_count_ = static_cast<size_t> (num);
229 this->opened_ = true;
230 this->accepting_requests_ = true;
232 return 0;
235 bool
236 TAO_DTP_Task::request_ready (TAO::CSD::TP_Dispatchable_Visitor &v,
237 TAO::CSD::TP_Request_Handle &r)
239 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->queue_lock_, false);
240 if (!this->queue_.is_empty())
242 this->queue_.accept_visitor(v);
243 r = v.request();
244 return !r.is_nil();
246 return false;
249 void
250 TAO_DTP_Task::clear_request (TAO::CSD::TP_Request_Handle &r)
252 ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->queue_lock_);
253 --this->num_queue_requests_;
254 if (this->max_request_queue_depth_ > 0)
256 this->accepting_requests_ = true;
259 if (TAO_debug_level > 4 )
261 TAOLIB_DEBUG ((LM_DEBUG,
262 ACE_TEXT ("TAO (%P|%t) - DTP_Task::clear_request() ")
263 ACE_TEXT ("Decrementing num_queue_requests.")
264 ACE_TEXT ("New queue depth:%d\n"),
265 this->num_queue_requests_));
268 r->mark_as_ready ();
271 void
272 TAO_DTP_Task::add_busy ()
274 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->aw_lock_);
275 ++this->busy_threads_;
278 void
279 TAO_DTP_Task::remove_busy ()
281 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->aw_lock_);
282 --this->busy_threads_;
285 void
286 TAO_DTP_Task::add_active ()
288 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->aw_lock_);
289 ++this->active_count_;
292 bool
293 TAO_DTP_Task::remove_active (bool force)
295 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->aw_lock_, false);
296 if (force || this->above_minimum())
298 --this->active_count_;
299 this->active_workers_.signal ();
300 return true;
302 return false;
305 bool
306 TAO_DTP_Task::need_active ()
308 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, mon, this->aw_lock_, false);
309 return ((this->busy_threads_ == static_cast<unsigned long> (this->active_count_)) &&
310 ((this->max_pool_threads_ < 1) ||
311 (this->active_count_ < this->max_pool_threads_)));
314 bool
315 TAO_DTP_Task::above_minimum ()
317 return this->min_pool_threads_ > 0 &&
318 this->active_count_ > this->min_pool_threads_;
322 TAO_DTP_Task::svc ()
324 this->add_busy ();
325 if (TAO_debug_level > 4)
327 TAOLIB_DEBUG ((LM_DEBUG,
328 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
329 ACE_TEXT ("New thread created.\n")));
331 TAO::CSD::TP_Dispatchable_Visitor dispatchable_visitor;
332 while (!this->shutdown_)
334 TAO::CSD::TP_Request_Handle request;
336 while (!this->shutdown_ && request.is_nil ())
338 if (!this->request_ready (dispatchable_visitor, request))
340 this->remove_busy ();
342 if (TAO_debug_level > 4)
344 TAOLIB_DEBUG ((LM_DEBUG,
345 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
346 ACE_TEXT ("Decrementing busy_threads_. ")
347 ACE_TEXT ("Busy thread count:%d\n"),
348 this->busy_threads_.load()));
351 ACE_Time_Value tmp_sec = this->thread_idle_time_.to_absolute_time();
354 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->work_lock_, false);
355 int wait_state = 0;
356 while (!(this->shutdown_ || this->check_queue_) && wait_state != -1)
358 wait_state = this->thread_idle_time_.sec () == 0
359 ? this->work_available_.wait ()
360 : this->work_available_.wait (&tmp_sec);
362 // Check for timeout
363 if (this->shutdown_)
364 return 0;
365 if (wait_state == -1)
367 if (errno != ETIME || this->remove_active (false))
369 if (TAO_debug_level > 4)
371 TAOLIB_DEBUG ((LM_DEBUG,
372 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
373 ACE_TEXT ("Existing thread expiring.\n")));
375 return 0;
378 this->check_queue_ = false;
381 this->add_busy ();
382 if (TAO_debug_level > 4)
384 TAOLIB_DEBUG ((LM_DEBUG,
385 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
386 ACE_TEXT ("Incrementing busy_threads_. ")
387 ACE_TEXT ("Busy thread count:%d\n"),
388 this->busy_threads_.load ()));
393 if (this->need_active ())
395 if (this->activate (THR_NEW_LWP | THR_DETACHED,
398 ACE_DEFAULT_THREAD_PRIORITY,
403 this->thread_stack_size_ == 0 ? 0 :
404 &this->thread_stack_size_) != 0)
406 TAOLIB_ERROR ((LM_ERROR,
407 ACE_TEXT ("(%P|%t) DTP_Task::svc() failed to ")
408 ACE_TEXT ("grow thread pool.\n")));
410 else
412 this->add_active ();
413 if (TAO_debug_level > 4)
415 TAOLIB_DEBUG ((LM_DEBUG,
416 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
417 ACE_TEXT ("Growing threadcount. ")
418 ACE_TEXT ("New thread count:%d\n"),
419 this->thr_count ()));
424 request->dispatch ();
425 this->clear_request (request);
426 dispatchable_visitor.reset ();
428 this->remove_active (true);
429 return 0;
434 TAO_DTP_Task::close (u_long flag)
437 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->aw_lock_, 0);
438 if (flag == 0)
440 this->active_workers_.signal ();
441 return 0;
444 if (!this->opened_)
446 return 0;
448 this->opened_ = false;
449 this->shutdown_ = true;
450 this->accepting_requests_ = false;
454 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->work_lock_, 0);
455 this->work_available_.broadcast();
458 size_t in_task = (this->thr_mgr ()->task () == this) ? 1 : 0;
459 if (TAO_debug_level > 4)
461 TAOLIB_DEBUG ((LM_DEBUG,
462 ACE_TEXT ("TAO (%P|%t) - DTP_Task::close() ")
463 ACE_TEXT ("shutting down. in_task = %d, Count = %d \n"),
464 in_task, this->thr_count ()));
467 while (this->thr_count () > in_task)
469 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->aw_lock_, 0);
470 this->active_workers_.wait ();
474 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->queue_lock_, 0);
475 TAO::CSD::TP_Cancel_Visitor v;
476 this->queue_.accept_visitor (v);
478 return 0;
482 void
483 TAO_DTP_Task::set_init_pool_threads (size_t thr_count)
485 this->init_pool_threads_ = thr_count;
488 void
489 TAO_DTP_Task::set_min_pool_threads (size_t thr_count)
491 this->min_pool_threads_ = thr_count;
494 void
495 TAO_DTP_Task::set_max_pool_threads (size_t thr_count)
497 this->max_pool_threads_ = thr_count;
500 void
501 TAO_DTP_Task::set_thread_stack_size (size_t stack_sz)
503 this->thread_stack_size_ = stack_sz;
506 void
507 TAO_DTP_Task::set_thread_idle_time(ACE_Time_Value thr_timeout)
509 this->thread_idle_time_ = thr_timeout;
512 void
513 TAO_DTP_Task::set_max_request_queue_depth (size_t queue_depth)
515 this->max_request_queue_depth_ = queue_depth;
518 void
519 TAO_DTP_Task::cancel_servant (PortableServer::Servant servant)
521 if (servant == 0)
523 if (TAO_debug_level > 0)
525 TAOLIB_DEBUG ((LM_DEBUG,
526 ACE_TEXT ("TAO (%P|%t) - DTP_Task::cancel_servant ")
527 ACE_TEXT ("called with null servant\n")
530 return;
533 ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->queue_lock_);
535 // Cancel the requests targeted for the provided servant.
536 TAO::CSD::TP_Cancel_Visitor cancel_visitor (servant);
537 this->queue_.accept_visitor (cancel_visitor);
540 TAO_END_VERSIONED_NAMESPACE_DECL
542 #endif /* (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 */