1 #include "MIF_Scheduler.h"
2 #include "tao/RTScheduling/Request_Interceptor.h"
6 std::atomic
<long> server_guid_counter
;
8 DT::DT (TAO_SYNCH_MUTEX
&lock
,
21 this->dt_cond_
.wait ();
28 this->dt_cond_
.signal ();
32 Segment_Sched_Param_Policy::importance ()
34 return this->importance_
;
38 Segment_Sched_Param_Policy::importance (CORBA::Short importance
)
40 this->importance_
= importance
;
44 Segment_Sched_Param_Policy::copy ()
46 Segment_Sched_Param_Policy
*copy
= 0;
47 ACE_NEW_THROW_EX (copy
,
48 Segment_Sched_Param_Policy
,
51 copy
->importance (this->importance_
);
57 Segment_Sched_Param_Policy::policy_type ()
63 Segment_Sched_Param_Policy::destroy ()
67 MIF_Scheduler::MIF_Scheduler (CORBA::ORB_ptr orb
)
73 CORBA::Object_var object
=
74 orb
->resolve_initial_references ("RTScheduler_Current");
77 RTScheduling::Current::_narrow (object
.in ());
80 orb
->resolve_initial_references ("PriorityMappingManager");
82 this->mapping_manager_
=
83 RTCORBA::PriorityMappingManager::_narrow (object
.in ());
85 catch (const CORBA::Exception
& ex
)
87 ex
._tao_print_exception ("Caught exception:");
91 MIF_Scheduler::~MIF_Scheduler ()
93 while (free_que_
.message_count () > 0)
96 ACE_Message_Block
*msg
= 0;
97 free_que_
.dequeue_head (msg
);
98 dt
= dynamic_cast<DT
*> (msg
);
104 MIF_Scheduler::incr_thr_count ()
112 MIF_Scheduler::wait ()
118 ACE_DEBUG ((LM_DEBUG
,
126 MIF_Scheduler::resume_main ()
129 wait_cond_
.signal ();
132 MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr
133 MIF_Scheduler::create_segment_scheduling_parameter (CORBA::Short importance
)
135 MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr segment_policy
;
136 ACE_NEW_THROW_EX (segment_policy
,
137 Segment_Sched_Param_Policy
,
139 CORBA::SystemException::_tao_minor_code (
142 CORBA::COMPLETED_NO
));
144 segment_policy
->importance (importance
);
146 return segment_policy
;
151 MIF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType
&/*guid*/,
153 CORBA::Policy_ptr sched_policy
,
157 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
159 ACE_OS::memcpy (&count
,
164 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param
=
165 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy
);
167 CORBA::Short desired_priority
= sched_param
->importance ();
169 if (TAO_debug_level
> 0)
170 ACE_DEBUG ((LM_DEBUG
,
171 "%t MIF_Scheduler::begin_scheduling_segment - Importance %d\n",
175 if (desired_priority
!= 100)
183 new_dt
->msg_priority (desired_priority
);
185 ready_que_
.enqueue_prio (new_dt
);
193 MIF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
195 CORBA::Policy_ptr sched_param
,
196 CORBA::Policy_ptr implicit_sched_param
)
198 this->begin_new_scheduling_segment (guid
,
201 implicit_sched_param
);
205 MIF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType
&/*guid*/,
206 const char* /*name*/,
207 CORBA::Policy_ptr sched_policy
,
208 CORBA::Policy_ptr
/*implicit_sched_param*/)
211 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
213 ACE_OS::memcpy (&count
,
217 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param
=
218 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy
);
220 CORBA::Short desired_priority
= sched_param
->importance ();
222 if (TAO_debug_level
> 0)
223 ACE_DEBUG ((LM_DEBUG
,
224 "%t MIF_Scheduler::update_scheduling_segment - Importance %d\n",
232 new_dt
->msg_priority (desired_priority
);
234 if (ready_que_
.message_count () > 0)
237 ACE_Message_Block
* msg
= 0;
238 ready_que_
.dequeue_head (msg
);
239 run_dt
= dynamic_cast<DT
*> (msg
);
240 if ((desired_priority
== 100) || run_dt
->msg_priority () >= (unsigned int)desired_priority
)
242 ready_que_
.enqueue_prio (new_dt
);
247 free_que_
.enqueue_prio (run_dt
);
251 ready_que_
.enqueue_prio (run_dt
);
259 MIF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
263 ACE_OS::memcpy (&count
,
267 ACE_DEBUG ((LM_DEBUG
,
268 "MIF_Scheduler::end_scheduling_segment %d\n",
271 if (ready_que_
.message_count () > 0)
274 ACE_Message_Block
* msg
= 0;
275 ready_que_
.dequeue_head (msg
);
276 run_dt
= dynamic_cast<DT
*> (msg
);
280 free_que_
.enqueue_prio (run_dt
);
285 MIF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType
&,
292 MIF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr request_info
)
294 CORBA::Policy_var sched_param
= current_
->scheduling_parameter ();
296 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var
=
297 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param
.in ());
299 IOP::ServiceContext srv_con
;
300 srv_con
.context_id
= Client_Interceptor::SchedulingInfo
;
302 RTScheduling::Current::IdType_var guid
= current_
->id ();
304 int guid_length
= guid
->length ();
306 CORBA::OctetSeq
seq_buf (guid_length
);
307 seq_buf
.length (seq_buf
.maximum ());
308 ACE_OS::memcpy (seq_buf
.get_buffer (),
312 int cxt_data_length
= sizeof (int) + guid_length
;
313 srv_con
.context_data
.length (cxt_data_length
);
316 for (;i
< guid_length
;i
++)
318 srv_con
.context_data
[i
] = seq_buf
[i
];
321 int importance
= sched_param_var
->importance ();
322 CORBA::OctetSeq
int_buf (sizeof (importance
));
323 int_buf
.length (int_buf
.maximum ());
324 ACE_OS::memcpy (int_buf
.get_buffer (),
326 sizeof (importance
));
329 for (;i
< cxt_data_length
;i
++)
331 srv_con
.context_data
[i
] = int_buf
[j
++];
334 request_info
->add_request_service_context (srv_con
,
338 if (ready_que_
.message_count () > 0)
341 ACE_hthread_t current
;
342 ACE_Thread::self (current
);
343 if (ACE_Thread::getprio (current
, priority
) == -1)
346 ACE_DEBUG ((LM_DEBUG
,
347 "Initial thread priority is %d %d\n",
349 ACE_DEFAULT_THREAD_PRIORITY
));
351 RTCORBA::Priority rtpriority
;
352 RTCORBA::PriorityMapping
* pm
= this->mapping_manager_
->mapping ();
353 if (pm
->to_CORBA(priority
+ 1, rtpriority
))
355 current_
->the_priority (rtpriority
);
357 ACE_Thread::self (current
);
358 if (ACE_Thread::getprio (current
, priority
) == -1)
361 ACE_DEBUG ((LM_DEBUG
,
362 "Bumped thread priority is %d\n",
367 ACE_Message_Block
* msg
= 0;
368 ready_que_
.dequeue_head (msg
);
369 run_dt
= dynamic_cast<DT
*> (msg
);
371 free_que_
.enqueue_prio (run_dt
);
377 MIF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr request_info
,
378 RTScheduling::Current::IdType_out guid_out
,
380 CORBA::Policy_out sched_param_out
,
381 CORBA::Policy_out
/*implicit_sched_param*/)
383 if (TAO_debug_level
> 0)
384 ACE_DEBUG ((LM_DEBUG
,
385 "MIF_Scheduler::receive_request\n"));
387 IOP::ServiceContext_var serv_cxt
=
388 request_info
->get_request_service_context (Server_Interceptor::SchedulingInfo
);
392 ACE_DEBUG ((LM_DEBUG
,
393 "Got scheduling info\n"));
395 RTScheduling::Current::IdType
* guid
;
397 RTScheduling::Current::IdType
);
399 guid
->length (sizeof(size_t));
400 ACE_OS::memcpy (guid
->get_buffer (),
401 serv_cxt
->context_data
.get_buffer (),
405 ACE_OS::memcpy (&gu_id
,
409 ACE_DEBUG ((LM_DEBUG
,
410 "MIF_Scheduler::receive_request %d\n",
414 CORBA::OctetSeq
int_buf (sizeof (long));
415 int_buf
.length (int_buf
.maximum ());
416 int i
= sizeof (long);
417 for (unsigned int j
= 0;j
< sizeof (int);j
++)
419 int_buf
[j
] = serv_cxt
->context_data
[i
++];
423 ACE_OS::memcpy (&importance
,
424 int_buf
.get_buffer (),
425 sizeof (importance
));
428 sched_param_out
= DT_TEST::instance ()->scheduler ()->create_segment_scheduling_parameter (importance
);
430 if (TAO_debug_level
> 0)
431 ACE_DEBUG ((LM_DEBUG
,
432 "%t The Guid is %d Importance is %d\n",
441 new_dt
->msg_priority (importance
);
443 ready_que_
.enqueue_prio (new_dt
);
450 MIF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr
)
452 RTScheduling::Current::IdType_var guid
= current_
->id ();
455 ACE_OS::memcpy (&count
,
459 ACE_DEBUG ((LM_DEBUG
,
460 "MIF_Scheduler::send_reply %d\n",
463 if (ready_que_
.message_count () > 0)
466 ACE_Message_Block
* msg
= 0;
467 ready_que_
.dequeue_head (msg
);
468 run_dt
= dynamic_cast<DT
*> (msg
);
472 free_que_
.enqueue_prio (run_dt
);
477 MIF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr
)
479 if (ready_que_
.message_count () > 0)
482 ACE_Message_Block
* msg
= 0;
483 ready_que_
.dequeue_head (msg
);
484 run_dt
= dynamic_cast<DT
*> (msg
);
488 free_que_
.enqueue_prio (run_dt
);
493 MIF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr
)
495 if (TAO_debug_level
> 0)
497 RTScheduling::Current::IdType_var guid
= current_
->id ();
500 ACE_OS::memcpy (&count
,
505 ACE_DEBUG ((LM_DEBUG
,
506 "MIF_Scheduler::send_other %d\n",
510 if (ready_que_
.message_count () > 0)
513 ACE_Message_Block
* msg
;
514 ready_que_
.dequeue_head (msg
);
515 run_dt
= dynamic_cast<DT
*> (msg
);
519 free_que_
.enqueue_prio (run_dt
);
524 MIF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
)
529 MIF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr
)
531 CORBA::Policy_var sched_param
= current_
->scheduling_parameter ();
533 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var
=
534 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param
.in ());
536 int importance
= sched_param_var
->importance ();
538 RTScheduling::Current::IdType_var guid
= current_
->id ();
541 ACE_OS::memcpy (&gu_id
,
545 if (TAO_debug_level
> 0)
546 ACE_DEBUG ((LM_DEBUG
,
547 "MIF_Scheduler::receive_reply Guid = %d Imp = %d\n",
557 new_dt
->msg_priority (importance
);
560 ready_que_
.enqueue_prio (new_dt
);
562 ACE_hthread_t current
;
563 ACE_Thread::self (current
);
564 if (ACE_Thread::getprio (current
, priority
) == -1)
567 RTCORBA::Priority rtpriority
;
568 RTCORBA::PriorityMapping
* pm
= this->mapping_manager_
->mapping ();
569 if (pm
->to_CORBA(priority
- 1, rtpriority
))
571 current_
->the_priority (rtpriority
);
579 MIF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr
)
581 CORBA::Policy_var sched_param
= current_
->scheduling_parameter ();
583 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var
=
584 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param
.in ());
586 int importance
= sched_param_var
->importance ();
588 RTScheduling::Current::IdType_var guid
= current_
->id ();
591 ACE_OS::memcpy (&gu_id
,
600 new_dt
->msg_priority (importance
);
603 ready_que_
.enqueue_prio (new_dt
);
606 ACE_hthread_t current
;
607 ACE_Thread::self (current
);
608 if (ACE_Thread::getprio (current
, priority
) == -1)
611 RTCORBA::Priority rtpriority
;
612 RTCORBA::PriorityMapping
* pm
= this->mapping_manager_
->mapping ();
613 if (pm
->to_CORBA(priority
- 1, rtpriority
))
615 current_
->the_priority (rtpriority
);
623 MIF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr
)
625 CORBA::Policy_var sched_param
= current_
->scheduling_parameter ();
627 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var
=
628 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param
.in ());
630 int importance
= sched_param_var
->importance ();
632 RTScheduling::Current::IdType_var guid
= current_
->id ();
635 ACE_OS::memcpy (&gu_id
,
644 new_dt
->msg_priority (importance
);
647 ready_que_
.enqueue_prio (new_dt
);
650 ACE_hthread_t current
;
651 ACE_Thread::self (current
);
652 if (ACE_Thread::getprio (current
, priority
) == -1)
655 RTCORBA::Priority rtpriority
;
656 RTCORBA::PriorityMapping
* pm
= this->mapping_manager_
->mapping ();
657 if (pm
->to_CORBA(priority
- 1, rtpriority
))
659 current_
->the_priority (rtpriority
);
667 MIF_Scheduler::cancel (const RTScheduling::Current::IdType
&)
672 MIF_Scheduler::scheduling_policies ()
678 MIF_Scheduler::scheduling_policies (const CORBA::PolicyList
&)
683 MIF_Scheduler::poa_policies ()
689 MIF_Scheduler::scheduling_discipline_name ()
694 RTScheduling::ResourceManager_ptr
695 MIF_Scheduler::create_resource_manager (const char *,
702 MIF_Scheduler::set_scheduling_parameter (PortableServer::Servant
&,