1 #include "MUF_Scheduler.h"
2 #include "Kokyu_qosC.h"
4 #include "tao/RTScheduling/Request_Interceptor.h"
5 #include "tao/CodecFactory/CodecFactory.h"
6 #include "tao/ORB_Constants.h"
8 MUF_Sched_Param_Policy::MUF_Sched_Param_Policy ()
// Copy constructor. The initializers visible here invoke the default
// constructors of MUF_Scheduling::SchedulingParameterPolicy and
// CORBA::LocalObject.
12 MUF_Sched_Param_Policy::MUF_Sched_Param_Policy (
13 const MUF_Sched_Param_Policy
&rhs
17 MUF_Scheduling::SchedulingParameterPolicy (),
18 CORBA::LocalObject (),
23 MUF_Scheduling::SchedulingParameter
24 MUF_Sched_Param_Policy::value (void)
30 MUF_Sched_Param_Policy::value (const MUF_Scheduling::SchedulingParameter
& value
)
// Allocates a new MUF_Sched_Param_Policy that is copy-constructed from
// *this. ACE_NEW_THROW_EX is given CORBA::NO_MEMORY (TAO::VMCID,
// CORBA::COMPLETED_NO) as the exception to raise when allocation fails.
36 MUF_Sched_Param_Policy::copy (void)
38 MUF_Sched_Param_Policy
* tmp
= 0;
39 ACE_NEW_THROW_EX (tmp
,
40 MUF_Sched_Param_Policy (*this),
41 CORBA::NO_MEMORY (TAO::VMCID
,
42 CORBA::COMPLETED_NO
));
48 MUF_Sched_Param_Policy::policy_type (void)
54 MUF_Sched_Param_Policy::destroy (void)
// Constructs the scheduler:
//   1. stores a duplicated ORB reference and the dispatcher settings,
//   2. creates the Kokyu DSRT dispatcher from those settings,
//   3. resolves "RTScheduler_Current" and narrows it,
//   4. resolves "CodecFactory" and creates the CDR-encapsulation codec
//      (version 1.2) kept in codec_.
58 MUF_Scheduler::MUF_Scheduler (CORBA::ORB_ptr orb
,
59 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type
,
62 : orb_ (CORBA::ORB::_duplicate (orb
)),
63 disp_impl_type_ (disp_impl_type
),
64 ace_sched_policy_ (ace_sched_policy
),
65 ace_sched_scope_ (ace_sched_scope
)
// Dispatcher configuration mirrors the values stored in the members above.
68 Kokyu::DSRT_ConfigInfo config
;
70 config
.impl_type_
= this->disp_impl_type_
;
71 config
.sched_policy_
= ace_sched_policy_
;
72 config
.sched_scope_
= ace_sched_scope_
;
// Build the dispatcher through the factory, then assign the factory's
// Auto_Ptr to the member (presumably transferring ownership - confirm).
74 Kokyu::DSRT_Dispatcher_Factory
<MUF_Scheduler_Traits
>::DSRT_Dispatcher_Auto_Ptr
75 tmp( Kokyu::DSRT_Dispatcher_Factory
<MUF_Scheduler_Traits
>::
76 create_DSRT_dispatcher (config
) );
77 kokyu_dispatcher_
= tmp
;
// Look up the RTScheduling current object and narrow it.
79 CORBA::Object_var object
=
80 orb
->resolve_initial_references ("RTScheduler_Current");
83 RTScheduling::Current::_narrow (object
.in ());
// Look up the codec factory used to create codec_.
85 IOP::CodecFactory_var codec_factory
;
86 CORBA::Object_var obj
=
87 orb
->resolve_initial_references ("CodecFactory");
// NOTE(review): a nil factory reference is only logged here, yet
// codec_factory->create_codec () is still called below; if obj is nil,
// codec_factory stays nil at that call - confirm and bail out or throw.
89 if (CORBA::is_nil(obj
.in ()))
91 ACE_ERROR ((LM_ERROR
, "Nil Codec factory\n"));
95 codec_factory
= IOP::CodecFactory::_narrow (obj
.in ());
// Encoding requested from the factory: CDR encapsulation, version 1.2.
98 IOP::Encoding encoding
;
99 encoding
.format
= IOP::ENCODING_CDR_ENCAPS
;
100 encoding
.major_version
= 1;
101 encoding
.minor_version
= 2;
103 codec_
= codec_factory
->create_codec (encoding
);
106 MUF_Scheduler::~MUF_Scheduler (void)
// Shuts down the Kokyu DSRT dispatcher and logs that fact at LM_DEBUG.
111 MUF_Scheduler::shutdown (void)
113 kokyu_dispatcher_
->shutdown ();
114 ACE_DEBUG ((LM_DEBUG
, "kokyu DSRT dispatcher shutdown\n"));
// Creates a scheduling parameter policy object.
//   value - the scheduling parameter stored in the new policy.
// Returns the newly allocated MUF_Sched_Param_Policy as a
// SchedulingParameterPolicy_ptr. The exception used on allocation failure
// is the one passed to ACE_NEW_THROW_EX below.
117 MUF_Scheduling::SchedulingParameterPolicy_ptr
118 MUF_Scheduler::create_scheduling_parameter (const MUF_Scheduling::SchedulingParameter
& value
)
120 MUF_Scheduling::SchedulingParameterPolicy_ptr sched_param_policy
;
121 ACE_NEW_THROW_EX (sched_param_policy
,
122 MUF_Sched_Param_Policy
,
124 CORBA::SystemException::_tao_minor_code (
127 CORBA::COMPLETED_NO
));
// Store the caller's parameter in the new policy before returning it.
129 sched_param_policy
->value (value
);
131 return sched_param_policy
;
// Starts a scheduling segment for the thread identified by guid.
// sched_policy is narrowed to a SchedulingParameterPolicy; its deadline,
// estimated execution time and criticality are copied into a QoS
// descriptor that is passed to kokyu_dispatcher_->schedule ().
// Entry, guid and exit are logged under KOKYU_DSRT_LOGGING.
136 MUF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
138 CORBA::Policy_ptr sched_policy
,
141 #ifdef KOKYU_DSRT_LOGGING
142 ACE_DEBUG ((LM_DEBUG
,
143 "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment enter\n"));
146 #ifdef KOKYU_DSRT_LOGGING
148 ACE_OS::memcpy (&int_guid
,
151 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): guid is %d\n", int_guid
));
154 MUF_Scheduler_Traits::QoSDescriptor_t qos
;
// NOTE(review): the narrowed policy is dereferenced right away; no nil
// check is visible for a sched_policy of another type - confirm callers
// always pass a MUF scheduling parameter policy.
155 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
156 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
);
158 MUF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
// Copy the policy's values into the dispatcher's QoS descriptor.
160 qos
.deadline_
= sched_param
->deadline
;
161 qos
.exec_time_
= sched_param
->estimated_initial_execution_time
;
162 qos
.criticality_
= sched_param
->criticality
;
164 kokyu_dispatcher_
->schedule (guid
, qos
);
166 #ifdef KOKYU_DSRT_LOGGING
167 ACE_DEBUG ((LM_DEBUG
,
168 "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment exit\n"));
// Nested segments are handled like new ones: this forwards to
// begin_new_scheduling_segment ().
174 MUF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
176 CORBA::Policy_ptr sched_param
,
177 CORBA::Policy_ptr implicit_sched_param
)
179 this->begin_new_scheduling_segment (guid
,
182 implicit_sched_param
);
// Updates the schedule of the thread identified by guid.
// sched_policy is narrowed to a SchedulingParameterPolicy and its values
// are passed to kokyu_dispatcher_->update_schedule () as a QoS descriptor.
// name and implicit_sched_param are explicitly marked unused.
186 MUF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType
& guid
,
188 CORBA::Policy_ptr sched_policy
,
189 CORBA::Policy_ptr implicit_sched_param
)
191 ACE_UNUSED_ARG ((name
));
192 ACE_UNUSED_ARG ((implicit_sched_param
));
194 #ifdef KOKYU_DSRT_LOGGING
196 ACE_OS::memcpy (&int_guid
,
199 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): update_sched_seg::guid is %d\n", int_guid
));
// NOTE(review): as in begin_new_scheduling_segment, the narrowed policy is
// used without a visible nil check.
202 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
203 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
);
205 MUF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
206 MUF_Scheduler_Traits::QoSDescriptor_t qos
;
// Copy the policy's values into the dispatcher's QoS descriptor.
208 qos
.deadline_
= sched_param
->deadline
;
209 qos
.exec_time_
= sched_param
->estimated_initial_execution_time
;
210 qos
.criticality_
= sched_param
->criticality
;
212 kokyu_dispatcher_
->update_schedule (guid
, qos
);
// Ends the scheduling segment for guid by cancelling its schedule in the
// Kokyu dispatcher. The guid is logged under KOKYU_DSRT_LOGGING.
216 MUF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
219 #ifdef KOKYU_DSRT_LOGGING
221 ACE_OS::memcpy (&int_guid
,
224 ACE_DEBUG ((LM_DEBUG
, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid
));
227 kokyu_dispatcher_
->cancel_schedule (guid
);
231 MUF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType
&,
// Client-side hook called with the outgoing request's info (ri).
// It packs the current thread's guid, deadline, estimated execution time
// and criticality into a Kokyu::Svc_Ctxt_DSRT_QoS, encodes that with
// codec_, and attaches it to the request as the service context with id
// Client_Interceptor::SchedulingInfo. It then calls
// kokyu_dispatcher_->update_schedule () for the current guid.
239 MUF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri
)
241 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos
;
243 CORBA::String_var operation
= ri
->operation ();
245 #ifdef KOKYU_DSRT_LOGGING
246 ACE_DEBUG ((LM_DEBUG
,
247 "(%t|%T): send_request "
252 // Make the context to send the context to the target
253 IOP::ServiceContext sc
;
254 sc
.context_id
= Client_Interceptor::SchedulingInfo
;
// Scheduling parameter and guid of the current scheduling segment.
256 CORBA::Policy_var sched_policy
=
257 this->current_
->scheduling_parameter();
259 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
262 ACE_OS::memcpy (&guid,
267 CORBA::Long criticality
;
268 TimeBase::TimeT deadline
,exec_time
;
// No scheduling parameter set: fall back to a far-future deadline.
270 if (CORBA::is_nil (sched_policy
.in ()))
272 //24 hrs from now - infinity
273 ACE_Time_Value deadline_tv
= ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
// NOTE(review): usec () * 10 yields 100 ns units, but sec () * 1000000
// yields microseconds; for 100 ns units the seconds factor would be
// 10000000 - confirm the intended unit.
274 deadline
= deadline_tv
.sec () * 1000000 + deadline_tv
.usec () * 10; //100s of nanoseconds for TimeBase::TimeT
// Otherwise take the values from the MUF scheduling parameter policy.
280 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
281 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
.in ());
283 MUF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
284 deadline
= sched_param
->deadline
;
285 exec_time
= sched_param
->estimated_initial_execution_time
;
286 criticality
= sched_param
->criticality
;
288 #ifdef KOKYU_DSRT_LOGGING
290 ACE_OS::memcpy (&int_guid
,
293 ACE_DEBUG ((LM_DEBUG
,
294 "(%t|%T): send_request guid = %d\n",
298 //Fill the guid in the SC Qos struct
299 sc_qos
.guid
.length (guid
->length ());
300 guid_copy (sc_qos
.guid
, guid
.in ());
301 sc_qos
.deadline
= deadline
;
302 sc_qos
.estimated_initial_execution_time
= exec_time
;
303 sc_qos
.criticality
= criticality
;
// Wrap the QoS struct in an Any and encode it into the context payload.
304 CORBA::Any sc_qos_as_any
;
305 sc_qos_as_any
<<= sc_qos
;
307 CORBA::OctetSeq_var cdtmp
= codec_
->encode (sc_qos_as_any
);
308 sc
.context_data
= cdtmp
.in ();
310 #ifdef KOKYU_DSRT_LOGGING
311 ACE_DEBUG ((LM_DEBUG
,
312 ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
315 // Add this context to the service context list.
316 ri
->add_request_service_context (sc
, 0);
320 #ifdef KOKYU_DSRT_LOGGING
321 ACE_DEBUG ((LM_DEBUG
,
322 ACE_TEXT ("(%t|%T): send_request : ")
323 ACE_TEXT ("about to call scheduler to inform block\n")));
// Tell the dispatcher about the state change of this guid (the log text
// above says the thread is about to block).
326 kokyu_dispatcher_
->update_schedule (guid
.in (),
329 #ifdef KOKYU_DSRT_LOGGING
330 ACE_DEBUG ((LM_DEBUG
,
331 ACE_TEXT ("(%t|%T): send_request interceptor done\n")));
// Server-side hook called with the incoming request's info (ri).
// It reads the service context with id Server_Interceptor::SchedulingInfo,
// decodes the Kokyu::Svc_Ctxt_DSRT_QoS it carries, and from that:
//   - fills guid_out with a newly allocated copy of the guid,
//   - fills sched_param_out with a policy built by
//     create_scheduling_parameter (),
//   - schedules the guid with the decoded QoS on the Kokyu dispatcher.
// The name and implicit_sched_param_out parameters are unnamed (unused).
336 MUF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri
,
337 RTScheduling::Current::IdType_out guid_out
,
338 CORBA::String_out
/*name*/,
339 CORBA::Policy_out sched_param_out
,
340 CORBA::Policy_out
/*implicit_sched_param_out*/)
342 const Kokyu::Svc_Ctxt_DSRT_QoS
* sc_qos_ptr
= 0;
344 #ifdef KOKYU_DSRT_LOGGING
345 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):entered MUF_Scheduler::receive_request\n"));
348 RTScheduling::Current::IdType guid
;
350 CORBA::String_var operation
= ri
->operation ();
352 #ifdef KOKYU_DSRT_LOGGING
353 ACE_DEBUG ((LM_DEBUG
,
354 "(%t|%T): receive_request from "
359 // Ignore the "_is_a" operation since it may have been invoked
360 // locally on the server side as a side effect of another call,
361 // meaning that the client hasn't added the service context yet.
362 if (ACE_OS::strcmp ("_is_a", operation
.in ()) == 0)
365 IOP::ServiceContext_var sc
=
366 ri
->get_request_service_context (Server_Interceptor::SchedulingInfo
);
368 CORBA::Long criticality
;
369 TimeBase::TimeT deadline
,exec_time
;
373 //24 hrs from now - infinity
374 ACE_Time_Value deadline_tv
= ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
// NOTE(review): usec ()*10 yields 100 ns units, but sec ()*1000000 yields
// microseconds; for 100 ns units the seconds factor would be 10000000 -
// confirm the intended unit.
375 deadline
= deadline_tv
.sec ()*1000000 + deadline_tv
.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
// View the context payload as an octet sequence for the codec.
381 CORBA::OctetSeq oc_seq
= CORBA::OctetSeq (sc
->context_data
.length (),
382 sc
->context_data
.length (),
383 sc
->context_data
.get_buffer (),
385 CORBA::Any sc_qos_as_any
;
386 CORBA::Any_var scqostmp
= codec_
->decode (oc_seq
);
387 sc_qos_as_any
= scqostmp
.in ();
388 //Don't store in a _var, since >>= returns a pointer to an
389 //internal buffer and we are not supposed to free it.
// NOTE(review): the result of >>= is discarded, so a failed extraction
// would leave sc_qos_ptr at 0 and the dereferences below would fault -
// confirm whether a check is needed.
390 sc_qos_as_any
>>= sc_qos_ptr
;
392 deadline
= sc_qos_ptr
->deadline
;
393 criticality
= sc_qos_ptr
->criticality
;
394 exec_time
= sc_qos_ptr
->estimated_initial_execution_time
;
396 guid
.length (sc_qos_ptr
->guid
.length ());
397 guid_copy (guid
, sc_qos_ptr
->guid
);
// Hand a heap-allocated copy of the guid back through the out parameter.
// NOTE(review): ACE_NEW assigns to its first argument, which here is the
// expression guid_out.ptr () - confirm that ptr () returns a reference so
// the out parameter actually receives the allocation.
399 ACE_NEW (guid_out
.ptr (),
400 RTScheduling::Current::IdType
);
401 guid_out
.ptr ()->length (guid
.length ());
402 *(guid_out
.ptr ()) = guid
;
404 #ifdef KOKYU_DSRT_LOGGING
406 ACE_OS::memcpy (&int_guid
,
409 ACE_DEBUG ((LM_DEBUG
,
410 "(%t|%T): Criticality = %d, guid = %d "
411 "in recvd service context\n",
// Build the policy returned through sched_param_out.
// NOTE(review): only criticality and deadline are set on sched_param;
// estimated_initial_execution_time is not assigned before the struct is
// passed on - confirm whether exec_time should be copied as well.
415 MUF_Scheduling::SchedulingParameter sched_param
;
416 sched_param
.criticality
= criticality
;
417 sched_param
.deadline
= deadline
;
418 sched_param_out
= this->create_scheduling_parameter (sched_param
);
// Schedule the incoming request's guid with the decoded QoS.
421 MUF_Scheduler_Traits::QoSDescriptor_t qos
;
422 qos
.criticality_
= criticality
;
423 qos
.deadline_
= deadline
;
424 qos
.exec_time_
= exec_time
;
426 this->kokyu_dispatcher_
->schedule (guid
, qos
);
428 #ifdef KOKYU_DSRT_LOGGING
429 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): receive_request interceptor done\n"));
435 MUF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
)
// Server-side hook called with the outgoing reply's info (ri).
// It packs the current guid, deadline, estimated execution time and
// criticality into a Kokyu::Svc_Ctxt_DSRT_QoS, encodes that with codec_,
// and adds it to the reply as the service context with id
// Server_Interceptor::SchedulingInfo. It then calls
// kokyu_dispatcher_->update_schedule () for the current guid.
440 MUF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri
)
443 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos
;
445 CORBA::String_var operation
= ri
->operation ();
447 #ifdef KOKYU_DSRT_LOGGING
448 ACE_DEBUG ((LM_DEBUG
,
449 "(%t|%T): send_reply from \"%s\"\n",
453 // Make the context to send the context to the target
454 IOP::ServiceContext sc
;
455 sc
.context_id
= Server_Interceptor::SchedulingInfo
;
458 CORBA::Long criticality
;
459 TimeBase::TimeT deadline
,exec_time
;
// Scheduling parameter and guid of the current scheduling segment.
461 CORBA::Policy_var sched_policy
=
462 this->current_
->scheduling_parameter();
464 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
// No scheduling parameter set: fall back to a far-future deadline.
466 if (CORBA::is_nil (sched_policy
.in ()))
468 #ifdef KOKYU_DSRT_LOGGING
469 ACE_DEBUG ((LM_DEBUG
,
470 "(%t|%T): sched_policy nil.\n"));
472 //24 hrs from now - infinity
473 ACE_Time_Value deadline_tv
= ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
// NOTE(review): usec ()*10 yields 100 ns units, but sec ()*1000000 yields
// microseconds; for 100 ns units the seconds factor would be 10000000 -
// confirm the intended unit.
474 deadline
= deadline_tv
.sec ()*1000000 + deadline_tv
.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
// NOTE(review): the comma after the first literal below makes
// "sched params set\n" an unused argument, not part of the format string,
// so it is never printed; the literals were probably meant to be adjacent.
480 #ifdef KOKYU_DSRT_LOGGING
481 ACE_DEBUG ((LM_DEBUG
,
482 "(%t|%T):sched_policy not nil. ",
483 "sched params set\n"));
// Otherwise take the values from the MUF scheduling parameter policy.
485 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
486 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
.in ());
487 MUF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
490 sc_qos
.guid
.length (guid
->length ());
491 guid_copy (sc_qos
.guid
, guid
.in ());
493 deadline
= sched_param
->deadline
;
494 exec_time
= sched_param
->estimated_initial_execution_time
;
495 criticality
= sched_param
->criticality
;
496 sc_qos
.deadline
= deadline
;
497 sc_qos
.estimated_initial_execution_time
= exec_time
;
498 sc_qos
.criticality
= criticality
;
// Wrap the QoS struct in an Any and encode it into the context payload.
500 CORBA::Any sc_qos_as_any
;
501 sc_qos_as_any
<<= sc_qos
;
503 CORBA::OctetSeq_var cdtmp
= codec_
->encode (sc_qos_as_any
);
504 sc
.context_data
= cdtmp
.in ();
506 // Add this context to the service context list.
507 ri
->add_reply_service_context (sc
, 1);
509 #ifdef KOKYU_DSRT_LOGGING
510 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):reply sc added\n"));
// Tell the dispatcher about the state change of this guid.
514 kokyu_dispatcher_
->update_schedule (guid
.in (),
517 #ifdef KOKYU_DSRT_LOGGING
518 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): send_reply interceptor done\n"));
523 MUF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri
)
529 MUF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri
)
// Client-side hook called with the incoming reply's info (ri).
// It reads the reply service context with id
// Client_Interceptor::SchedulingInfo, decodes the
// Kokyu::Svc_Ctxt_DSRT_QoS it carries and schedules the decoded guid with
// those QoS values on the Kokyu dispatcher.
535 MUF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri
)
537 RTScheduling::Current::IdType guid
;
539 CORBA::String_var operation
= ri
->operation ();
541 CORBA::Object_var target
= ri
->target ();
543 ACE_CString opname
= operation
.in ();
544 #ifdef KOKYU_DSRT_LOGGING
545 ACE_DEBUG ((LM_DEBUG
,
546 "(%t|%T):receive_reply from "
551 // Check that the reply service context was received as
553 IOP::ServiceContext_var sc
=
554 ri
->get_reply_service_context (Client_Interceptor::SchedulingInfo
);
556 CORBA::Long criticality
;
557 TimeBase::TimeT deadline
,exec_time
;
561 ACE_DEBUG ((LM_DEBUG
, "service context was not filled\n"));
562 //24 hrs from now - infinity
563 ACE_Time_Value deadline_tv
= ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
// NOTE(review): usec ()*10 yields 100 ns units, but sec ()*1000000 yields
// microseconds; for 100 ns units the seconds factor would be 10000000 -
// confirm the intended unit.
564 deadline
= deadline_tv
.sec ()*1000000 + deadline_tv
.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
// View the context payload as an octet sequence for the codec.
570 CORBA::OctetSeq oc_seq
= CORBA::OctetSeq (sc
->context_data
.length (),
571 sc
->context_data
.length (),
572 sc
->context_data
.get_buffer (),
575 //Don't store in a _var, since >>= returns a pointer to an internal buffer
576 //and we are not supposed to free it.
577 const Kokyu::Svc_Ctxt_DSRT_QoS
* sc_qos_ptr
= 0;
578 CORBA::Any sc_qos_as_any
;
579 CORBA::Any_var scqostmp
= codec_
->decode (oc_seq
);
580 sc_qos_as_any
= scqostmp
.in ();
// NOTE(review): the result of >>= is discarded, so a failed extraction
// would leave sc_qos_ptr at 0 and the dereferences below would fault -
// confirm whether a check is needed.
581 sc_qos_as_any
>>= sc_qos_ptr
;
583 deadline
= sc_qos_ptr
->deadline
;
584 criticality
= sc_qos_ptr
->criticality
;
585 exec_time
= sc_qos_ptr
->estimated_initial_execution_time
;
587 guid
.length (sc_qos_ptr
->guid
.length ());
588 guid_copy (guid
, sc_qos_ptr
->guid
);
590 ACE_DEBUG ((LM_DEBUG
,
591 "(%t|%T): Criticality = %d in recvd service context\n",
// Reschedule the guid with the QoS carried by the reply.
// NOTE(review): the chained assignment below also stores criticality in
// qos.deadline_, which the next statement overwrites with deadline; the
// extra "qos.deadline_ =" looks unintended.
595 MUF_Scheduler_Traits::QoSDescriptor_t qos
;
596 qos
.deadline_
= qos
.criticality_
= criticality
;
597 qos
.deadline_
= deadline
;
598 qos
.exec_time_
= exec_time
;
599 this->kokyu_dispatcher_
->schedule (guid
, qos
);
603 MUF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri
)
609 MUF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri
)
// Not supported by this scheduler: always throws CORBA::NO_IMPLEMENT.
615 MUF_Scheduler::cancel (const RTScheduling::Current::IdType
&)
617 throw CORBA::NO_IMPLEMENT ();
// Getter overload; not supported: always throws CORBA::NO_IMPLEMENT.
621 MUF_Scheduler::scheduling_policies (void)
623 throw CORBA::NO_IMPLEMENT ();
// Setter overload taking a policy list; not supported: always throws
// CORBA::NO_IMPLEMENT.
627 MUF_Scheduler::scheduling_policies (const CORBA::PolicyList
&)
629 throw CORBA::NO_IMPLEMENT ();
// Not supported by this scheduler: always throws CORBA::NO_IMPLEMENT.
633 MUF_Scheduler::poa_policies (void)
635 throw CORBA::NO_IMPLEMENT ();
// Not supported by this scheduler: always throws CORBA::NO_IMPLEMENT.
639 MUF_Scheduler::scheduling_discipline_name (void)
641 throw CORBA::NO_IMPLEMENT ();
// Not supported by this scheduler: always throws CORBA::NO_IMPLEMENT
// instead of returning a resource manager.
644 RTScheduling::ResourceManager_ptr
645 MUF_Scheduler::create_resource_manager (const char *,
648 throw CORBA::NO_IMPLEMENT ();
// Not supported by this scheduler: always throws CORBA::NO_IMPLEMENT.
652 MUF_Scheduler::set_scheduling_parameter (PortableServer::Servant
&,
656 throw CORBA::NO_IMPLEMENT ();