1 #include "MUF_Scheduler.h"
2 #include "Kokyu_qosC.h"
4 #include "tao/RTScheduling/Request_Interceptor.h"
5 #include "tao/CodecFactory/CodecFactory.h"
6 #include "tao/ORB_Constants.h"
8 MUF_Sched_Param_Policy::MUF_Sched_Param_Policy ()
12 MUF_Sched_Param_Policy::MUF_Sched_Param_Policy (
13 const MUF_Sched_Param_Policy
&rhs
17 MUF_Scheduling::SchedulingParameterPolicy (),
18 CORBA::LocalObject (),
23 MUF_Scheduling::SchedulingParameter
24 MUF_Sched_Param_Policy::value ()
30 MUF_Sched_Param_Policy::value (const MUF_Scheduling::SchedulingParameter
& value
)
36 MUF_Sched_Param_Policy::copy ()
38 MUF_Sched_Param_Policy
* tmp
= 0;
39 ACE_NEW_THROW_EX (tmp
,
40 MUF_Sched_Param_Policy (*this),
41 CORBA::NO_MEMORY (TAO::VMCID
,
42 CORBA::COMPLETED_NO
));
48 MUF_Sched_Param_Policy::policy_type ()
54 MUF_Sched_Param_Policy::destroy ()
58 MUF_Scheduler::MUF_Scheduler (CORBA::ORB_ptr orb
,
59 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type
,
62 : orb_ (CORBA::ORB::_duplicate (orb
)),
63 disp_impl_type_ (disp_impl_type
),
64 ace_sched_policy_ (ace_sched_policy
),
65 ace_sched_scope_ (ace_sched_scope
)
67 Kokyu::DSRT_ConfigInfo config
;
69 config
.impl_type_
= this->disp_impl_type_
;
70 config
.sched_policy_
= ace_sched_policy_
;
71 config
.sched_scope_
= ace_sched_scope_
;
73 Kokyu::DSRT_Dispatcher_Factory
<MUF_Scheduler_Traits
>::DSRT_Dispatcher_Auto_Ptr
74 tmp( Kokyu::DSRT_Dispatcher_Factory
<MUF_Scheduler_Traits
>::
75 create_DSRT_dispatcher (config
) );
76 kokyu_dispatcher_
= std::move(tmp
);
78 CORBA::Object_var object
=
79 orb
->resolve_initial_references ("RTScheduler_Current");
82 RTScheduling::Current::_narrow (object
.in ());
84 IOP::CodecFactory_var codec_factory
;
85 CORBA::Object_var obj
=
86 orb
->resolve_initial_references ("CodecFactory");
88 if (CORBA::is_nil(obj
.in ()))
90 ACE_ERROR ((LM_ERROR
, "Nil Codec factory\n"));
94 codec_factory
= IOP::CodecFactory::_narrow (obj
.in ());
97 IOP::Encoding encoding
;
98 encoding
.format
= IOP::ENCODING_CDR_ENCAPS
;
99 encoding
.major_version
= 1;
100 encoding
.minor_version
= 2;
102 codec_
= codec_factory
->create_codec (encoding
);
105 MUF_Scheduler::~MUF_Scheduler ()
110 MUF_Scheduler::shutdown ()
112 kokyu_dispatcher_
->shutdown ();
113 ACE_DEBUG ((LM_DEBUG
, "kokyu DSRT dispatcher shutdown\n"));
116 MUF_Scheduling::SchedulingParameterPolicy_ptr
117 MUF_Scheduler::create_scheduling_parameter (const MUF_Scheduling::SchedulingParameter
& value
)
119 MUF_Scheduling::SchedulingParameterPolicy_ptr sched_param_policy
;
120 ACE_NEW_THROW_EX (sched_param_policy
,
121 MUF_Sched_Param_Policy
,
123 CORBA::SystemException::_tao_minor_code (
126 CORBA::COMPLETED_NO
));
128 sched_param_policy
->value (value
);
130 return sched_param_policy
;
135 MUF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
137 CORBA::Policy_ptr sched_policy
,
140 #ifdef KOKYU_DSRT_LOGGING
141 ACE_DEBUG ((LM_DEBUG
,
142 "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment enter\n"));
145 #ifdef KOKYU_DSRT_LOGGING
147 ACE_OS::memcpy (&int_guid
,
150 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): guid is %d\n", int_guid
));
153 MUF_Scheduler_Traits::QoSDescriptor_t qos
;
154 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
155 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
);
157 MUF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
159 qos
.deadline_
= sched_param
->deadline
;
160 qos
.exec_time_
= sched_param
->estimated_initial_execution_time
;
161 qos
.criticality_
= sched_param
->criticality
;
163 kokyu_dispatcher_
->schedule (guid
, qos
);
165 #ifdef KOKYU_DSRT_LOGGING
166 ACE_DEBUG ((LM_DEBUG
,
167 "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment exit\n"));
173 MUF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
175 CORBA::Policy_ptr sched_param
,
176 CORBA::Policy_ptr implicit_sched_param
)
178 this->begin_new_scheduling_segment (guid
,
181 implicit_sched_param
);
185 MUF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType
& guid
,
187 CORBA::Policy_ptr sched_policy
,
188 CORBA::Policy_ptr implicit_sched_param
)
190 ACE_UNUSED_ARG ((name
));
191 ACE_UNUSED_ARG ((implicit_sched_param
));
193 #ifdef KOKYU_DSRT_LOGGING
195 ACE_OS::memcpy (&int_guid
,
198 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): update_sched_seg::guid is %d\n", int_guid
));
201 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
202 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
);
204 MUF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
205 MUF_Scheduler_Traits::QoSDescriptor_t qos
;
207 qos
.deadline_
= sched_param
->deadline
;
208 qos
.exec_time_
= sched_param
->estimated_initial_execution_time
;
209 qos
.criticality_
= sched_param
->criticality
;
211 kokyu_dispatcher_
->update_schedule (guid
, qos
);
215 MUF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
218 #ifdef KOKYU_DSRT_LOGGING
220 ACE_OS::memcpy (&int_guid
,
223 ACE_DEBUG ((LM_DEBUG
, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid
));
226 kokyu_dispatcher_
->cancel_schedule (guid
);
230 MUF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType
&,
238 MUF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri
)
240 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos
;
242 CORBA::String_var operation
= ri
->operation ();
244 #ifdef KOKYU_DSRT_LOGGING
245 ACE_DEBUG ((LM_DEBUG
,
246 "(%t|%T): send_request "
251 // Make the context to send the context to the target
252 IOP::ServiceContext sc
;
253 sc
.context_id
= Client_Interceptor::SchedulingInfo
;
255 CORBA::Policy_var sched_policy
=
256 this->current_
->scheduling_parameter();
258 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
261 ACE_OS::memcpy (&guid,
266 CORBA::Long criticality
;
267 TimeBase::TimeT deadline
,exec_time
;
269 if (CORBA::is_nil (sched_policy
.in ()))
271 //24 hrs from now - infinity
272 ACE_Time_Value deadline_tv
= ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
273 deadline
= deadline_tv
.sec () * 1000000 + deadline_tv
.usec () * 10; //100s of nanoseconds for TimeBase::TimeT
279 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
280 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
.in ());
282 MUF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
283 deadline
= sched_param
->deadline
;
284 exec_time
= sched_param
->estimated_initial_execution_time
;
285 criticality
= sched_param
->criticality
;
287 #ifdef KOKYU_DSRT_LOGGING
289 ACE_OS::memcpy (&int_guid
,
292 ACE_DEBUG ((LM_DEBUG
,
293 "(%t|%T): send_request guid = %d\n",
297 //Fill the guid in the SC Qos struct
298 sc_qos
.guid
.length (guid
->length ());
299 guid_copy (sc_qos
.guid
, guid
.in ());
300 sc_qos
.deadline
= deadline
;
301 sc_qos
.estimated_initial_execution_time
= exec_time
;
302 sc_qos
.criticality
= criticality
;
303 CORBA::Any sc_qos_as_any
;
304 sc_qos_as_any
<<= sc_qos
;
306 CORBA::OctetSeq_var cdtmp
= codec_
->encode (sc_qos_as_any
);
307 sc
.context_data
= cdtmp
.in ();
309 #ifdef KOKYU_DSRT_LOGGING
310 ACE_DEBUG ((LM_DEBUG
,
311 ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
314 // Add this context to the service context list.
315 ri
->add_request_service_context (sc
, 0);
319 #ifdef KOKYU_DSRT_LOGGING
320 ACE_DEBUG ((LM_DEBUG
,
321 ACE_TEXT ("(%t|%T): send_request : ")
322 ACE_TEXT ("about to call scheduler to inform block\n")));
325 kokyu_dispatcher_
->update_schedule (guid
.in (),
328 #ifdef KOKYU_DSRT_LOGGING
329 ACE_DEBUG ((LM_DEBUG
,
330 ACE_TEXT ("(%t|%T): send_request interceptor done\n")));
335 MUF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri
,
336 RTScheduling::Current::IdType_out guid_out
,
337 CORBA::String_out
/*name*/,
338 CORBA::Policy_out sched_param_out
,
339 CORBA::Policy_out
/*implicit_sched_param_out*/)
341 const Kokyu::Svc_Ctxt_DSRT_QoS
* sc_qos_ptr
= 0;
343 #ifdef KOKYU_DSRT_LOGGING
344 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):entered MUF_Scheduler::receive_request\n"));
347 RTScheduling::Current::IdType guid
;
349 CORBA::String_var operation
= ri
->operation ();
351 #ifdef KOKYU_DSRT_LOGGING
352 ACE_DEBUG ((LM_DEBUG
,
353 "(%t|%T): receive_request from "
358 // Ignore the "_is_a" operation since it may have been invoked
359 // locally on the server side as a side effect of another call,
360 // meaning that the client hasn't added the service context yet.
361 if (ACE_OS::strcmp ("_is_a", operation
.in ()) == 0)
364 IOP::ServiceContext_var sc
=
365 ri
->get_request_service_context (Server_Interceptor::SchedulingInfo
);
367 CORBA::Long criticality
;
368 TimeBase::TimeT deadline
,exec_time
;
372 //24 hrs from now - infinity
373 ACE_Time_Value deadline_tv
= ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
374 deadline
= deadline_tv
.sec ()*1000000 + deadline_tv
.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
380 CORBA::OctetSeq oc_seq
= CORBA::OctetSeq (sc
->context_data
.length (),
381 sc
->context_data
.length (),
382 sc
->context_data
.get_buffer (),
384 CORBA::Any sc_qos_as_any
;
385 CORBA::Any_var scqostmp
= codec_
->decode (oc_seq
);
386 sc_qos_as_any
= scqostmp
.in ();
387 //Don't store in a _var, since >>= returns a pointer to an
388 //internal buffer and we are not supposed to free it.
389 sc_qos_as_any
>>= sc_qos_ptr
;
391 deadline
= sc_qos_ptr
->deadline
;
392 criticality
= sc_qos_ptr
->criticality
;
393 exec_time
= sc_qos_ptr
->estimated_initial_execution_time
;
395 guid
.length (sc_qos_ptr
->guid
.length ());
396 guid_copy (guid
, sc_qos_ptr
->guid
);
398 ACE_NEW (guid_out
.ptr (),
399 RTScheduling::Current::IdType
);
400 guid_out
.ptr ()->length (guid
.length ());
401 *(guid_out
.ptr ()) = guid
;
403 #ifdef KOKYU_DSRT_LOGGING
405 ACE_OS::memcpy (&int_guid
,
408 ACE_DEBUG ((LM_DEBUG
,
409 "(%t|%T): Criticality = %d, guid = %d "
410 "in recvd service context\n",
414 MUF_Scheduling::SchedulingParameter sched_param
;
415 sched_param
.criticality
= criticality
;
416 sched_param
.deadline
= deadline
;
417 sched_param_out
= this->create_scheduling_parameter (sched_param
);
420 MUF_Scheduler_Traits::QoSDescriptor_t qos
;
421 qos
.criticality_
= criticality
;
422 qos
.deadline_
= deadline
;
423 qos
.exec_time_
= exec_time
;
425 this->kokyu_dispatcher_
->schedule (guid
, qos
);
427 #ifdef KOKYU_DSRT_LOGGING
428 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): receive_request interceptor done\n"));
434 MUF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
)
439 MUF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri
)
441 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos
;
443 CORBA::String_var operation
= ri
->operation ();
445 #ifdef KOKYU_DSRT_LOGGING
446 ACE_DEBUG ((LM_DEBUG
,
447 "(%t|%T): send_reply from \"%s\"\n",
451 // Make the context to send the context to the target
452 IOP::ServiceContext sc
;
453 sc
.context_id
= Server_Interceptor::SchedulingInfo
;
456 CORBA::Long criticality
;
457 TimeBase::TimeT deadline
,exec_time
;
459 CORBA::Policy_var sched_policy
=
460 this->current_
->scheduling_parameter();
462 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
464 if (CORBA::is_nil (sched_policy
.in ()))
466 #ifdef KOKYU_DSRT_LOGGING
467 ACE_DEBUG ((LM_DEBUG
,
468 "(%t|%T): sched_policy nil.\n"));
470 //24 hrs from now - infinity
471 ACE_Time_Value deadline_tv
= ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
472 deadline
= deadline_tv
.sec ()*1000000 + deadline_tv
.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
478 #ifdef KOKYU_DSRT_LOGGING
479 ACE_DEBUG ((LM_DEBUG
,
480 "(%t|%T):sched_policy not nil. ",
481 "sched params set\n"));
483 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
484 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
.in ());
485 MUF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
488 sc_qos
.guid
.length (guid
->length ());
489 guid_copy (sc_qos
.guid
, guid
.in ());
491 deadline
= sched_param
->deadline
;
492 exec_time
= sched_param
->estimated_initial_execution_time
;
493 criticality
= sched_param
->criticality
;
494 sc_qos
.deadline
= deadline
;
495 sc_qos
.estimated_initial_execution_time
= exec_time
;
496 sc_qos
.criticality
= criticality
;
498 CORBA::Any sc_qos_as_any
;
499 sc_qos_as_any
<<= sc_qos
;
501 CORBA::OctetSeq_var cdtmp
= codec_
->encode (sc_qos_as_any
);
502 sc
.context_data
= cdtmp
.in ();
504 // Add this context to the service context list.
505 ri
->add_reply_service_context (sc
, 1);
507 #ifdef KOKYU_DSRT_LOGGING
508 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):reply sc added\n"));
512 kokyu_dispatcher_
->update_schedule (guid
.in (),
515 #ifdef KOKYU_DSRT_LOGGING
516 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): send_reply interceptor done\n"));
521 MUF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri
)
527 MUF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri
)
533 MUF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri
)
535 RTScheduling::Current::IdType guid
;
537 CORBA::String_var operation
= ri
->operation ();
539 CORBA::Object_var target
= ri
->target ();
541 ACE_CString opname
= operation
.in ();
542 #ifdef KOKYU_DSRT_LOGGING
543 ACE_DEBUG ((LM_DEBUG
,
544 "(%t|%T):receive_reply from "
549 // Check that the reply service context was received as
551 IOP::ServiceContext_var sc
=
552 ri
->get_reply_service_context (Client_Interceptor::SchedulingInfo
);
554 CORBA::Long criticality
;
555 TimeBase::TimeT deadline
,exec_time
;
559 ACE_DEBUG ((LM_DEBUG
, "service context was not filled\n"));
560 //24 hrs from now - infinity
561 ACE_Time_Value deadline_tv
= ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
562 deadline
= deadline_tv
.sec ()*1000000 + deadline_tv
.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
568 CORBA::OctetSeq oc_seq
= CORBA::OctetSeq (sc
->context_data
.length (),
569 sc
->context_data
.length (),
570 sc
->context_data
.get_buffer (),
573 //Don't store in a _var, since >>= returns a pointer to an internal buffer
574 //and we are not supposed to free it.
575 const Kokyu::Svc_Ctxt_DSRT_QoS
* sc_qos_ptr
= 0;
576 CORBA::Any sc_qos_as_any
;
577 CORBA::Any_var scqostmp
= codec_
->decode (oc_seq
);
578 sc_qos_as_any
= scqostmp
.in ();
579 sc_qos_as_any
>>= sc_qos_ptr
;
581 deadline
= sc_qos_ptr
->deadline
;
582 criticality
= sc_qos_ptr
->criticality
;
583 exec_time
= sc_qos_ptr
->estimated_initial_execution_time
;
585 guid
.length (sc_qos_ptr
->guid
.length ());
586 guid_copy (guid
, sc_qos_ptr
->guid
);
588 ACE_DEBUG ((LM_DEBUG
,
589 "(%t|%T): Criticality = %d in recvd service context\n",
593 MUF_Scheduler_Traits::QoSDescriptor_t qos
;
594 qos
.deadline_
= qos
.criticality_
= criticality
;
595 qos
.deadline_
= deadline
;
596 qos
.exec_time_
= exec_time
;
597 this->kokyu_dispatcher_
->schedule (guid
, qos
);
601 MUF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri
)
607 MUF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri
)
613 MUF_Scheduler::cancel (const RTScheduling::Current::IdType
&)
615 throw CORBA::NO_IMPLEMENT ();
619 MUF_Scheduler::scheduling_policies ()
621 throw CORBA::NO_IMPLEMENT ();
625 MUF_Scheduler::scheduling_policies (const CORBA::PolicyList
&)
627 throw CORBA::NO_IMPLEMENT ();
631 MUF_Scheduler::poa_policies ()
633 throw CORBA::NO_IMPLEMENT ();
637 MUF_Scheduler::scheduling_discipline_name ()
639 throw CORBA::NO_IMPLEMENT ();
642 RTScheduling::ResourceManager_ptr
643 MUF_Scheduler::create_resource_manager (const char *,
646 throw CORBA::NO_IMPLEMENT ();
650 MUF_Scheduler::set_scheduling_parameter (PortableServer::Servant
&,
654 throw CORBA::NO_IMPLEMENT ();