1 #include "tao/Dynamic_TP/DTP_POA_Strategy.h"
2 #include "tao/CSD_ThreadPool/CSD_TP_Remote_Request.h"
3 #include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.h"
4 #include "tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.h"
5 #include "tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.h"
6 #include "tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.h"
7 #include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.h"
8 #include "tao/ORB_Core.h"
10 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
12 #if !defined (__ACE_INLINE__)
13 #include "tao/Dynamic_TP/DTP_POA_Strategy.inl"
14 #endif /* ! __ACE_INLINE__ */
16 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
19 TAO_DTP_POA_Strategy::~TAO_DTP_POA_Strategy()
23 TAO_DTP_POA_Strategy::CustomRequestOutcome
24 TAO_DTP_POA_Strategy::custom_synch_request(
25 TAO::CSD::TP_Custom_Request_Operation
* op
)
27 TAO::CSD::TP_Servant_State::HandleType servant_state
=
28 this->get_servant_state(op
->servant());
30 TAO::CSD::TP_Custom_Synch_Request
*req_ptr
;
31 ACE_NEW_RETURN (req_ptr
,
32 TAO::CSD::TP_Custom_Synch_Request(op
, servant_state
.in ()),
35 TAO::CSD::TP_Custom_Synch_Request_Handle request
= req_ptr
;
37 if (!this->dtp_task_
.add_request (request
.in ()))
39 // The request was rejected by the task.
40 return REQUEST_REJECTED
;
43 // Now we wait until the request is handled (executed or cancelled).
44 return (request
->wait ()) ? REQUEST_EXECUTED
: REQUEST_CANCELLED
;
47 TAO_DTP_POA_Strategy::CustomRequestOutcome
48 TAO_DTP_POA_Strategy::custom_asynch_request (
49 TAO::CSD::TP_Custom_Request_Operation
* op
)
51 TAO::CSD::TP_Servant_State::HandleType servant_state
=
52 this->get_servant_state (op
->servant ());
54 TAO::CSD::TP_Custom_Asynch_Request
*req_ptr
;
55 ACE_NEW_RETURN (req_ptr
,
56 TAO::CSD::TP_Custom_Asynch_Request (op
, servant_state
.in ()),
59 TAO::CSD::TP_Custom_Asynch_Request_Handle request
= req_ptr
;
61 return (this->dtp_task_
.add_request (request
.in ()))
62 ? REQUEST_DISPATCHED
: REQUEST_REJECTED
;
66 TAO_DTP_POA_Strategy::poa_activated_event_i (TAO_ORB_Core
& orb_core
)
68 this->dtp_task_
.thr_mgr (orb_core
.thr_mgr ());
70 // Activates the worker threads, and waits until all have been started.
71 if (!this->config_initialized_
)
73 TAO_DTP_Config_Registry
* config_repo
=
74 ACE_Dynamic_Service
<TAO_DTP_Config_Registry
>::instance
75 ("DTP_Config_Registry");
79 if (TAO_debug_level
> 0)
81 TAOLIB_DEBUG ((LM_DEBUG
,
82 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy - ")
83 ACE_TEXT ("cannot retrieve configuration repo\n")));
89 TAO_DTP_Definition config_entry
;
90 if (!config_repo
->find (this->dynamic_tp_config_name_
, config_entry
))
92 TAOLIB_DEBUG ((LM_DEBUG
,
93 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy - ")
94 ACE_TEXT ("warning: config not found...using ")
95 ACE_TEXT ("defaults!\n")));
97 this->set_dtp_config (config_entry
);
98 //this->dtp_task_.set_init_pool_threads(config_entry.init_threads_);
99 //this->dtp_task_.set_min_pool_threads(config_entry.min_threads_);
100 //this->dtp_task_.set_max_pool_threads(config_entry.max_threads_);
101 //this->dtp_task_.set_thread_idle_time(config_entry.timeout_);
102 //this->dtp_task_.set_thread_stack_size(config_entry.stack_size_);
103 //this->dtp_task_.set_max_request_queue_depth(config_entry.queue_depth_);
108 return (this->dtp_task_
.open () == 0);
112 TAO_DTP_POA_Strategy::poa_deactivated_event_i ()
114 // Passing in a value of 1 means that we want to shutdown the task, which
115 // equates to causing all worker threads to shutdown. The worker threads
116 // themselves will also invoke the close() method, but the passed-in value
117 // will be 0. So, a 1 means "shutdown", and a 0 means "a single worker
118 // thread is going away".
119 this->dtp_task_
.close(1);
122 TAO::CSD::Strategy_Base::DispatchResult
123 TAO_DTP_POA_Strategy::dispatch_remote_request_i
124 (TAO_ServerRequest
& server_request
,
125 const PortableServer::ObjectId
& object_id
,
126 PortableServer::POA_ptr poa
,
127 const char* operation
,
128 PortableServer::Servant servant
)
130 TAO::CSD::TP_Servant_State::HandleType servant_state
=
131 this->get_servant_state (servant
);
133 // Handle the one ways that are SYNC_WITH_SERVER and not collocated.
134 // before queuing the request and thus avoid delaying the client.
135 // This is a problem if the servant ends up throwing a Location Forward
136 // exception. If necessary, add an override config option here.
137 CORBA::Boolean early_sync
= true;
138 server_request
.is_queued (early_sync
);
139 server_request
.sync_before_dispatch ();
141 // Now we can create the TP_Remote_Request object, and then add it to our
142 // task_'s "request queue".
144 // TBD-CSD: Need to use a Cached Allocator to "create" the
145 // TP_Remote_Request objects. For now, use the heap.
146 TAO::CSD::TP_Remote_Request
*req_ptr
;
147 ACE_NEW_RETURN (req_ptr
,
148 TAO::CSD::TP_Remote_Request (server_request
,
153 servant_state
.in ()),
154 TAO::CSD::Strategy_Base::DISPATCH_REJECTED
);
156 TAO::CSD::TP_Remote_Request_Handle request
= req_ptr
;
158 // Hand the request object to our task so that it can add the request
159 // to its "request queue".
160 if (!this->dtp_task_
.add_request (request
.in ()))
162 // Return the DISPATCH_REJECTED return code so that the caller (our
163 // base class' dispatch_request() method) knows that we did
164 // not handle the request, and that it should be rejected.
165 return TAO::CSD::Strategy_Base::DISPATCH_REJECTED
;
168 return TAO::CSD::Strategy_Base::DISPATCH_HANDLED
;
171 TAO::CSD::Strategy_Base::DispatchResult
172 TAO_DTP_POA_Strategy::dispatch_collocated_request_i
173 (TAO_ServerRequest
& server_request
,
174 const PortableServer::ObjectId
& object_id
,
175 PortableServer::POA_ptr poa
,
176 const char* operation
,
177 PortableServer::Servant servant
)
179 TAO::CSD::TP_Servant_State::HandleType servant_state
=
180 this->get_servant_state (servant
);
182 bool is_sync_with_server
= server_request
.sync_with_server ();
183 bool is_synchronous
= server_request
.response_expected ();
185 TAO::CSD::TP_Collocated_Synch_Request_Handle
187 TAO::CSD::TP_Collocated_Synch_With_Server_Request_Handle
188 synch_with_server_request
;
189 TAO::CSD::TP_Request_Handle
192 // Create the request object using the appropriate concrete type.
193 if (is_sync_with_server
)
195 TAO::CSD::TP_Collocated_Synch_With_Server_Request
*req_ptr
;
196 ACE_NEW_RETURN (req_ptr
,
197 TAO::CSD::TP_Collocated_Synch_With_Server_Request
203 servant_state
.in ()),
206 synch_with_server_request
= req_ptr
;
208 // Give the request handle its own "copy".
209 synch_with_server_request
->_add_ref ();
210 request
= synch_with_server_request
.in ();
212 else if (is_synchronous
)
214 TAO::CSD::TP_Collocated_Synch_Request
*req_ptr
;
215 ACE_NEW_RETURN (req_ptr
,
216 TAO::CSD::TP_Collocated_Synch_Request (
222 servant_state
.in ()),
225 synch_request
= req_ptr
;
227 // Give the request handle its own "copy".
228 synch_request
->_add_ref ();
229 request
= synch_request
.in ();
233 TAO::CSD::TP_Collocated_Asynch_Request
*req_ptr
;
234 ACE_NEW_RETURN (req_ptr
,
235 TAO::CSD::TP_Collocated_Asynch_Request (server_request
,
240 servant_state
.in ()),
243 // Just use the (base) request handle to hold the request object.
247 // Hand the request object to our task so that it can add the request
248 // to its "request queue".
249 if (!this->dtp_task_
.add_request (request
.in ()))
251 // Return the DISPATCH_REJECTED return code so that the caller (our
252 // base class' dispatch_request() method) knows that we did
253 // not handle the request, and that it should be rejected.
254 return DISPATCH_REJECTED
;
257 // We need to wait on the request object if the request type is a
258 // synchronous request.
259 if (!synch_request
.is_nil ())
261 int srw
= synch_request
->wait ();
264 // Raise exception when request was cancelled.
265 throw ::CORBA::NO_IMPLEMENT ();
268 else if (!synch_with_server_request
.is_nil())
270 bool swsr
= synch_with_server_request
->wait();
273 // Raise exception when request was cancelled.
274 throw ::CORBA::NO_IMPLEMENT ();
278 return DISPATCH_HANDLED
;
282 TAO_DTP_POA_Strategy::servant_activated_event_i
283 (PortableServer::Servant servant
,
284 const PortableServer::ObjectId
&)
286 if (this->serialize_servants_
)
288 // Add the servant to the servant state map.
289 this->servant_state_map_
.insert (servant
);
294 TAO_DTP_POA_Strategy::servant_deactivated_event_i
295 (PortableServer::Servant servant
,
296 const PortableServer::ObjectId
&)
298 // Cancel all requests stuck in the queue for the specified servant.
299 this->dtp_task_
.cancel_servant (servant
);
301 if (this->serialize_servants_
)
303 // Remove the servant from the servant state map.
304 this->servant_state_map_
.remove (servant
);
309 TAO_DTP_POA_Strategy::cancel_requests (PortableServer::Servant servant
)
311 // Cancel all requests stuck in the queue for the specified servant.
312 this->dtp_task_
.cancel_servant (servant
);
315 TAO::CSD::TP_Servant_State::HandleType
316 TAO_DTP_POA_Strategy::get_servant_state (PortableServer::Servant servant
)
318 TAO::CSD::TP_Servant_State::HandleType servant_state
;
320 if (this->serialize_servants_
)
322 servant_state
= this->servant_state_map_
.find (servant
);
325 return servant_state
;
329 TAO_DTP_POA_Strategy::set_dtp_config (TAO_DTP_Definition
&tp_config
)
331 if (tp_config
.min_threads_
<= 0)
333 this->dtp_task_
.set_min_pool_threads (1);
334 this->dtp_task_
.set_thread_idle_time (ACE_Time_Value (0,0));
338 this->dtp_task_
.set_min_pool_threads (tp_config
.min_threads_
);
339 this->dtp_task_
.set_thread_idle_time (tp_config
.timeout_
);
342 // initial_pool_threads_
343 if ((tp_config
.init_threads_
<= 0) ||
344 (tp_config
.init_threads_
< tp_config
.min_threads_
))
346 this->dtp_task_
.set_init_pool_threads (this->dtp_task_
.get_min_pool_threads());
350 this->dtp_task_
.set_init_pool_threads (tp_config
.init_threads_
);
354 if (tp_config
.max_threads_
<= 0)
356 // Set to 0 so that max is unbounded.
357 this->dtp_task_
.set_max_pool_threads(0);
361 if (tp_config
.max_threads_
< tp_config
.init_threads_
)
363 this->dtp_task_
.set_max_pool_threads(
364 this->dtp_task_
.get_init_pool_threads ());
368 this->dtp_task_
.set_max_pool_threads (tp_config
.max_threads_
);
372 // thread_stack_size_
373 if (tp_config
.stack_size_
<= 0)
375 this->dtp_task_
.set_thread_stack_size (ACE_DEFAULT_THREAD_STACKSIZE
);
379 this->dtp_task_
.set_thread_stack_size (tp_config
.stack_size_
);
382 // max_request_queue_depth_
383 if (tp_config
.queue_depth_
< 0)
385 this->dtp_task_
.set_max_request_queue_depth (0);
389 this->dtp_task_
.set_max_request_queue_depth (tp_config
.queue_depth_
);
392 if (TAO_debug_level
> 4)
394 TAOLIB_DEBUG ((LM_DEBUG
,
395 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy: ")
396 ACE_TEXT ("Initialized with:\n")
397 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy initial_pool_threads_=")
399 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy min_pool_threads_=[%d]\n")
400 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy max_pool_threads_=[%d]\n")
401 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy max_request_queue_depth_=")
403 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy thread_stack_size_=[%d]\n")
404 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy thread_idle_time_=[%d]\n"),
405 this->dtp_task_
.get_init_pool_threads(),
406 this->dtp_task_
.get_min_pool_threads(),
407 this->dtp_task_
.get_max_pool_threads(),
408 this->dtp_task_
.get_max_request_queue_depth(),
409 this->dtp_task_
.get_thread_stack_size(),
410 this->dtp_task_
.get_thread_idle_time()));
414 TAO_END_VERSIONED_NAMESPACE_DECL
416 #endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */