Revert "Minor modernization of DynamicAny code"
[ACE_TAO.git] / TAO / tao / CSD_ThreadPool / CSD_TP_Task.cpp
blob378ea9e05afeccf7dbe0961c3f9e0a62f8a73de2
1 #include "tao/CSD_ThreadPool/CSD_TP_Task.h"
2 #include "tao/CSD_ThreadPool/CSD_TP_Request.h"
3 #include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
4 #include "tao/CSD_ThreadPool/CSD_TP_Cancel_Visitor.h"
6 #if !defined (__ACE_INLINE__)
7 # include "tao/CSD_ThreadPool/CSD_TP_Task.inl"
8 #endif /* ! __ACE_INLINE__ */
10 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
12 TAO::CSD::TP_Task::~TP_Task()
17 bool
18 TAO::CSD::TP_Task::add_request(TP_Request* request)
20 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->lock_, false);
22 if (!this->accepting_requests_)
24 TAOLIB_DEBUG((LM_DEBUG,"(%P|%t) TP_Task::add_request() - "
25 "not accepting requests\n"));
26 return false;
29 // We have made the decision that the request is going to be placed upon
30 // the queue_. Inform the request that it is about to be placed into
31 // a request queue. Some requests may not need to do anything in
32 // preparation of being placed into a queue. Others, however, may need
33 // to perfom a "clone" operation on some underlying request data before
34 // the request can be properly placed into a queue.
35 request->prepare_for_queue();
37 this->queue_.put(request);
39 this->work_available_.signal();
41 return true;
45 int
46 TAO::CSD::TP_Task::open(void* args)
48 Thread_Counter num = 1;
49 Thread_Counter* tmp = static_cast<Thread_Counter*> (args);
51 if (tmp == 0)
53 //FUZZ: disable check_for_lack_ACE_OS
54 TAOLIB_ERROR_RETURN((LM_ERROR,
55 ACE_TEXT ("(%P|%t) TP_Task failed to open. ")
56 ACE_TEXT ("Invalid argument type passed to open().\n")),
57 -1);
58 //FUZZ: enable check_for_lack_ACE_OS
61 num = *tmp;
63 // We can't activate 0 threads. Make sure this isn't the case.
64 if (num < 1)
66 TAOLIB_ERROR_RETURN((LM_ERROR,
67 ACE_TEXT ("(%P|%t) TP_Task failed to open. ")
68 ACE_TEXT ("num_threads (%u) is less-than 1.\n"),
69 num),
70 -1);
73 // We need the lock acquired from here on out.
74 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->lock_, -1);
76 // Multiple POA_Manager::activate() calls trigger multiple calls to open()
77 // and that is OK
78 if (this->opened_)
80 return 0;
83 // Activate this task object with 'num' worker threads.
84 if (this->activate(THR_NEW_LWP | THR_JOINABLE, num) != 0)
86 // Assumes that when activate returns non-zero return code that
87 // no threads were activated.
88 TAOLIB_ERROR_RETURN((LM_ERROR,
89 ACE_TEXT ("(%P|%t) TP_Task failed to activate ")
90 ACE_TEXT ("(%d) worker threads.\n"),
91 num),
92 -1);
95 // Now we have past the point where we can say we've been open()'ed before.
96 this->opened_ = true;
98 // Now we wait until all of the threads have started.
99 while (this->num_threads_ != num)
101 this->active_workers_.wait();
104 // We can now accept requests (via our add_request() method).
105 this->accepting_requests_ = true;
107 return 0;
112 TAO::CSD::TP_Task::svc()
114 // Account for this current worker thread having started the
115 // execution of this svc() method.
117 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->lock_, false);
118 // Put the thread id into a collection which is used to check whether
119 // the orb shutdown is called by one of the threads in the pool.
120 ACE_thread_t thr_id = ACE_OS::thr_self ();
121 this->activated_threads_.push_back(thr_id);
122 ++this->num_threads_;
123 this->active_workers_.signal();
126 // This visitor object will be re-used over and over again as part of
127 // the "GetWork" logic below.
128 TP_Dispatchable_Visitor dispatchable_visitor;
130 // Start the "GetWork-And-PerformWork" loop for the current worker thread.
131 while (1)
133 TP_Request_Handle request;
135 // Do the "GetWork" step.
137 // Acquire the lock until just before we decide to "PerformWork".
138 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->lock_, 0);
140 // Start the "GetWork" loop.
141 while (request.is_nil())
143 if (this->shutdown_initiated_)
145 // This breaks us out of all loops with one fell swoop.
146 return 0;
149 if (this->deferred_shutdown_initiated_)
151 this->deferred_shutdown_initiated_ = false;
152 return 0;
155 // There is no need to visit the queue if it is empty.
156 if (!this->queue_.is_empty())
158 // Visit the requests in the queue in hopes of
159 // locating the first "dispatchable" (ie, not busy) request.
160 // If a dispatchable request is located, it is extracted
161 // from the queue and saved in a handle data member in the
162 // visitor object.
163 this->queue_.accept_visitor(dispatchable_visitor);
165 // If a dispatchable request is located, it is extracted
166 // from the queue and saved in a handle data member in the
167 // visitor object. Let's get a "copy" (or a NULL pointer
168 // if the visitor didn't locate/extract one).
169 request = dispatchable_visitor.request();
172 // Either the queue is empty or we couldn't find any dispatchable
173 // requests in the queue at this time.
174 if (request.is_nil())
176 // Let's wait until we hear about the possibility of
177 // work before we go look again.
178 this->work_available_.wait();
182 // We have dropped out of the "while (request.is_nil())" loop.
183 // We only get here is we located/extracted a dispatchable request
184 // from the queue. Note that the visitor will have already
185 // marked the target servant as now being busy (because of us).
186 // We can now safely release the lock.
189 // Do the "PerformWork" step. We don't need the lock_ to do this.
190 request->dispatch();
192 // Now that the request has been dispatched, we need to mark the target
193 // servant as no longer being busy, and we need to signal any wait()'ing
194 // worker threads that there may be some dispatchable requests in the
195 // queue now for this not-busy servant. We need the lock_ to do this.
197 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->lock_, 0);
198 request->mark_as_ready();
199 this->work_available_.signal();
202 // Reset the visitor since we use it over and over. This
203 // will cause the visitor to drop any reference to
204 // the dispatched request.
205 dispatchable_visitor.reset();
207 // Note that the request will be "released" here when the request
208 // handle falls out of scope and its destructor performs the
209 // _remove_ref() call on the underlying TP_Request object.
215 TAO::CSD::TP_Task::close(u_long flag)
217 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard, this->lock_, 0);
219 if (flag == 0)
221 // Worker thread is closing.
222 --this->num_threads_;
223 this->active_workers_.signal();
225 else
227 // Strategy object is shutting down the task.
229 // Do nothing if this task has never been open()'ed.
230 if (!this->opened_)
232 return 0;
235 // Set the shutdown flag to true.
236 this->shutdown_initiated_ = true;
238 // Stop accepting requests.
239 this->accepting_requests_ = false;
241 // Signal all worker threads waiting on the work_available_ condition.
242 this->work_available_.broadcast();
244 bool calling_thread_in_tp = false;
246 ACE_thread_t my_thr_id = ACE_OS::thr_self ();
248 // Check whether the calling thread(calling orb shutdown) is one of the
249 // threads in the pool. If it is then it should not wait itself.
250 size_t const size = this->activated_threads_.size ();
252 for (size_t i = 0; i < size; i ++)
254 if (this->activated_threads_[i] == my_thr_id)
256 calling_thread_in_tp = true;
257 this->deferred_shutdown_initiated_ = true;
258 break;
262 // Wait until all worker threads have shutdown.
263 size_t target_num_threads = calling_thread_in_tp ? 1 : 0;
264 while (this->num_threads_ != target_num_threads)
266 this->active_workers_.wait();
269 // Cancel all requests.
270 TP_Cancel_Visitor cancel_visitor;
271 this->queue_.accept_visitor(cancel_visitor);
273 this->opened_ = false;
274 this->shutdown_initiated_ = false;
277 return 0;
281 void
282 TAO::CSD::TP_Task::cancel_servant (PortableServer::Servant servant)
284 ACE_GUARD (TAO_SYNCH_MUTEX, guard, this->lock_);
286 // Cancel the requests targeted for the provided servant.
287 TP_Cancel_Visitor cancel_visitor(servant);
288 this->queue_.accept_visitor(cancel_visitor);
291 TAO_END_VERSIONED_NAMESPACE_DECL