3 //=============================================================================
7 //=============================================================================
9 #ifndef TAO_DYNAMIC_TP_TASK_H
10 #define TAO_DYNAMIC_TP_TASK_H
12 #include /**/ "ace/pre.h"
14 #include "tao/orbconf.h"
16 #if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0
18 #include "tao/Dynamic_TP/dynamic_tp_export.h"
19 #include "tao/Dynamic_TP/DTP_Config.h"
20 #include "tao/CSD_ThreadPool/CSD_TP_Queue.h"
21 #include "tao/CSD_ThreadPool/CSD_TP_Request.h"
22 #include "tao/CSD_ThreadPool/CSD_TP_Dispatchable_Visitor.h"
23 #include "tao/PortableServer/PortableServer.h"
24 #include "tao/Condition.h"
26 #if !defined (ACE_LACKS_PRAGMA_ONCE)
28 #endif /* ACE_LACKS_PRAGMA_ONCE */
31 #include "ace/Synch.h"
32 #include "ace/Containers_T.h"
33 #include "ace/Vector_T.h"
36 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
41 * @brief Active Object managing a queue of request objects.
43 * There are two types of "users" of a TP_Task object:
45 * 1) The TP_Strategy object that "owns" this task object.
46 * 2) The worker threads that "run" this task object as an
49 * The TP_Strategy object that "owns" this task object dictates
50 * when the worker threads are activated and when they are shutdown. It
51 * also injects requests into this task's queue via calls to the
52 * add_request() method. It is also the TP_Strategy object that
53 * dictates the number of worker threads to be activated via a call to
54 * the set_num_threads() method.
56 * The active object pattern is implemented via the use of the
57 * ACE_Task_Base base class, and each worker thread will
58 * invoke this task's svc() method, and when the svc() returns, the
59 * worker thread will invoke this task's close() method (with the
60 * flag argument equal to 0).
62 class TAO_Dynamic_TP_Export TAO_DTP_Task
: public ACE_Task_Base
65 /// Default Constructor.
68 /// Virtual Destructor.
69 virtual ~TAO_DTP_Task();
72 TAO_DTP_Definition task_thread_config
;
76 /// Put a request object on to the request queue.
77 /// Returns true if successful, false otherwise (it has been "rejected").
78 bool add_request(TAO::CSD::TP_Request
* request
);
80 /// Activate the worker threads.
81 virtual int open(void* args
= 0);
83 /// The "mainline" executed by each worker thread.
86 virtual int close (u_long flag
= 0);
88 /// Set the thread and queue config.
90 void set_init_pool_threads(size_t thr_count
);
92 void set_min_pool_threads(size_t thr_count
);
94 void set_max_pool_threads(size_t thr_count
);
96 void set_thread_stack_size(size_t stack_sz
);
98 void set_thread_idle_time(ACE_Time_Value thr_timeout
);
100 void set_max_request_queue_depth(size_t queue_depth
);
102 /// Get the thread and queue config.
104 size_t get_init_pool_threads();
106 size_t get_min_pool_threads();
108 size_t get_max_pool_threads();
110 size_t get_max_request_queue_depth();
112 size_t get_thread_stack_size();
/// Note: this getter returns a time_t, whereas set_thread_idle_time()
/// takes an ACE_Time_Value.
114 time_t get_thread_idle_time();
116 /// Cancel all requests that are targeted for the provided servant.
117 void cancel_servant (PortableServer::Servant servant
);
120 /// Get the next available request without blocking. Returns true if
/// one is available.
121 bool request_ready (TAO::CSD::TP_Dispatchable_Visitor
&v
,
122 TAO::CSD::TP_Request_Handle
&r
);
124 /// Release the request, resetting the accepting flag if necessary.
125 void clear_request (TAO::CSD::TP_Request_Handle
&r
);
130 bool remove_active (bool);
132 bool above_minimum ();
134 typedef TAO_SYNCH_MUTEX LockType
;
135 typedef TAO_Condition
<LockType
> ConditionType
;
137 /// Lock used to synchronize the "active_workers_" condition
139 /// Lock used to synchronize manipulation of the queue
140 LockType queue_lock_
;
141 /// Lock used to synchronize the "work_available_" condition
144 /// Condition used to signal worker threads that they may be able to
145 /// find a request in the queue_ that needs to be dispatched to a
146 /// servant that is currently "not busy".
147 /// This condition will be signal()'ed each time a new request is
148 /// added to the queue_, and also when a servant has become "not busy".
149 ConditionType work_available_
;
151 /// This condition will be signal()'ed each time the num_threads_
152 /// data member has its value changed. This is used to keep the
153 /// close(1) invocation (ie, a shutdown request) blocked until all
154 /// of the worker threads have stopped running.
/// NOTE(review): no num_threads_ member is declared among the members
/// visible here; presumably active_count_ is meant -- confirm.
155 ConditionType active_workers_
;
157 /// The number of threads that are currently active. This may be
158 /// different than the total number of threads since the latter
159 /// may include threads that are shutting down but not reaped.
160 size_t active_count_
;
162 /// Flag used to indicate when this task will (or will not) accept
163 /// requests via the add_request() method.
164 bool accepting_requests_
;
166 /// Flag used to initiate a shutdown request to all worker threads.
169 /// Flag to indicate something is on the queue. Works in conjunction with
170 /// the work_available_ condition.
173 /// Flag used to avoid multiple open() calls.
176 /// The number of requests in the local queue.
177 size_t num_queue_requests_
;
179 /// The number of currently active worker threads.
/// NOTE(review): active_count_ is separately documented as the number of
/// active threads; given the name, this presumably counts threads that are
/// busy with a request -- confirm against the implementation.
180 std::atomic
<unsigned long> busy_threads_
;
182 /// The queue of pending servant requests (a.k.a. the "request queue").
183 TAO::CSD::TP_Queue queue_
;
185 /// The initial thread count for the pool (presumably the value managed
/// by set_init_pool_threads() / get_init_pool_threads() -- confirm).
186 size_t init_pool_threads_
;
188 /// The low water mark for dynamic threads to settle to.
189 size_t min_pool_threads_
;
191 /// The high water mark for dynamic threads to be limited to.
192 size_t max_pool_threads_
;
194 /// If the max_pool_threads_ value has been met, then ORB requests coming in can be queued.
195 /// This is the maximum number that will be allowed.
196 size_t max_request_queue_depth_
;
198 /// This is the memory stack size allowable for each thread.
199 size_t thread_stack_size_
;
201 /// This is the maximum amount of time in seconds that an idle thread can
202 /// stay alive before being taken out of the pool.
203 ACE_Time_Value thread_idle_time_
;
206 TAO_END_VERSIONED_NAMESPACE_DECL
208 #if defined (__ACE_INLINE__)
209 # include "tao/Dynamic_TP/DTP_Task.inl"
210 #endif /* __ACE_INLINE__ */
212 #endif /* (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0 */
214 #include /**/ "ace/post.h"
216 #endif /* TAO_DYNAMIC_TP_TASK_H */