1 #include "MIF_Scheduler.h"
2 #include "ace/Atomic_Op.h"
3 #include "tao/RTScheduling/Request_Interceptor.h"
6 ACE_Atomic_Op
<TAO_SYNCH_MUTEX
, long> server_guid_counter
;
8 DT::DT (TAO_SYNCH_MUTEX
&lock
,
21 this->dt_cond_
.wait ();
28 this->dt_cond_
.signal ();
32 Segment_Sched_Param_Policy::importance (void)
34 return this->importance_
;
38 Segment_Sched_Param_Policy::importance (CORBA::Short importance
)
40 this->importance_
= importance
;
44 Segment_Sched_Param_Policy::copy (void)
46 Segment_Sched_Param_Policy
*copy
= 0;
47 ACE_NEW_THROW_EX (copy
,
48 Segment_Sched_Param_Policy
,
51 copy
->importance (this->importance_
);
57 Segment_Sched_Param_Policy::policy_type (void)
63 Segment_Sched_Param_Policy::destroy (void)
67 MIF_Scheduler::MIF_Scheduler (CORBA::ORB_ptr orb
)
73 CORBA::Object_var object
=
74 orb
->resolve_initial_references ("RTScheduler_Current");
77 RTScheduling::Current::_narrow (object
.in ());
80 orb
->resolve_initial_references ("PriorityMappingManager");
82 this->mapping_manager_
=
83 RTCORBA::PriorityMappingManager::_narrow (object
.in ());
85 catch (const CORBA::Exception
& ex
)
87 ex
._tao_print_exception ("Caught exception:");
91 MIF_Scheduler::~MIF_Scheduler (void)
93 while (free_que_
.message_count () > 0)
96 ACE_Message_Block
*msg
= 0;
97 free_que_
.dequeue_head (msg
);
98 dt
= dynamic_cast<DT
*> (msg
);
104 MIF_Scheduler::incr_thr_count (void)
112 MIF_Scheduler::wait (void)
118 ACE_DEBUG ((LM_DEBUG
,
126 MIF_Scheduler::resume_main (void)
129 wait_cond_
.signal ();
132 MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr
133 MIF_Scheduler::create_segment_scheduling_parameter (CORBA::Short importance
)
135 MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr segment_policy
;
136 ACE_NEW_THROW_EX (segment_policy
,
137 Segment_Sched_Param_Policy
,
139 CORBA::SystemException::_tao_minor_code (
142 CORBA::COMPLETED_NO
));
144 segment_policy
->importance (importance
);
146 return segment_policy
;
151 MIF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType
&/*guid*/,
153 CORBA::Policy_ptr sched_policy
,
157 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
159 ACE_OS::memcpy (&count
,
164 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param
=
165 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy
);
167 CORBA::Short desired_priority
= sched_param
->importance ();
169 if (TAO_debug_level
> 0)
170 ACE_DEBUG ((LM_DEBUG
,
171 "%t MIF_Scheduler::begin_scheduling_segment - Importance %d\n",
175 if (desired_priority
!= 100)
183 new_dt
->msg_priority (desired_priority
);
185 ready_que_
.enqueue_prio (new_dt
);
193 MIF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
195 CORBA::Policy_ptr sched_param
,
196 CORBA::Policy_ptr implicit_sched_param
)
198 this->begin_new_scheduling_segment (guid
,
201 implicit_sched_param
);
205 MIF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType
&/*guid*/,
206 const char* /*name*/,
207 CORBA::Policy_ptr sched_policy
,
208 CORBA::Policy_ptr
/*implicit_sched_param*/)
211 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
213 ACE_OS::memcpy (&count
,
217 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param
=
218 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy
);
220 CORBA::Short desired_priority
= sched_param
->importance ();
222 if (TAO_debug_level
> 0)
223 ACE_DEBUG ((LM_DEBUG
,
224 "%t MIF_Scheduler::update_scheduling_segment - Importance %d\n",
232 new_dt
->msg_priority (desired_priority
);
234 if (ready_que_
.message_count () > 0)
237 ACE_Message_Block
* msg
= 0;
238 ready_que_
.dequeue_head (msg
);
239 run_dt
= dynamic_cast<DT
*> (msg
);
240 if ((desired_priority
== 100) || run_dt
->msg_priority () >= (unsigned int)desired_priority
)
242 ready_que_
.enqueue_prio (new_dt
);
247 free_que_
.enqueue_prio (run_dt
);
251 ready_que_
.enqueue_prio (run_dt
);
259 MIF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
263 ACE_OS::memcpy (&count
,
267 ACE_DEBUG ((LM_DEBUG
,
268 "MIF_Scheduler::end_scheduling_segment %d\n",
271 if (ready_que_
.message_count () > 0)
274 ACE_Message_Block
* msg
= 0;
275 ready_que_
.dequeue_head (msg
);
276 run_dt
= dynamic_cast<DT
*> (msg
);
280 free_que_
.enqueue_prio (run_dt
);
285 MIF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType
&,
292 MIF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr request_info
)
294 CORBA::Policy_var sched_param
= current_
->scheduling_parameter ();
296 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var
=
297 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param
.in ());
299 IOP::ServiceContext srv_con
;
300 srv_con
.context_id
= Client_Interceptor::SchedulingInfo
;
302 RTScheduling::Current::IdType_var guid
= current_
->id ();
304 int guid_length
= guid
->length ();
306 CORBA::OctetSeq
seq_buf (guid_length
);
307 seq_buf
.length (seq_buf
.maximum ());
308 ACE_OS::memcpy (seq_buf
.get_buffer (),
312 int cxt_data_length
= sizeof (int) + guid_length
;
313 srv_con
.context_data
.length (cxt_data_length
);
316 for (;i
< guid_length
;i
++)
318 srv_con
.context_data
[i
] = seq_buf
[i
];
321 int importance
= sched_param_var
->importance ();
322 CORBA::OctetSeq
int_buf (sizeof (importance
));
323 int_buf
.length (int_buf
.maximum ());
324 ACE_OS::memcpy (int_buf
.get_buffer (),
326 sizeof (importance
));
329 for (;i
< cxt_data_length
;i
++)
331 srv_con
.context_data
[i
] = int_buf
[j
++];
334 request_info
->add_request_service_context (srv_con
,
338 if (ready_que_
.message_count () > 0)
341 ACE_hthread_t current
;
342 ACE_Thread::self (current
);
343 if (ACE_Thread::getprio (current
, priority
) == -1)
346 ACE_DEBUG ((LM_DEBUG
,
347 "Initial thread priority is %d %d\n",
349 ACE_DEFAULT_THREAD_PRIORITY
));
351 RTCORBA::Priority rtpriority
;
352 RTCORBA::PriorityMapping
* pm
= this->mapping_manager_
->mapping ();
353 if (pm
->to_CORBA(priority
+ 1, rtpriority
))
355 current_
->the_priority (rtpriority
);
357 ACE_Thread::self (current
);
358 if (ACE_Thread::getprio (current
, priority
) == -1)
361 ACE_DEBUG ((LM_DEBUG
,
362 "Bumped thread priority is %d\n",
367 ACE_Message_Block
* msg
= 0;
368 ready_que_
.dequeue_head (msg
);
369 run_dt
= dynamic_cast<DT
*> (msg
);
371 free_que_
.enqueue_prio (run_dt
);
378 MIF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr request_info
,
379 RTScheduling::Current::IdType_out guid_out
,
381 CORBA::Policy_out sched_param_out
,
382 CORBA::Policy_out
/*implicit_sched_param*/)
385 if (TAO_debug_level
> 0)
386 ACE_DEBUG ((LM_DEBUG
,
387 "MIF_Scheduler::receive_request\n"));
389 IOP::ServiceContext_var serv_cxt
=
390 request_info
->get_request_service_context (Server_Interceptor::SchedulingInfo
);
394 ACE_DEBUG ((LM_DEBUG
,
395 "Got scheduling info\n"));
397 RTScheduling::Current::IdType
* guid
;
399 RTScheduling::Current::IdType
);
401 guid
->length (sizeof(size_t));
402 ACE_OS::memcpy (guid
->get_buffer (),
403 serv_cxt
->context_data
.get_buffer (),
407 ACE_OS::memcpy (&gu_id
,
411 ACE_DEBUG ((LM_DEBUG
,
412 "MIF_Scheduler::receive_request %d\n",
416 CORBA::OctetSeq
int_buf (sizeof (long));
417 int_buf
.length (int_buf
.maximum ());
418 int i
= sizeof (long);
419 for (unsigned int j
= 0;j
< sizeof (int);j
++)
421 int_buf
[j
] = serv_cxt
->context_data
[i
++];
425 ACE_OS::memcpy (&importance
,
426 int_buf
.get_buffer (),
427 sizeof (importance
));
430 sched_param_out
= DT_TEST::instance ()->scheduler ()->create_segment_scheduling_parameter (importance
);
432 if (TAO_debug_level
> 0)
433 ACE_DEBUG ((LM_DEBUG
,
434 "%t The Guid is %d Importance is %d\n",
443 new_dt
->msg_priority (importance
);
445 ready_que_
.enqueue_prio (new_dt
);
452 MIF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr
)
455 RTScheduling::Current::IdType_var guid
= current_
->id ();
458 ACE_OS::memcpy (&count
,
462 ACE_DEBUG ((LM_DEBUG
,
463 "MIF_Scheduler::send_reply %d\n",
466 if (ready_que_
.message_count () > 0)
469 ACE_Message_Block
* msg
= 0;
470 ready_que_
.dequeue_head (msg
);
471 run_dt
= dynamic_cast<DT
*> (msg
);
475 free_que_
.enqueue_prio (run_dt
);
480 MIF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr
)
482 if (ready_que_
.message_count () > 0)
485 ACE_Message_Block
* msg
= 0;
486 ready_que_
.dequeue_head (msg
);
487 run_dt
= dynamic_cast<DT
*> (msg
);
491 free_que_
.enqueue_prio (run_dt
);
496 MIF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr
)
498 if (TAO_debug_level
> 0)
500 RTScheduling::Current::IdType_var guid
= current_
->id ();
503 ACE_OS::memcpy (&count
,
508 ACE_DEBUG ((LM_DEBUG
,
509 "MIF_Scheduler::send_other %d\n",
513 if (ready_que_
.message_count () > 0)
516 ACE_Message_Block
* msg
;
517 ready_que_
.dequeue_head (msg
);
518 run_dt
= dynamic_cast<DT
*> (msg
);
522 free_que_
.enqueue_prio (run_dt
);
527 MIF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
)
532 MIF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr
)
534 CORBA::Policy_var sched_param
= current_
->scheduling_parameter ();
536 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var
=
537 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param
.in ());
539 int importance
= sched_param_var
->importance ();
541 RTScheduling::Current::IdType_var guid
= current_
->id ();
544 ACE_OS::memcpy (&gu_id
,
548 if (TAO_debug_level
> 0)
549 ACE_DEBUG ((LM_DEBUG
,
550 "MIF_Scheduler::receive_reply Guid = %d Imp = %d\n",
560 new_dt
->msg_priority (importance
);
563 ready_que_
.enqueue_prio (new_dt
);
565 ACE_hthread_t current
;
566 ACE_Thread::self (current
);
567 if (ACE_Thread::getprio (current
, priority
) == -1)
570 RTCORBA::Priority rtpriority
;
571 RTCORBA::PriorityMapping
* pm
= this->mapping_manager_
->mapping ();
572 if (pm
->to_CORBA(priority
- 1, rtpriority
))
574 current_
->the_priority (rtpriority
);
582 MIF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr
)
584 CORBA::Policy_var sched_param
= current_
->scheduling_parameter ();
586 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var
=
587 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param
.in ());
589 int importance
= sched_param_var
->importance ();
591 RTScheduling::Current::IdType_var guid
= current_
->id ();
594 ACE_OS::memcpy (&gu_id
,
603 new_dt
->msg_priority (importance
);
606 ready_que_
.enqueue_prio (new_dt
);
609 ACE_hthread_t current
;
610 ACE_Thread::self (current
);
611 if (ACE_Thread::getprio (current
, priority
) == -1)
614 RTCORBA::Priority rtpriority
;
615 RTCORBA::PriorityMapping
* pm
= this->mapping_manager_
->mapping ();
616 if (pm
->to_CORBA(priority
- 1, rtpriority
))
618 current_
->the_priority (rtpriority
);
626 MIF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr
)
628 CORBA::Policy_var sched_param
= current_
->scheduling_parameter ();
630 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var
=
631 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param
.in ());
633 int importance
= sched_param_var
->importance ();
635 RTScheduling::Current::IdType_var guid
= current_
->id ();
638 ACE_OS::memcpy (&gu_id
,
647 new_dt
->msg_priority (importance
);
650 ready_que_
.enqueue_prio (new_dt
);
653 ACE_hthread_t current
;
654 ACE_Thread::self (current
);
655 if (ACE_Thread::getprio (current
, priority
) == -1)
658 RTCORBA::Priority rtpriority
;
659 RTCORBA::PriorityMapping
* pm
= this->mapping_manager_
->mapping ();
660 if (pm
->to_CORBA(priority
- 1, rtpriority
))
662 current_
->the_priority (rtpriority
);
670 MIF_Scheduler::cancel (const RTScheduling::Current::IdType
&)
675 MIF_Scheduler::scheduling_policies (void)
681 MIF_Scheduler::scheduling_policies (const CORBA::PolicyList
&)
686 MIF_Scheduler::poa_policies (void)
692 MIF_Scheduler::scheduling_discipline_name (void)
697 RTScheduling::ResourceManager_ptr
698 MIF_Scheduler::create_resource_manager (const char *,
705 MIF_Scheduler::set_scheduling_parameter (PortableServer::Servant
&,