1 #ifndef DSRT_DIRECT_DISPATCHER_IMPL_T_CPP
2 #define DSRT_DIRECT_DISPATCHER_IMPL_T_CPP
4 #include "DSRT_Direct_Dispatcher_Impl_T.h"
6 #if !defined (__ACE_INLINE__)
7 //#include "DSRT_Direct_Dispatcher_Impl_T.i"
8 #endif /* __ACE_INLINE__ */
13 //@@VS: This is somehow not being recognized by MSVC, which results
14 //in a link error. For now, the definition has been moved to the .h
15 //file. Needs further investigation.
17 template <class DSRT_Scheduler_Traits>
18 int Comparator_Adapter_Generator<DSRT_Scheduler_Traits>::MoreEligible::
19 operator ()(const DSRT_Dispatch_Item_var<DSRT_Scheduler_Traits>& item1,
20 const DSRT_Dispatch_Item_var<DSRT_Scheduler_Traits>& item2)
22 int rc = qos_comparator_ (item1->qos (), item2->qos ());
28 //if equally eligible, then resolve tie with the creation time of
30 if (rc == 0 && item1->insertion_time () < item2->insertion_time ())
36 template <class DSRT_Scheduler_Traits
>
37 DSRT_Direct_Dispatcher_Impl
<DSRT_Scheduler_Traits
>::
38 DSRT_Direct_Dispatcher_Impl (ACE_Sched_Params::Policy sched_policy
,
40 :DSRT_Dispatcher_Impl
<DSRT_Scheduler_Traits
>(sched_policy
, sched_scope
),
41 sched_queue_modified_ (0),
42 sched_queue_modified_cond_ (sched_queue_modified_cond_lock_
)
44 //Run scheduler thread at highest priority
45 if (this->activate (this->rt_thr_flags_
, 1, 0, this->executive_prio_
) == -1)
48 "(%t|%T) cannot activate scheduler thread in RT mode."
49 "Trying in non RT mode\n"));
50 if (this->activate (this->non_rt_thr_flags_
) == -1)
52 "(%t|%T) cannot activate scheduler thread\n"));
56 template <class DSRT_Scheduler_Traits
> int
57 DSRT_Direct_Dispatcher_Impl
<DSRT_Scheduler_Traits
>::
58 init_i (const DSRT_ConfigInfo
&)
63 template <class DSRT_Scheduler_Traits
> int
64 DSRT_Direct_Dispatcher_Impl
<DSRT_Scheduler_Traits
>::svc ()
66 ACE_hthread_t scheduler_thr_handle
;
67 ACE_Thread::self (scheduler_thr_handle
);
69 #ifdef KOKYU_DSRT_LOGGING
72 ACE_TEXT ("max prio=%d\n")
73 ACE_TEXT ("min prio=%d\n")
74 ACE_TEXT ("active prio=%d\n")
75 ACE_TEXT ("inactive prio=%d\n"),
81 if (ACE_OS::thr_getprio (scheduler_thr_handle
, prio
) == -1)
86 ACE_TEXT ("getprio not supported\n")
93 ACE_TEXT ("thr_getprio failed")));
97 ACE_DEBUG ((LM_DEBUG
, "(%t): Scheduler thread prio is %d\n", prio
));
98 #endif /*DSRT_LOGGING*/
102 ACE_GUARD_RETURN (cond_lock_t
,
104 sched_queue_modified_cond_lock_
,
107 if (this->shutdown_flagged_
)
110 while (!sched_queue_modified_
)
112 #ifdef KOKYU_DSRT_LOGGING
113 ACE_DEBUG ((LM_DEBUG
,
114 "(%t|%T): sched thread about to wait on cv\n"));
116 sched_queue_modified_cond_
.wait ();
119 #ifdef KOKYU_DSRT_LOGGING
120 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): sched thread done waiting on cv\n"));
123 sched_queue_modified_
= 0;
125 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
130 if (this->ready_queue_
.current_size () <= 0)
133 #ifdef KOKYU_DSRT_LOGGING
134 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):Sched Queue contents===>\n"));
135 this->ready_queue_
.dump ();
137 DSRT_Dispatch_Item_var
<DSRT_Scheduler_Traits
> item_var
;
138 this->ready_queue_
.most_eligible (item_var
);
140 ACE_hthread_t most_eligible_thr_handle
= item_var
->thread_handle ();
142 #ifdef KOKYU_DSRT_LOGGING
143 ACE_DEBUG ((LM_DEBUG
,
144 "(%t|%T):curr scheduled thr handle = %d\n",
145 this->curr_scheduled_thr_handle_
));
146 ACE_DEBUG ((LM_DEBUG
,
147 "(%t|%T):most eligible thr handle = %d\n",
148 most_eligible_thr_handle
));
151 if (this->curr_scheduled_thr_handle_
!= most_eligible_thr_handle
)
153 if (this->curr_scheduled_thr_handle_
!= 0)
155 if (ACE_OS::thr_setprio (this->curr_scheduled_thr_handle_
,
156 this->inactive_prio_
,
157 this->sched_policy_
) == -1)
159 ACE_ERROR ((LM_ERROR
,
161 ACE_TEXT ("thr_setprio on curr_scheduled_thr_handle_ failed.")));
162 ACE_DEBUG ((LM_DEBUG
, "thr_handle = %d, prio = %d\n",
163 this->curr_scheduled_thr_handle_
,
164 this->inactive_prio_
));
168 if (ACE_OS::thr_setprio (most_eligible_thr_handle
,
169 this->active_prio_
, this->sched_policy_
) == -1)
171 ACE_ERROR ((LM_ERROR
,
173 ACE_TEXT ("thr_setprio on most_eligible_thr_handle failed")));
176 this->curr_scheduled_thr_handle_
= most_eligible_thr_handle
;
177 this->curr_scheduled_guid_
= item_var
->guid ();
179 /*change all threads in blocked_prio_ to inactive_prio_*/
180 this->ready_queue_
.change_prio(this->blocked_prio_
, this->inactive_prio_
,this->sched_policy_
);
183 #ifdef KOKYU_DSRT_LOGGING
184 ACE_DEBUG ((LM_DEBUG
, "(%t): sched thread exiting\n"));
190 template <class DSRT_Scheduler_Traits
>
191 int DSRT_Direct_Dispatcher_Impl
<DSRT_Scheduler_Traits
>::
192 schedule_i (Guid_t id
, const DSRT_QoSDescriptor
& qos
)
194 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
, guard
, this->synch_lock_
, -1);
196 #ifdef KOKYU_DSRT_LOGGING
197 ACE_DEBUG ((LM_DEBUG
,
198 "(%t|%T):schedule_i enter\n"));
201 DSRT_Dispatch_Item
<DSRT_Scheduler_Traits
>* item
;
202 ACE_hthread_t thr_handle
;
203 ACE_Thread::self (thr_handle
);
205 ACE_NEW_RETURN (item
,
206 DSRT_Dispatch_Item
<DSRT_Scheduler_Traits
> (id
, qos
),
208 item
->thread_handle (thr_handle
);
210 if (this->ready_queue_
.insert (item
) == -1)
213 #ifdef KOKYU_DSRT_LOGGING
214 ACE_DEBUG ((LM_DEBUG
,
215 "(%t|%T):schedule_i after ready_q.insert\n"));
218 if (ACE_OS::thr_setprio (thr_handle
,
220 this->sched_policy_
) == -1)
222 ACE_ERROR_RETURN ((LM_ERROR
,
224 ACE_TEXT ("thr_setprio failed")), -1);
227 #ifdef KOKYU_DSRT_LOGGING
228 ACE_DEBUG ((LM_DEBUG
,
229 "(%t|%T):schedule_i after thr_setprio\n"));
232 //ready_queue_.dump ();
234 /*first release ready_queue_ lock. Otherwise if the scheduler gets the
235 sched_queue_modified_cond_lock first, then try to get the ready_queue_ lock
236 just when one thread who gets the ready_queue_ lock first, then try to get
237 sched_queue_modified_cond_lock. Deadlock happens.
241 //@@ Perhaps the lock could be moved further down just before
242 //setting the condition variable?
243 ACE_GUARD_RETURN (cond_lock_t
,
244 mon
, this->sched_queue_modified_cond_lock_
, 0);
246 #ifdef KOKYU_DSRT_LOGGING
247 ACE_DEBUG ((LM_DEBUG
,
248 "(%t|%T):schedule_i after acquiring cond lock\n"));
251 this->sched_queue_modified_
= 1;
252 this->sched_queue_modified_cond_
.signal ();
254 #ifdef KOKYU_DSRT_LOGGING
255 ACE_DEBUG ((LM_DEBUG
,
256 "(%t|%T):schedule_i exit\n"));
262 template <class DSRT_Scheduler_Traits
>
263 int DSRT_Direct_Dispatcher_Impl
<DSRT_Scheduler_Traits
>::
264 update_schedule_i (Guid_t guid
, const DSRT_QoSDescriptor
& qos
)
266 return this->schedule (guid
, qos
);
269 template <class DSRT_Scheduler_Traits
>
270 int DSRT_Direct_Dispatcher_Impl
<DSRT_Scheduler_Traits
>::
271 update_schedule_i (Guid_t guid
, Block_Flag_t flag
)
273 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
, guard
, this->synch_lock_
, -1);
275 #ifdef KOKYU_DSRT_LOGGING
276 ACE_DEBUG ((LM_DEBUG
, "(%t): update schedule for block entered\n"));
279 DSRT_Dispatch_Item_var
<DSRT_Scheduler_Traits
> dispatch_item
;
280 ACE_hthread_t thr_handle
;
282 int found
= this->ready_queue_
.find (guid
, dispatch_item
);
283 if (found
== 0 && flag
== BLOCK
)
285 thr_handle
= dispatch_item
->thread_handle ();
286 if (ACE_OS::thr_setprio (thr_handle
,
288 this->sched_policy_
) == -1)
290 ACE_ERROR ((LM_ERROR
,
292 ACE_TEXT ("thr_setprio failed")));
295 //monitor released because cancel_schedule would acquire the
296 //lock. Using recursive mutex creates lock up.
298 //@@ Need to investigate this further. Also we can consider
299 //using the Thread-Safe interface pattern.
301 int rc
= this->cancel_schedule (guid
);
303 #ifdef KOKYU_DSRT_LOGGING
304 ACE_DEBUG ((LM_DEBUG
, "(%t): update schedule for block done\n"));
310 #ifdef KOKYU_DSRT_LOGGING
311 ACE_DEBUG ((LM_DEBUG
, "(%t): update schedule for block done\n"));
317 template <class DSRT_Scheduler_Traits
> int
318 DSRT_Direct_Dispatcher_Impl
<DSRT_Scheduler_Traits
>::
319 cancel_schedule_i (Guid_t guid
)
321 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
, guard
, this->synch_lock_
, -1);
323 #ifdef KOKYU_DSRT_LOGGING
324 ACE_DEBUG ((LM_DEBUG
, "(%t): about to remove guid\n"));
327 this->ready_queue_
.remove (guid
);
329 #ifdef KOKYU_DSRT_LOGGING
330 this->ready_queue_
.dump ();
333 if (this->curr_scheduled_guid_
== guid
)
335 this->curr_scheduled_guid_
= 0;
336 this->curr_scheduled_thr_handle_
= 0;
339 //release ready_queue_ lock first before getting another lock
342 ACE_GUARD_RETURN (cond_lock_t
,
343 mon
, this->sched_queue_modified_cond_lock_
, 0);
344 this->sched_queue_modified_
= 1;
345 this->sched_queue_modified_cond_
.signal ();
349 template <class DSRT_Scheduler_Traits
> int
350 DSRT_Direct_Dispatcher_Impl
<DSRT_Scheduler_Traits
>::
353 this->shutdown_flagged_
= 1;
355 ACE_GUARD_RETURN (cond_lock_t
, mon
, this->sched_queue_modified_cond_lock_
, 0);
356 this->sched_queue_modified_
= 1;
357 this->sched_queue_modified_cond_
.signal ();
358 // We have to wait until the scheduler executive thread shuts
359 // down. But we have acquired the lock and if we wait without
360 // releasing it, the scheduler thread will try to acquire it after
361 // it gets woken up by the above signal and it fails to acquire the
362 // lock. This will lead to a deadlock. So release the lock before we
371 #endif /* DSRT_DIRECT_DISPATCHER_IMPL_T_CPP */