1 #include "FP_Scheduler.h"
2 #include "Kokyu_qosC.h"
4 #include "tao/ORB_Constants.h"
5 #include "tao/CodecFactory/CodecFactory.h"
6 #include "tao/RTScheduling/Request_Interceptor.h"
8 FP_Segment_Sched_Param_Policy::FP_Segment_Sched_Param_Policy ()
12 FP_Segment_Sched_Param_Policy::FP_Segment_Sched_Param_Policy (
13 const FP_Segment_Sched_Param_Policy
&rhs
)
16 FP_Scheduling::SegmentSchedulingParameterPolicy (),
17 CORBA::LocalObject (),
22 FP_Scheduling::SegmentSchedulingParameter
23 FP_Segment_Sched_Param_Policy::value ()
29 FP_Segment_Sched_Param_Policy::value (
30 const FP_Scheduling::SegmentSchedulingParameter
& value
)
36 FP_Segment_Sched_Param_Policy::copy ()
38 FP_Segment_Sched_Param_Policy
* tmp
= 0;
39 ACE_NEW_THROW_EX (tmp
, FP_Segment_Sched_Param_Policy (*this),
40 CORBA::NO_MEMORY (TAO::VMCID
,
41 CORBA::COMPLETED_NO
));
47 FP_Segment_Sched_Param_Policy::policy_type ()
53 FP_Segment_Sched_Param_Policy::destroy ()
57 Fixed_Priority_Scheduler::Fixed_Priority_Scheduler (
59 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type
,
62 : orb_ (CORBA::ORB::_duplicate (orb
)),
63 disp_impl_type_ (disp_impl_type
),
64 ace_sched_policy_ (ace_sched_policy
),
65 ace_sched_scope_ (ace_sched_scope
)
67 Kokyu::DSRT_ConfigInfo config
;
69 config
.impl_type_
= this->disp_impl_type_
;
70 config
.sched_policy_
= ace_sched_policy_
;
71 config
.sched_scope_
= ace_sched_scope_
;
73 Kokyu::DSRT_Dispatcher_Factory
<FP_Scheduler_Traits
>::DSRT_Dispatcher_Auto_Ptr
74 tmp( Kokyu::DSRT_Dispatcher_Factory
<FP_Scheduler_Traits
>::
75 create_DSRT_dispatcher (config
) );
76 kokyu_dispatcher_
= std::move(tmp
);
78 CORBA::Object_var object
=
79 orb
->resolve_initial_references ("RTScheduler_Current");
82 RTScheduling::Current::_narrow (object
.in ());
84 IOP::CodecFactory_var codec_factory
;
85 CORBA::Object_var obj
=
86 orb
->resolve_initial_references ("CodecFactory");
88 if (CORBA::is_nil(obj
.in ()))
90 ACE_ERROR ((LM_ERROR
, "Nil Codec factory\n"));
94 codec_factory
= IOP::CodecFactory::_narrow (obj
.in ());
97 IOP::Encoding encoding
;
98 encoding
.format
= IOP::ENCODING_CDR_ENCAPS
;
99 encoding
.major_version
= 1;
100 encoding
.minor_version
= 2;
102 codec_
= codec_factory
->create_codec (encoding
);
105 Fixed_Priority_Scheduler::~Fixed_Priority_Scheduler ()
107 // delete kokyu_dispatcher_;
111 Fixed_Priority_Scheduler::shutdown ()
113 kokyu_dispatcher_
->shutdown ();
114 ACE_DEBUG ((LM_DEBUG
, "kokyu DSRT dispatcher shutdown\n"));
117 FP_Scheduling::SegmentSchedulingParameterPolicy_ptr
118 Fixed_Priority_Scheduler::create_segment_scheduling_parameter (
119 const FP_Scheduling::SegmentSchedulingParameter
& value
122 FP_Scheduling::SegmentSchedulingParameterPolicy_ptr
123 segment_sched_param_policy
;
124 ACE_NEW_THROW_EX (segment_sched_param_policy
,
125 FP_Segment_Sched_Param_Policy
,
127 CORBA::SystemException::_tao_minor_code (
130 CORBA::COMPLETED_NO
));
132 segment_sched_param_policy
->value (value
);
134 return segment_sched_param_policy
;
139 Fixed_Priority_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType
& guid
,
141 CORBA::Policy_ptr sched_policy
,
144 #ifdef KOKYU_DSRT_LOGGING
145 ACE_DEBUG ((LM_DEBUG
,
146 "(%t|%T):FP_Scheduler::begin_new_scheduling_segment enter\n"));
149 #ifdef KOKYU_DSRT_LOGGING
151 ACE_OS::memcpy (&int_guid
,
154 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): guid is %d\n", int_guid
));
157 FP_Scheduler_Traits::QoSDescriptor_t qos
;
158 FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy
=
159 FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy
);
161 FP_Scheduling::SegmentSchedulingParameter sched_param
= sched_param_policy
->value ();
162 RTCORBA::Priority desired_priority
= sched_param
.base_priority
;
163 qos
.priority_
= desired_priority
;
164 kokyu_dispatcher_
->schedule (guid
, qos
);
166 #ifdef KOKYU_DSRT_LOGGING
167 ACE_DEBUG ((LM_DEBUG
,
168 "(%t|%T):FP_Scheduler::begin_new_scheduling_segment exit\n"));
173 Fixed_Priority_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
175 CORBA::Policy_ptr sched_param
,
176 CORBA::Policy_ptr implicit_sched_param
)
178 this->begin_new_scheduling_segment (guid
,
181 implicit_sched_param
);
185 Fixed_Priority_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
187 CORBA::Policy_ptr sched_policy
,
188 CORBA::Policy_ptr implicit_sched_param
)
190 ACE_UNUSED_ARG ((name
));
191 ACE_UNUSED_ARG ((implicit_sched_param
));
193 #ifdef KOKYU_DSRT_LOGGING
195 ACE_OS::memcpy (&int_guid
,
198 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): update_sched_seg::guid is %d\n", int_guid
));
201 FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy
=
202 FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy
);
204 FP_Scheduling::SegmentSchedulingParameter sched_param
=
205 sched_param_policy
->value ();
207 RTCORBA::Priority desired_priority
= sched_param
.base_priority
;
209 FP_Scheduler_Traits::QoSDescriptor_t qos
;
210 qos
.priority_
= desired_priority
;
212 kokyu_dispatcher_
->update_schedule (guid
, qos
);
216 Fixed_Priority_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
219 #ifdef KOKYU_DSRT_LOGGING
221 ACE_OS::memcpy (&int_guid
,
224 ACE_DEBUG ((LM_DEBUG
, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid
));
227 kokyu_dispatcher_
->cancel_schedule (guid
);
231 Fixed_Priority_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType
&,
239 Fixed_Priority_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri
)
241 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos
;
243 CORBA::String_var operation
= ri
->operation ();
245 #ifdef KOKYU_DSRT_LOGGING
246 ACE_DEBUG ((LM_DEBUG
,
247 "(%t|%T): send_request "
251 // Make the context to send the context to the target
252 IOP::ServiceContext sc
;
253 sc
.context_id
= Client_Interceptor::SchedulingInfo
;
255 CORBA::Policy_var sched_policy
=
256 this->current_
->scheduling_parameter();
258 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
260 ACE_OS::memcpy (&guid,
264 RTCORBA::Priority desired_priority
;
265 if (CORBA::is_nil (sched_policy
.in ()))
267 desired_priority
= 0;
271 FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy
=
272 FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy
.in ());
274 FP_Scheduling::SegmentSchedulingParameter sched_param
=
275 sched_param_policy
->value ();
277 desired_priority
= sched_param
.base_priority
;
279 #ifdef KOKYU_DSRT_LOGGING
281 ACE_OS::memcpy (&int_guid
,
284 ACE_DEBUG ((LM_DEBUG
,
285 ACE_TEXT ("(%t): send_request desired_priority from current = %d, guid = %d\n"),
286 desired_priority
, int_guid
));
289 //Fill the guid in the SC Qos struct
290 sc_qos
.guid
.length (guid
->length ());
291 guid_copy (sc_qos
.guid
, guid
.in ());
292 sc_qos
.desired_priority
= desired_priority
;
293 CORBA::Any sc_qos_as_any
;
294 sc_qos_as_any
<<= sc_qos
;
296 CORBA::OctetSeq_var cdtmp
= codec_
->encode (sc_qos_as_any
);
297 sc
.context_data
= cdtmp
.in ();
299 #ifdef KOKYU_DSRT_LOGGING
300 ACE_DEBUG ((LM_DEBUG
,
301 ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
304 // Add this context to the service context list.
305 ri
->add_request_service_context (sc
, 0);
309 #ifdef KOKYU_DSRT_LOGGING
310 ACE_DEBUG ((LM_DEBUG
,
311 ACE_TEXT ("(%t|%T): send_request : ")
312 ACE_TEXT ("about to call scheduler to inform block\n")));
315 kokyu_dispatcher_
->update_schedule (guid
.in (),
318 #ifdef KOKYU_DSRT_LOGGING
319 ACE_DEBUG ((LM_DEBUG
,
320 ACE_TEXT ("(%t|%T): send_request interceptor done\n")));
325 Fixed_Priority_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri
,
326 RTScheduling::Current::IdType_out guid_out
,
327 CORBA::String_out
/*name*/,
328 CORBA::Policy_out sched_param_out
,
329 CORBA::Policy_out
/*implicit_sched_param_out*/)
331 const Kokyu::Svc_Ctxt_DSRT_QoS
* sc_qos_ptr
= 0;
333 #ifdef KOKYU_DSRT_LOGGING
334 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):entered FP_Scheduler::receive_request\n"));
337 RTScheduling::Current::IdType guid
;
339 CORBA::String_var operation
= ri
->operation ();
341 #ifdef KOKYU_DSRT_LOGGING
342 ACE_DEBUG ((LM_DEBUG
,
343 "(%t|%T): receive_request from "
348 // Ignore the "_is_a" operation since it may have been invoked
349 // locally on the server side as a side effect of another call,
350 // meaning that the client hasn't added the service context yet.
351 if (ACE_OS::strcmp ("_is_a", operation
.in ()) == 0)
354 IOP::ServiceContext_var sc
=
355 ri
->get_request_service_context (Server_Interceptor::SchedulingInfo
);
357 RTCORBA::Priority desired_priority
;
361 desired_priority
= 0;
365 CORBA::OctetSeq oc_seq
= CORBA::OctetSeq (sc
->context_data
.length (),
366 sc
->context_data
.length (),
367 sc
->context_data
.get_buffer (),
369 CORBA::Any sc_qos_as_any
;
370 CORBA::Any_var scqostmp
= codec_
->decode (oc_seq
);
371 sc_qos_as_any
= scqostmp
.in ();
372 //Don't store in a _var, since >>= returns a pointer to an
373 //internal buffer and we are not supposed to free it.
374 sc_qos_as_any
>>= sc_qos_ptr
;
376 desired_priority
= sc_qos_ptr
->desired_priority
;
377 guid
.length (sc_qos_ptr
->guid
.length ());
378 guid_copy (guid
, sc_qos_ptr
->guid
);
380 ACE_NEW (guid_out
.ptr (),
381 RTScheduling::Current::IdType
);
382 guid_out
.ptr ()->length (guid
.length ());
383 *(guid_out
.ptr ()) = guid
;
385 #ifdef KOKYU_DSRT_LOGGING
387 ACE_OS::memcpy (&int_guid
,
390 ACE_DEBUG ((LM_DEBUG
,
391 "(%t|%T): Desired_Priority = %d, guid = %d in recvd service context\n",
396 FP_Scheduling::SegmentSchedulingParameter sched_param
;
397 sched_param
.base_priority
= desired_priority
;
398 sched_param_out
= this->create_segment_scheduling_parameter (sched_param
);
401 FP_Scheduler_Traits::QoSDescriptor_t qos
;
402 qos
.priority_
= desired_priority
;
403 this->kokyu_dispatcher_
->schedule (guid
, qos
);
405 #ifdef KOKYU_DSRT_LOGGING
406 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): receive_request interceptor done\n"));
412 Fixed_Priority_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
)
417 Fixed_Priority_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri
)
419 RTCORBA::Priority desired_priority
= 0;
420 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos
;
422 CORBA::String_var operation
= ri
->operation ();
424 #ifdef KOKYU_DSRT_LOGGING
425 ACE_DEBUG ((LM_DEBUG
,
426 "(%t|%T): send_reply from \"%s\"\n",
430 // Make the context to send the context to the target
431 IOP::ServiceContext sc
;
432 sc
.context_id
= Server_Interceptor::SchedulingInfo
;
434 ACE_DEBUG ((LM_DEBUG
, "in send_reply: before accessing current_->sched_param\n"));
435 CORBA::Policy_var sched_policy
=
436 this->current_
->scheduling_parameter();
438 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
440 if (CORBA::is_nil (sched_policy
.in ()))
442 ACE_DEBUG ((LM_DEBUG
, "sched_policy nil. desired_priority not set in sched params\n"));
443 desired_priority
= 0;
447 ACE_DEBUG ((LM_DEBUG
, "sched_policy not nil. desired_priority set in sched params\n"));
449 FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy
=
450 FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy
.in ());
452 FP_Scheduling::SegmentSchedulingParameter sched_param
=
453 sched_param_policy
->value ();
455 desired_priority
= sched_param
.base_priority
;
457 //Fill the guid in the SC Qos struct
458 sc_qos
.guid
.length (guid
->length ());
459 guid_copy (sc_qos
.guid
, guid
.in ());
460 sc_qos
.desired_priority
= desired_priority
;
461 CORBA::Any sc_qos_as_any
;
462 sc_qos_as_any
<<= sc_qos
;
464 CORBA::OctetSeq_var cdtmp
= codec_
->encode (sc_qos_as_any
);
465 sc
.context_data
= cdtmp
.in ();
467 // Add this context to the service context list.
468 ri
->add_reply_service_context (sc
, 1);
470 #ifdef KOKYU_DSRT_LOGGING
471 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):reply sc added\n"));
475 kokyu_dispatcher_
->update_schedule (guid
.in (),
478 #ifdef KOKYU_DSRT_LOGGING
479 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): send_reply interceptor done\n"));
484 Fixed_Priority_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri
)
490 Fixed_Priority_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri
)
496 Fixed_Priority_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri
)
498 RTScheduling::Current::IdType guid
;
499 RTCORBA::Priority desired_priority
=0;
501 CORBA::String_var operation
= ri
->operation ();
503 CORBA::Object_var target
= ri
->target ();
505 ACE_CString opname
= operation
.in ();
506 #ifdef KOKYU_DSRT_LOGGING
507 ACE_DEBUG ((LM_DEBUG
,
508 "(%t|%T):receive_reply from "
513 // Check that the reply service context was received as
515 IOP::ServiceContext_var sc
=
516 ri
->get_reply_service_context (Client_Interceptor::SchedulingInfo
);
520 desired_priority
= 0;
524 CORBA::OctetSeq oc_seq
= CORBA::OctetSeq (sc
->context_data
.length (),
525 sc
->context_data
.length (),
526 sc
->context_data
.get_buffer (),
529 //Don't store in a _var, since >>= returns a pointer to an internal buffer
530 //and we are not supposed to free it.
531 const Kokyu::Svc_Ctxt_DSRT_QoS
* sc_qos_ptr
= 0;
532 CORBA::Any sc_qos_as_any
;
533 CORBA::Any_var scqostmp
= codec_
->decode (oc_seq
);
534 sc_qos_as_any
= scqostmp
.in ();
535 sc_qos_as_any
>>= sc_qos_ptr
;
537 desired_priority
= sc_qos_ptr
->desired_priority
;
538 guid
.length (sc_qos_ptr
->guid
.length ());
539 guid_copy (guid
, sc_qos_ptr
->guid
);
541 #ifdef KOKYU_DSRT_LOGGING
542 ACE_DEBUG ((LM_DEBUG
,
543 "(%t): Desired_Priority = %d in recvd service context\n",
548 FP_Scheduler_Traits::QoSDescriptor_t qos
;
549 qos
.priority_
= desired_priority
;
550 this->kokyu_dispatcher_
->schedule (guid
, qos
);
554 Fixed_Priority_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri
)
560 Fixed_Priority_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri
)
566 Fixed_Priority_Scheduler::cancel (const RTScheduling::Current::IdType
&)
568 throw CORBA::NO_IMPLEMENT ();
572 Fixed_Priority_Scheduler::scheduling_policies ()
574 throw CORBA::NO_IMPLEMENT ();
578 Fixed_Priority_Scheduler::scheduling_policies (const CORBA::PolicyList
&)
580 throw CORBA::NO_IMPLEMENT ();
584 Fixed_Priority_Scheduler::poa_policies ()
586 throw CORBA::NO_IMPLEMENT ();
590 Fixed_Priority_Scheduler::scheduling_discipline_name ()
592 throw CORBA::NO_IMPLEMENT ();
595 RTScheduling::ResourceManager_ptr
596 Fixed_Priority_Scheduler::create_resource_manager (const char *,
599 throw CORBA::NO_IMPLEMENT ();
603 Fixed_Priority_Scheduler::set_scheduling_parameter (PortableServer::Servant
&,
607 throw CORBA::NO_IMPLEMENT ();