1 #include "tao/CSD_ThreadPool/CSD_TP_Task.h"
2 #include "tao/CSD_ThreadPool/CSD_TP_Request.h"
3 #include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
4 #include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h"
6 #if !defined (__ACE_INLINE__)
7 # include "tao/CSD_ThreadPool/CSD_TP_Task.inl"
8 #endif /* ! __ACE_INLINE__ */
10 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
// Destructor for TP_Task.
// NOTE(review): the destructor body is not visible in this excerpt.
12 TAO::CSD::TP_Task::~TP_Task()
// Hands a request to this task so that a worker thread can process it later.
//
// With lock_ held, the request is refused (and a debug message is logged)
// when accepting_requests_ is false.  Otherwise prepare_for_queue() is
// invoked on the request, the request is put on queue_, and
// work_available_ is signalled.
//
// @param request  The request to enqueue; it is dereferenced without a
//                 null check in the visible code.
//
// NOTE(review): the return statements are not visible in this excerpt.  The
// guard below supplies false as its failure value, so the function
// presumably returns bool (false when refused, true when queued) -- confirm
// against the class declaration.
18 TAO::CSD::TP_Task::add_request(TP_Request
* request
)
// Serialize access to accepting_requests_ and queue_; false is the value
// handed to the guard macro for the case where lock_ cannot be acquired.
20 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->lock_
, false);
// Requests are only accepted while accepting_requests_ is true (open() sets
// it to true, close() sets it back to false).
22 if (!this->accepting_requests_
)
24 TAOLIB_DEBUG((LM_DEBUG
,"(%P|%t) TP_Task::add_request() - "
25 "not accepting requests\n"));
29 // We have made the decision that the request is going to be placed upon
30 // the queue_. Inform the request that it is about to be placed into
31 // a request queue. Some requests may not need to do anything in
32 // preparation of being placed into a queue. Others, however, may need
33 // to perform a "clone" operation on some underlying request data before
34 // the request can be properly placed into a queue.
35 request
->prepare_for_queue();
37 this->queue_
.put(request
);
// Signal work_available_; svc() waits on this condition when it finds no
// dispatchable request.
39 this->work_available_
.signal();
// Opens the task by activating its worker threads.
//
// The visible steps are:
//   1. Interpret args as a pointer to a Thread_Counter (num defaults to 1).
//   2. Report an error for an invalid argument or a thread count below 1.
//   3. Acquire lock_ for the remainder of the call.
//   4. activate() the task with 'num' joinable worker threads.
//   5. Wait on active_workers_ until num_threads_ equals num.
//   6. Set accepting_requests_ to true so add_request() starts queueing.
//
// @param args  Expected to point at a Thread_Counter holding the number of
//              worker threads to start.
//
// NOTE(review): several lines of this function are not visible in this
// excerpt -- the conditions guarding both error returns, the assignment of
// num from *tmp, the handling of repeated open() calls, and the final
// return.  The guard below supplies -1 as its failure value, so the function
// presumably returns an int status -- confirm against the full source.
46 TAO::CSD::TP_Task::open(void* args
)
48 Thread_Counter num
= 1;
49 Thread_Counter
* tmp
= static_cast<Thread_Counter
*> (args
);
53 //FUZZ: disable check_for_lack_ACE_OS
54 TAOLIB_ERROR_RETURN((LM_ERROR
,
55 ACE_TEXT ("(%P|%t) TP_Task failed to open. ")
56 ACE_TEXT ("Invalid argument type passed to open().\n")),
58 //FUZZ: enable check_for_lack_ACE_OS
63 // We can't activate 0 threads. Make sure this isn't the case.
// NOTE(review): this message formats its argument with %u while the
// activation-failure message further down uses %d; presumably both print
// num -- confirm and consider using one specifier.
66 TAOLIB_ERROR_RETURN((LM_ERROR
,
67 ACE_TEXT ("(%P|%t) TP_Task failed to open. ")
68 ACE_TEXT ("num_threads (%u) is less-than 1.\n"),
73 // We need the lock acquired from here on out.
74 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->lock_
, -1);
76 // Multiple POA_Manager::activate() calls trigger multiple calls to open()
83 // Activate this task object with 'num' worker threads.
84 if (this->activate(THR_NEW_LWP
| THR_JOINABLE
, num
) != 0)
86 // Assumes that when activate returns non-zero return code that
87 // no threads were activated.
88 TAOLIB_ERROR_RETURN((LM_ERROR
,
89 ACE_TEXT ("(%P|%t) TP_Task failed to activate ")
90 ACE_TEXT ("(%d) worker threads.\n"),
95 // Now we are past the point where we can say we've been open()'ed before.
98 // Now we wait until all of the threads have started.
// Each worker increments num_threads_ and signals active_workers_ at the
// start of svc(), so this loop ends once all 'num' workers have checked in.
99 while (this->num_threads_
!= num
)
101 this->active_workers_
.wait();
104 // We can now accept requests (via our add_request() method).
105 this->accepting_requests_
= true;
// Body executed by each worker thread.
//
// The thread first registers itself: under lock_ it records its thread id in
// activated_threads_, increments num_threads_, and signals active_workers_.
// It then runs the "GetWork-And-PerformWork" loop: under lock_ it asks
// queue_ (through a reusable TP_Dispatchable_Visitor) for a dispatchable
// request, waiting on work_available_ when none is found; after the request
// has been handled it re-acquires lock_, marks the request as ready, and
// signals work_available_ again.  The loop is left when
// shutdown_initiated_ is observed.
//
// NOTE(review): the outer loop header, the statement that actually
// dispatches the request, the statements that leave the loops, and the
// final return are not visible in this excerpt.
112 TAO::CSD::TP_Task::svc()
114 // Account for this current worker thread having started the
115 // execution of this svc() method.
// NOTE(review): this guard passes false as its failure value whereas the
// other guards in this method pass 0; consider making them consistent.
117 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->lock_
, false);
118 // Put the thread id into a collection which is used to check whether
119 // the orb shutdown is called by one of the threads in the pool.
120 ACE_thread_t thr_id
= ACE_OS::thr_self ();
121 this->activated_threads_
.push_back(thr_id
);
122 ++this->num_threads_
;
// open() waits on active_workers_ until num_threads_ reaches the requested
// thread count.
123 this->active_workers_
.signal();
126 // This visitor object will be re-used over and over again as part of
127 // the "GetWork" logic below.
128 TP_Dispatchable_Visitor dispatchable_visitor
;
130 // Start the "GetWork-And-PerformWork" loop for the current worker thread.
133 TP_Request_Handle request
;
135 // Do the "GetWork" step.
137 // Acquire the lock until just before we decide to "PerformWork".
138 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->lock_
, 0);
140 // Start the "GetWork" loop.
141 while (request
.is_nil())
// close() sets shutdown_initiated_ and broadcasts work_available_ so that
// waiting workers wake up and reach this check.
143 if (this->shutdown_initiated_
)
145 // This breaks us out of all loops with one fell swoop.
// close() sets deferred_shutdown_initiated_ when the thread calling it is
// itself one of the pool's workers; the flag is cleared here.
// NOTE(review): whatever follows the reset is not visible in this excerpt.
149 if (this->deferred_shutdown_initiated_
)
151 this->deferred_shutdown_initiated_
= false;
155 // There is no need to visit the queue if it is empty.
156 if (!this->queue_
.is_empty())
158 // Visit the requests in the queue in hopes of
159 // locating the first "dispatchable" (ie, not busy) request.
160 // If a dispatchable request is located, it is extracted
161 // from the queue and saved in a handle data member in the
// visitor object.
163 this->queue_
.accept_visitor(dispatchable_visitor
);
165 // If a dispatchable request is located, it is extracted
166 // from the queue and saved in a handle data member in the
167 // visitor object. Let's get a "copy" (or a NULL pointer
168 // if the visitor didn't locate/extract one).
169 request
= dispatchable_visitor
.request();
172 // Either the queue is empty or we couldn't find any dispatchable
173 // requests in the queue at this time.
174 if (request
.is_nil())
176 // Let's wait until we hear about the possibility of
177 // work before we go look again.
178 this->work_available_
.wait();
182 // We have dropped out of the "while (request.is_nil())" loop.
183 // We only get here if we located/extracted a dispatchable request
184 // from the queue. Note that the visitor will have already
185 // marked the target servant as now being busy (because of us).
186 // We can now safely release the lock.
189 // Do the "PerformWork" step. We don't need the lock_ to do this.
192 // Now that the request has been dispatched, we need to mark the target
193 // servant as no longer being busy, and we need to signal any wait()'ing
194 // worker threads that there may be some dispatchable requests in the
195 // queue now for this not-busy servant. We need the lock_ to do this.
197 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->lock_
, 0);
198 request
->mark_as_ready();
199 this->work_available_
.signal();
202 // Reset the visitor since we use it over and over. This
203 // will cause the visitor to drop any reference to
204 // the dispatched request.
205 dispatchable_visitor
.reset();
207 // Note that the request will be "released" here when the request
208 // handle falls out of scope and its destructor performs the
209 // _remove_ref() call on the underlying TP_Request object.
// Handles two situations, both under lock_:
//
//   * A worker thread is closing: num_threads_ is decremented and
//     active_workers_ is signalled.
//   * The strategy object is shutting the task down: shutdown_initiated_ is
//     set, accepting_requests_ is cleared, all workers waiting on
//     work_available_ are woken, the call waits for the workers to exit,
//     every queued request is cancelled, and opened_ / shutdown_initiated_
//     are reset to false.
//
// @param flag  NOTE(review): presumably selects between the two paths above;
//              the test on it is not visible in this excerpt -- confirm.
//
// NOTE(review): the early returns, the "never open()'ed" check, and the
// final return are not visible in this excerpt.  The guard below supplies 0
// as its failure value.
215 TAO::CSD::TP_Task::close(u_long flag
)
217 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
, this->lock_
, 0);
221 // Worker thread is closing.
222 --this->num_threads_
;
// Wake the shutdown path below, which waits on active_workers_ for
// num_threads_ to drop to its target.
223 this->active_workers_
.signal();
227 // Strategy object is shutting down the task.
229 // Do nothing if this task has never been open()'ed.
235 // Set the shutdown flag to true.
236 this->shutdown_initiated_
= true;
238 // Stop accepting requests.
239 this->accepting_requests_
= false;
241 // Signal all worker threads waiting on the work_available_ condition.
242 this->work_available_
.broadcast();
244 bool calling_thread_in_tp
= false;
246 ACE_thread_t my_thr_id
= ACE_OS::thr_self ();
248 // Check whether the calling thread(calling orb shutdown) is one of the
249 // threads in the pool. If it is then it should not wait itself.
250 size_t const size
= this->activated_threads_
.size ();
// Linear scan over the thread ids recorded by svc().
252 for (size_t i
= 0; i
< size
; i
++)
254 if (this->activated_threads_
[i
] == my_thr_id
)
256 calling_thread_in_tp
= true;
257 this->deferred_shutdown_initiated_
= true;
262 // Wait until all worker threads have shutdown.
// When the caller is itself a pool thread it is still counted in
// num_threads_, so the wait stops at 1 rather than 0.
263 size_t target_num_threads
= calling_thread_in_tp
? 1 : 0;
264 while (this->num_threads_
!= target_num_threads
)
266 this->active_workers_
.wait();
269 // Cancel all requests.
// The visitor is default-constructed here, unlike cancel_servant(), which
// constructs it with a specific servant.
270 TP_Cancel_Visitor cancel_visitor
;
271 this->queue_
.accept_visitor(cancel_visitor
);
// Reset both flags to false now that the shutdown steps above are done.
273 this->opened_
= false;
274 this->shutdown_initiated_
= false;
// Cancels the queued requests that target the given servant.
//
// With lock_ held, a TP_Cancel_Visitor constructed for the servant is passed
// to queue_.accept_visitor().
//
// @param servant  The servant whose pending requests should be cancelled.
282 TAO::CSD::TP_Task::cancel_servant (PortableServer::Servant servant
)
// ACE_GUARD is used here without a return value, unlike the
// ACE_GUARD_RETURN calls elsewhere in this file.
284 ACE_GUARD (TAO_SYNCH_MUTEX
, guard
, this->lock_
);
286 // Cancel the requests targeted for the provided servant.
287 TP_Cancel_Visitor
cancel_visitor(servant
);
288 this->queue_
.accept_visitor(cancel_visitor
);
291 TAO_END_VERSIONED_NAMESPACE_DECL