Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tao / Dynamic_TP / DTP_POA_Strategy.cpp
blobea0088f136cedaf8f45cade34ed279781130b4bb
1 #include "tao/Dynamic_TP/DTP_POA_Strategy.h"
2 #include "tao/CSD_ThreadPool/CSD_TP_Remote_Request.h"
3 #include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_Request.h"
4 #include "tao/CSD_ThreadPool/CSD_TP_Collocated_Asynch_Request.h"
5 #include "tao/CSD_ThreadPool/CSD_TP_Custom_Synch_Request.h"
6 #include "tao/CSD_ThreadPool/CSD_TP_Custom_Asynch_Request.h"
7 #include "tao/CSD_ThreadPool/CSD_TP_Collocated_Synch_With_Server_Request.h"
8 #include "tao/ORB_Core.h"
10 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
12 #if !defined (__ACE_INLINE__)
13 #include "tao/Dynamic_TP/DTP_POA_Strategy.inl"
14 #endif /* ! __ACE_INLINE__ */
16 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
19 TAO_DTP_POA_Strategy::~TAO_DTP_POA_Strategy()
23 TAO_DTP_POA_Strategy::CustomRequestOutcome
24 TAO_DTP_POA_Strategy::custom_synch_request(
25 TAO::CSD::TP_Custom_Request_Operation* op)
27 TAO::CSD::TP_Servant_State::HandleType servant_state =
28 this->get_servant_state(op->servant());
30 TAO::CSD::TP_Custom_Synch_Request *req_ptr;
31 ACE_NEW_RETURN (req_ptr,
32 TAO::CSD::TP_Custom_Synch_Request(op, servant_state.in ()),
33 REQUEST_REJECTED);
35 TAO::CSD::TP_Custom_Synch_Request_Handle request = req_ptr;
37 if (!this->dtp_task_.add_request (request.in ()))
39 // The request was rejected by the task.
40 return REQUEST_REJECTED;
43 // Now we wait until the request is handled (executed or cancelled).
44 return (request->wait ()) ? REQUEST_EXECUTED : REQUEST_CANCELLED;
47 TAO_DTP_POA_Strategy::CustomRequestOutcome
48 TAO_DTP_POA_Strategy::custom_asynch_request (
49 TAO::CSD::TP_Custom_Request_Operation* op)
51 TAO::CSD::TP_Servant_State::HandleType servant_state =
52 this->get_servant_state (op->servant ());
54 TAO::CSD::TP_Custom_Asynch_Request *req_ptr;
55 ACE_NEW_RETURN (req_ptr,
56 TAO::CSD::TP_Custom_Asynch_Request (op, servant_state.in ()),
57 REQUEST_REJECTED);
59 TAO::CSD::TP_Custom_Asynch_Request_Handle request = req_ptr;
61 return (this->dtp_task_.add_request (request.in ()))
62 ? REQUEST_DISPATCHED : REQUEST_REJECTED;
65 bool
66 TAO_DTP_POA_Strategy::poa_activated_event_i (TAO_ORB_Core& orb_core)
68 this->dtp_task_.thr_mgr (orb_core.thr_mgr ());
70 // Activates the worker threads, and waits until all have been started.
71 if (!this->config_initialized_)
73 TAO_DTP_Config_Registry * config_repo =
74 ACE_Dynamic_Service<TAO_DTP_Config_Registry>::instance
75 ("DTP_Config_Registry");
77 if (config_repo == 0)
79 if (TAO_debug_level > 0)
81 TAOLIB_DEBUG ((LM_DEBUG,
82 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy - ")
83 ACE_TEXT ("cannot retrieve configuration repo\n")));
85 return false;
87 else
89 TAO_DTP_Definition config_entry;
90 if (!config_repo->find (this->dynamic_tp_config_name_, config_entry))
92 TAOLIB_DEBUG ((LM_DEBUG,
93 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy - ")
94 ACE_TEXT ("warning: config not found...using ")
95 ACE_TEXT ("defaults!\n")));
97 this->set_dtp_config (config_entry);
98 //this->dtp_task_.set_init_pool_threads(config_entry.init_threads_);
99 //this->dtp_task_.set_min_pool_threads(config_entry.min_threads_);
100 //this->dtp_task_.set_max_pool_threads(config_entry.max_threads_);
101 //this->dtp_task_.set_thread_idle_time(config_entry.timeout_);
102 //this->dtp_task_.set_thread_stack_size(config_entry.stack_size_);
103 //this->dtp_task_.set_max_request_queue_depth(config_entry.queue_depth_);
108 return (this->dtp_task_.open () == 0);
111 void
112 TAO_DTP_POA_Strategy::poa_deactivated_event_i ()
114 // Passing in a value of 1 means that we want to shutdown the task, which
115 // equates to causing all worker threads to shutdown. The worker threads
116 // themselves will also invoke the close() method, but the passed-in value
117 // will be 0. So, a 1 means "shutdown", and a 0 means "a single worker
118 // thread is going away".
119 this->dtp_task_.close(1);
122 TAO::CSD::Strategy_Base::DispatchResult
123 TAO_DTP_POA_Strategy::dispatch_remote_request_i
124 (TAO_ServerRequest& server_request,
125 const PortableServer::ObjectId& object_id,
126 PortableServer::POA_ptr poa,
127 const char* operation,
128 PortableServer::Servant servant)
130 TAO::CSD::TP_Servant_State::HandleType servant_state =
131 this->get_servant_state (servant);
133 // Handle the one ways that are SYNC_WITH_SERVER and not collocated.
134 // before queuing the request and thus avoid delaying the client.
135 // This is a problem if the servant ends up throwing a Location Forward
136 // exception. If necessary, add an override config option here.
137 CORBA::Boolean early_sync = true;
138 server_request.is_queued (early_sync);
139 server_request.sync_before_dispatch ();
141 // Now we can create the TP_Remote_Request object, and then add it to our
142 // task_'s "request queue".
144 // TBD-CSD: Need to use a Cached Allocator to "create" the
145 // TP_Remote_Request objects. For now, use the heap.
146 TAO::CSD::TP_Remote_Request *req_ptr;
147 ACE_NEW_RETURN (req_ptr,
148 TAO::CSD::TP_Remote_Request (server_request,
149 object_id,
150 poa,
151 operation,
152 servant,
153 servant_state.in ()),
154 TAO::CSD::Strategy_Base::DISPATCH_REJECTED);
156 TAO::CSD::TP_Remote_Request_Handle request = req_ptr;
158 // Hand the request object to our task so that it can add the request
159 // to its "request queue".
160 if (!this->dtp_task_.add_request (request.in ()))
162 // Return the DISPATCH_REJECTED return code so that the caller (our
163 // base class' dispatch_request() method) knows that we did
164 // not handle the request, and that it should be rejected.
165 return TAO::CSD::Strategy_Base::DISPATCH_REJECTED;
168 return TAO::CSD::Strategy_Base::DISPATCH_HANDLED;
171 TAO::CSD::Strategy_Base::DispatchResult
172 TAO_DTP_POA_Strategy::dispatch_collocated_request_i
173 (TAO_ServerRequest& server_request,
174 const PortableServer::ObjectId& object_id,
175 PortableServer::POA_ptr poa,
176 const char* operation,
177 PortableServer::Servant servant)
179 TAO::CSD::TP_Servant_State::HandleType servant_state =
180 this->get_servant_state (servant);
182 bool is_sync_with_server = server_request.sync_with_server ();
183 bool is_synchronous = server_request.response_expected ();
185 TAO::CSD::TP_Collocated_Synch_Request_Handle
186 synch_request;
187 TAO::CSD::TP_Collocated_Synch_With_Server_Request_Handle
188 synch_with_server_request;
189 TAO::CSD::TP_Request_Handle
190 request;
192 // Create the request object using the appropriate concrete type.
193 if (is_sync_with_server)
195 TAO::CSD::TP_Collocated_Synch_With_Server_Request *req_ptr;
196 ACE_NEW_RETURN (req_ptr,
197 TAO::CSD::TP_Collocated_Synch_With_Server_Request
198 (server_request,
199 object_id,
200 poa,
201 operation,
202 servant,
203 servant_state.in ()),
204 DISPATCH_REJECTED);
206 synch_with_server_request = req_ptr;
208 // Give the request handle its own "copy".
209 synch_with_server_request->_add_ref ();
210 request = synch_with_server_request.in ();
212 else if (is_synchronous)
214 TAO::CSD::TP_Collocated_Synch_Request *req_ptr;
215 ACE_NEW_RETURN (req_ptr,
216 TAO::CSD::TP_Collocated_Synch_Request (
217 server_request,
218 object_id,
219 poa,
220 operation,
221 servant,
222 servant_state.in ()),
223 DISPATCH_REJECTED);
225 synch_request = req_ptr;
227 // Give the request handle its own "copy".
228 synch_request->_add_ref ();
229 request = synch_request.in ();
231 else
233 TAO::CSD::TP_Collocated_Asynch_Request *req_ptr;
234 ACE_NEW_RETURN (req_ptr,
235 TAO::CSD::TP_Collocated_Asynch_Request (server_request,
236 object_id,
237 poa,
238 operation,
239 servant,
240 servant_state.in ()),
241 DISPATCH_REJECTED);
243 // Just use the (base) request handle to hold the request object.
244 request = req_ptr;
247 // Hand the request object to our task so that it can add the request
248 // to its "request queue".
249 if (!this->dtp_task_.add_request (request.in ()))
251 // Return the DISPATCH_REJECTED return code so that the caller (our
252 // base class' dispatch_request() method) knows that we did
253 // not handle the request, and that it should be rejected.
254 return DISPATCH_REJECTED;
257 // We need to wait on the request object if the request type is a
258 // synchronous request.
259 if (!synch_request.is_nil ())
261 int srw = synch_request->wait ();
262 if (srw == false)
264 // Raise exception when request was cancelled.
265 throw ::CORBA::NO_IMPLEMENT ();
268 else if (!synch_with_server_request.is_nil())
270 bool swsr = synch_with_server_request->wait();
271 if (swsr == false)
273 // Raise exception when request was cancelled.
274 throw ::CORBA::NO_IMPLEMENT ();
278 return DISPATCH_HANDLED;
281 void
282 TAO_DTP_POA_Strategy::servant_activated_event_i
283 (PortableServer::Servant servant,
284 const PortableServer::ObjectId&)
286 if (this->serialize_servants_)
288 // Add the servant to the servant state map.
289 this->servant_state_map_.insert (servant);
293 void
294 TAO_DTP_POA_Strategy::servant_deactivated_event_i
295 (PortableServer::Servant servant,
296 const PortableServer::ObjectId&)
298 // Cancel all requests stuck in the queue for the specified servant.
299 this->dtp_task_.cancel_servant (servant);
301 if (this->serialize_servants_)
303 // Remove the servant from the servant state map.
304 this->servant_state_map_.remove (servant);
308 void
309 TAO_DTP_POA_Strategy::cancel_requests (PortableServer::Servant servant)
311 // Cancel all requests stuck in the queue for the specified servant.
312 this->dtp_task_.cancel_servant (servant);
315 TAO::CSD::TP_Servant_State::HandleType
316 TAO_DTP_POA_Strategy::get_servant_state (PortableServer::Servant servant)
318 TAO::CSD::TP_Servant_State::HandleType servant_state;
320 if (this->serialize_servants_)
322 servant_state = this->servant_state_map_.find (servant);
325 return servant_state;
328 void
329 TAO_DTP_POA_Strategy::set_dtp_config (TAO_DTP_Definition &tp_config)
331 if (tp_config.min_threads_ <= 0)
333 this->dtp_task_.set_min_pool_threads (1);
334 this->dtp_task_.set_thread_idle_time (ACE_Time_Value (0,0));
336 else
338 this->dtp_task_.set_min_pool_threads (tp_config.min_threads_);
339 this->dtp_task_.set_thread_idle_time (tp_config.timeout_);
342 // initial_pool_threads_
343 if ((tp_config.init_threads_ <= 0) ||
344 (tp_config.init_threads_ < tp_config.min_threads_))
346 this->dtp_task_.set_init_pool_threads (this->dtp_task_.get_min_pool_threads());
348 else
350 this->dtp_task_.set_init_pool_threads (tp_config.init_threads_);
353 // max_pool_threads_
354 if (tp_config.max_threads_ <= 0)
356 // Set to 0 so that max is unbounded.
357 this->dtp_task_.set_max_pool_threads(0);
359 else
361 if (tp_config.max_threads_ < tp_config.init_threads_)
363 this->dtp_task_.set_max_pool_threads(
364 this->dtp_task_.get_init_pool_threads ());
366 else
368 this->dtp_task_.set_max_pool_threads (tp_config.max_threads_);
372 // thread_stack_size_
373 if (tp_config.stack_size_ <= 0)
375 this->dtp_task_.set_thread_stack_size (ACE_DEFAULT_THREAD_STACKSIZE);
377 else
379 this->dtp_task_.set_thread_stack_size (tp_config.stack_size_);
382 // max_request_queue_depth_
383 if (tp_config.queue_depth_ < 0)
385 this->dtp_task_.set_max_request_queue_depth (0);
387 else
389 this->dtp_task_.set_max_request_queue_depth (tp_config.queue_depth_);
392 if (TAO_debug_level > 4)
394 TAOLIB_DEBUG ((LM_DEBUG,
395 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy: ")
396 ACE_TEXT ("Initialized with:\n")
397 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy initial_pool_threads_=")
398 ACE_TEXT ("[%d]\n")
399 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy min_pool_threads_=[%d]\n")
400 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy max_pool_threads_=[%d]\n")
401 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy max_request_queue_depth_=")
402 ACE_TEXT ("[%d]\n")
403 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy thread_stack_size_=[%d]\n")
404 ACE_TEXT ("TAO (%P|%t) - DTP_POA_Strategy thread_idle_time_=[%d]\n"),
405 this->dtp_task_.get_init_pool_threads(),
406 this->dtp_task_.get_min_pool_threads(),
407 this->dtp_task_.get_max_pool_threads(),
408 this->dtp_task_.get_max_request_queue_depth(),
409 this->dtp_task_.get_thread_stack_size(),
410 this->dtp_task_.get_thread_idle_time()));
414 TAO_END_VERSIONED_NAMESPACE_DECL
416 #endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */