1 #include "tao/Dynamic_TP/DTP_Task.h"
3 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
5 #include "tao/CSD_ThreadPool/CSD_TP_Request.h"
6 #include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
7 #include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h"
9 #if !defined (__ACE_INLINE__)
10 # include "tao/Dynamic_TP/DTP_Task.inl"
11 #endif /* ! __ACE_INLINE__ */
13 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
15 TAO_DTP_Task::TAO_DTP_Task ()
19 work_available_ (this->work_lock_
),
20 active_workers_ (this->aw_lock_
),
22 accepting_requests_ (false),
26 num_queue_requests_ ((size_t)0),
27 init_pool_threads_ ((size_t)0),
28 min_pool_threads_ ((size_t)0),
29 max_pool_threads_ ((size_t)0),
30 max_request_queue_depth_ ((size_t)0),
31 thread_stack_size_ ((size_t)0)
35 TAO_DTP_Task::~TAO_DTP_Task()
40 TAO_DTP_Task::add_request (TAO::CSD::TP_Request
* request
)
43 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->queue_lock_
, false);
44 ++this->num_queue_requests_
;
45 if ((this->num_queue_requests_
> this->max_request_queue_depth_
) &&
46 (this->max_request_queue_depth_
!= 0))
48 this->accepting_requests_
= false;
51 if (!this->accepting_requests_
)
53 if (TAO_debug_level
> 4)
55 TAOLIB_DEBUG ((LM_DEBUG
,
56 ACE_TEXT ("TAO (%P|%t) - DTP_Task::add_request() ")
57 ACE_TEXT ("not accepting requests.\n")
58 ACE_TEXT ("TAO (%P|%t) - DTP_Task::add_request() ")
59 ACE_TEXT ("num_queue_requests_ : [%d]\n")
60 ACE_TEXT ("TAO (%P|%t) - DTP_Task::add_request() ")
61 ACE_TEXT ("max_request_queue_depth_ : [%d]\n"),
62 this->num_queue_requests_
,
63 this->max_request_queue_depth_
));
65 --this->num_queue_requests_
;
69 // We have made the decision that the request is going to be placed upon
70 // the queue_. Inform the request that it is about to be placed into
71 // a request queue. Some requests may not need to do anything in
72 // preparation of being placed into a queue. Others, however, may need
73 // to perfom a "clone" operation on some underlying request data before
74 // the request can be properly placed into a queue.
75 request
->prepare_for_queue();
76 this->queue_
.put(request
);
79 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->work_lock_
, false);
80 this->check_queue_
= true;
81 this->work_available_
.signal ();
82 if (TAO_debug_level
> 4 )
84 TAOLIB_DEBUG((LM_DEBUG
,
85 ACE_TEXT ("TAO (%P|%t) - DTP_Task::add_request() - ")
86 ACE_TEXT ("work available\n")));
94 TAO_DTP_Task::get_init_pool_threads ()
96 return this->init_pool_threads_
;
100 TAO_DTP_Task::get_min_pool_threads ()
102 return this->min_pool_threads_
;
105 size_t TAO_DTP_Task::get_max_pool_threads ()
107 return this->max_pool_threads_
;
111 TAO_DTP_Task::get_max_request_queue_depth ()
113 return this->max_request_queue_depth_
;
117 TAO_DTP_Task::get_thread_stack_size ()
119 return this->thread_stack_size_
;
123 TAO_DTP_Task::get_thread_idle_time ()
125 return this->thread_idle_time_
.sec();
129 TAO_DTP_Task::open (void* /* args */)
131 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->aw_lock_
, -1);
132 // We can assume that we are in the proper state to handle this open()
133 // call as long as we haven't been opened before.
139 int num
= static_cast<int> (this->init_pool_threads_
);
141 if (TAO_debug_level
> 4)
143 TAOLIB_DEBUG ((LM_DEBUG
,
144 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() initialized with:\n")
145 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() init_threads_ \t\t: [%d]\n")
146 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() min_pool_threads_ \t\t: [%d]\n")
147 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() max_pool_threads_ \t\t: [%d]\n")
148 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() max_request_queue_depth_ \t: [%d]\n")
149 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() thread_stack_size_ \t\t: [%d]\n")
150 ACE_TEXT ("TAO (%P|%t) - DTP_Task::open() thread_idle_time_ \t\t: [%d]\n"),
151 this->init_pool_threads_
,
152 this->min_pool_threads_
,
153 this->max_pool_threads_
,
154 this->max_request_queue_depth_
,
155 this->thread_stack_size_
,
156 this->thread_idle_time_
.sec ()));
159 // We can't activate 0 threads. Make sure this isn't the case.
162 if (TAO_debug_level
> 0)
164 TAOLIB_ERROR ((LM_ERROR
,
165 ACE_TEXT ("TAO (%P|%t) DTP_Task::open() failed to open. ")
166 ACE_TEXT ("num_threads (%u) is less-than 1.\n"),
172 // Set the busy_threads_ to the number of init_threads
173 // now. When they startup they will decrement themselves
174 // as they go into a wait state.
176 this->busy_threads_
= 0;
178 // Create the stack size arrays if the stack size is set > 0.
180 // Activate this task object with 'num' worker threads.
181 if (this->thread_stack_size_
== 0)
183 if (this->activate (THR_NEW_LWP
| THR_DETACHED
, num
, 1) != 0)
185 TAOLIB_ERROR_RETURN ((LM_ERROR
,
186 ACE_TEXT ("(%P|%t) DTP_Task::open() failed to activate ")
187 ACE_TEXT ("(%d) worker threads.\n"),
194 size_t * stack_sz_arr
= new size_t[num
];
195 for (int z
= 0; z
< num
; z
++)
197 stack_sz_arr
[z
] = this->thread_stack_size_
;
200 if (this->activate (THR_NEW_LWP
| THR_DETACHED
,
203 ACE_DEFAULT_THREAD_PRIORITY
,
210 TAOLIB_ERROR_RETURN ((LM_ERROR
,
211 ACE_TEXT ("(%P|%t) DTP_Task::open() failed to activate ")
212 ACE_TEXT ("(%d) worker threads.\n"),
217 delete[] stack_sz_arr
;
220 if (TAO_debug_level
> 4)
222 TAOLIB_DEBUG ((LM_DEBUG
,
223 ACE_TEXT ("(%P|%t) DTP_Task::open() activated %d initial threads\n"),
227 this->active_count_
= static_cast<size_t> (num
);
229 this->opened_
= true;
230 this->accepting_requests_
= true;
236 TAO_DTP_Task::request_ready (TAO::CSD::TP_Dispatchable_Visitor
&v
,
237 TAO::CSD::TP_Request_Handle
&r
)
239 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->queue_lock_
, false);
240 if (!this->queue_
.is_empty())
242 this->queue_
.accept_visitor(v
);
250 TAO_DTP_Task::clear_request (TAO::CSD::TP_Request_Handle
&r
)
252 ACE_GUARD (TAO_SYNCH_MUTEX
, guard
, this->queue_lock_
);
253 --this->num_queue_requests_
;
254 if (this->max_request_queue_depth_
> 0)
256 this->accepting_requests_
= true;
259 if (TAO_debug_level
> 4 )
261 TAOLIB_DEBUG ((LM_DEBUG
,
262 ACE_TEXT ("TAO (%P|%t) - DTP_Task::clear_request() ")
263 ACE_TEXT ("Decrementing num_queue_requests.")
264 ACE_TEXT ("New queue depth:%d\n"),
265 this->num_queue_requests_
));
272 TAO_DTP_Task::add_busy ()
274 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->aw_lock_
);
275 ++this->busy_threads_
;
279 TAO_DTP_Task::remove_busy ()
281 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->aw_lock_
);
282 --this->busy_threads_
;
286 TAO_DTP_Task::add_active ()
288 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->aw_lock_
);
289 ++this->active_count_
;
293 TAO_DTP_Task::remove_active (bool force
)
295 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, mon
, this->aw_lock_
, false);
296 if (force
|| this->above_minimum())
298 --this->active_count_
;
299 this->active_workers_
.signal ();
306 TAO_DTP_Task::need_active ()
308 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, mon
, this->aw_lock_
, false);
309 return ((this->busy_threads_
== static_cast<unsigned long> (this->active_count_
)) &&
310 ((this->max_pool_threads_
< 1) ||
311 (this->active_count_
< this->max_pool_threads_
)));
315 TAO_DTP_Task::above_minimum ()
317 return this->min_pool_threads_
> 0 &&
318 this->active_count_
> this->min_pool_threads_
;
325 if (TAO_debug_level
> 4)
327 TAOLIB_DEBUG ((LM_DEBUG
,
328 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
329 ACE_TEXT ("New thread created.\n")));
331 TAO::CSD::TP_Dispatchable_Visitor dispatchable_visitor
;
332 while (!this->shutdown_
)
334 TAO::CSD::TP_Request_Handle request
;
336 while (!this->shutdown_
&& request
.is_nil ())
338 if (!this->request_ready (dispatchable_visitor
, request
))
340 this->remove_busy ();
342 if (TAO_debug_level
> 4)
344 TAOLIB_DEBUG ((LM_DEBUG
,
345 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
346 ACE_TEXT ("Decrementing busy_threads_. ")
347 ACE_TEXT ("Busy thread count:%d\n"),
348 this->busy_threads_
.load()));
351 ACE_Time_Value tmp_sec
= this->thread_idle_time_
.to_absolute_time();
354 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->work_lock_
, false);
356 while (!(this->shutdown_
|| this->check_queue_
) && wait_state
!= -1)
358 wait_state
= this->thread_idle_time_
.sec () == 0
359 ? this->work_available_
.wait ()
360 : this->work_available_
.wait (&tmp_sec
);
365 if (wait_state
== -1)
367 if (errno
!= ETIME
|| this->remove_active (false))
369 if (TAO_debug_level
> 4)
371 TAOLIB_DEBUG ((LM_DEBUG
,
372 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
373 ACE_TEXT ("Existing thread expiring.\n")));
378 this->check_queue_
= false;
382 if (TAO_debug_level
> 4)
384 TAOLIB_DEBUG ((LM_DEBUG
,
385 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
386 ACE_TEXT ("Incrementing busy_threads_. ")
387 ACE_TEXT ("Busy thread count:%d\n"),
388 this->busy_threads_
.load ()));
393 if (this->need_active ())
395 if (this->activate (THR_NEW_LWP
| THR_DETACHED
,
398 ACE_DEFAULT_THREAD_PRIORITY
,
403 this->thread_stack_size_
== 0 ? 0 :
404 &this->thread_stack_size_
) != 0)
406 TAOLIB_ERROR ((LM_ERROR
,
407 ACE_TEXT ("(%P|%t) DTP_Task::svc() failed to ")
408 ACE_TEXT ("grow thread pool.\n")));
413 if (TAO_debug_level
> 4)
415 TAOLIB_DEBUG ((LM_DEBUG
,
416 ACE_TEXT ("TAO (%P|%t) - DTP_Task::svc() ")
417 ACE_TEXT ("Growing threadcount. ")
418 ACE_TEXT ("New thread count:%d\n"),
419 this->thr_count ()));
424 request
->dispatch ();
425 this->clear_request (request
);
426 dispatchable_visitor
.reset ();
428 this->remove_active (true);
434 TAO_DTP_Task::close (u_long flag
)
437 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->aw_lock_
, 0);
440 this->active_workers_
.signal ();
448 this->opened_
= false;
449 this->shutdown_
= true;
450 this->accepting_requests_
= false;
454 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->work_lock_
, 0);
455 this->work_available_
.broadcast();
458 size_t in_task
= (this->thr_mgr ()->task () == this) ? 1 : 0;
459 if (TAO_debug_level
> 4)
461 TAOLIB_DEBUG ((LM_DEBUG
,
462 ACE_TEXT ("TAO (%P|%t) - DTP_Task::close() ")
463 ACE_TEXT ("shutting down. in_task = %d, Count = %d \n"),
464 in_task
, this->thr_count ()));
467 while (this->thr_count () > in_task
)
469 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->aw_lock_
, 0);
470 this->active_workers_
.wait ();
474 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->queue_lock_
, 0);
475 TAO::CSD::TP_Cancel_Visitor v
;
476 this->queue_
.accept_visitor (v
);
483 TAO_DTP_Task::set_init_pool_threads (size_t thr_count
)
485 this->init_pool_threads_
= thr_count
;
489 TAO_DTP_Task::set_min_pool_threads (size_t thr_count
)
491 this->min_pool_threads_
= thr_count
;
495 TAO_DTP_Task::set_max_pool_threads (size_t thr_count
)
497 this->max_pool_threads_
= thr_count
;
501 TAO_DTP_Task::set_thread_stack_size (size_t stack_sz
)
503 this->thread_stack_size_
= stack_sz
;
507 TAO_DTP_Task::set_thread_idle_time(ACE_Time_Value thr_timeout
)
509 this->thread_idle_time_
= thr_timeout
;
513 TAO_DTP_Task::set_max_request_queue_depth (size_t queue_depth
)
515 this->max_request_queue_depth_
= queue_depth
;
519 TAO_DTP_Task::cancel_servant (PortableServer::Servant servant
)
523 if (TAO_debug_level
> 0)
525 TAOLIB_DEBUG ((LM_DEBUG
,
526 ACE_TEXT ("TAO (%P|%t) - DTP_Task::cancel_servant ")
527 ACE_TEXT ("called with null servant\n")
533 ACE_GUARD (TAO_SYNCH_MUTEX
, guard
, this->queue_lock_
);
535 // Cancel the requests targeted for the provided servant.
536 TAO::CSD::TP_Cancel_Visitor
cancel_visitor (servant
);
537 this->queue_
.accept_visitor (cancel_visitor
);
540 TAO_END_VERSIONED_NAMESPACE_DECL
542 #endif /* (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 */