1 #include "MIF_Scheduler.h"
2 #include "Kokyu_qosC.h"
4 #include "tao/RTScheduling/Request_Interceptor.h"
5 #include "tao/CodecFactory/CodecFactory.h"
6 #include "tao/ORB_Constants.h"
// Default constructor of the MIF scheduling-parameter policy.
8 MIF_Sched_Param_Policy::MIF_Sched_Param_Policy ()
// Copy constructor: takes the policy to copy by const reference. The
// initializer-list entries visible here default-construct
// MIF_Scheduling::SchedulingParameterPolicy and CORBA::LocalObject
// (presumably the base classes -- confirm against the header).
12 MIF_Sched_Param_Policy::MIF_Sched_Param_Policy (
13 const MIF_Sched_Param_Policy
&rhs
17 MIF_Scheduling::SchedulingParameterPolicy (),
18 CORBA::LocalObject (),
// Accessor overload of value(): takes no arguments and is declared to
// return a MIF_Scheduling::SchedulingParameter.
23 MIF_Scheduling::SchedulingParameter
24 MIF_Sched_Param_Policy::value ()
// Mutator overload of value(): receives a scheduling parameter by const
// reference (presumably stored as this policy's value -- confirm).
30 MIF_Sched_Param_Policy::value (const MIF_Scheduling::SchedulingParameter
& value
)
// Creates a heap-allocated duplicate of this policy.
// ACE_NEW_THROW_EX copy-constructs the duplicate from *this and is given
// CORBA::NO_MEMORY (TAO::VMCID, CORBA::COMPLETED_NO) as the exception to
// raise if the allocation fails.
36 MIF_Sched_Param_Policy::copy ()
38 MIF_Sched_Param_Policy
* tmp
= 0;
39 ACE_NEW_THROW_EX (tmp
,
40 MIF_Sched_Param_Policy (*this),
41 CORBA::NO_MEMORY (TAO::VMCID
,
42 CORBA::COMPLETED_NO
));
// Accessor for this policy's policy type.
48 MIF_Sched_Param_Policy::policy_type ()
// destroy() operation of this policy object.
54 MIF_Sched_Param_Policy::destroy ()
// Constructs the scheduler.
//  - orb: ORB reference; duplicated into orb_ and used to resolve the
//    "RTScheduler_Current" and "CodecFactory" initial references.
//  - disp_impl_type: Kokyu DSRT dispatcher implementation type, kept in
//    disp_impl_type_ and copied into the dispatcher configuration.
//  - ace_sched_policy / ace_sched_scope: kept in ace_sched_policy_ and
//    ace_sched_scope_ and copied into the dispatcher configuration.
58 MIF_Scheduler::MIF_Scheduler (CORBA::ORB_ptr orb
,
59 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type
,
62 : orb_ (CORBA::ORB::_duplicate (orb
)),
63 disp_impl_type_ (disp_impl_type
),
64 ace_sched_policy_ (ace_sched_policy
),
65 ace_sched_scope_ (ace_sched_scope
)
// Describe the dispatcher to create using the values stored above.
67 Kokyu::DSRT_ConfigInfo config
;
69 config
.impl_type_
= this->disp_impl_type_
;
70 config
.sched_policy_
= ace_sched_policy_
;
71 config
.sched_scope_
= ace_sched_scope_
;
// Have the factory create the dispatcher, then move ownership of it into
// kokyu_dispatcher_.
73 Kokyu::DSRT_Dispatcher_Factory
<MIF_Scheduler_Traits
>::DSRT_Dispatcher_Auto_Ptr
74 tmp( Kokyu::DSRT_Dispatcher_Factory
<MIF_Scheduler_Traits
>::
75 create_DSRT_dispatcher (config
) );
76 kokyu_dispatcher_
= std::move(tmp
);
// Resolve the RT scheduler "Current" object and narrow it to
// RTScheduling::Current.
78 CORBA::Object_var object
=
79 orb
->resolve_initial_references ("RTScheduler_Current");
82 RTScheduling::Current::_narrow (object
.in ());
// Obtain the codec factory; a nil reference is reported with ACE_ERROR.
84 IOP::CodecFactory_var codec_factory
;
85 CORBA::Object_var obj
= orb
->resolve_initial_references ("CodecFactory");
87 if (CORBA::is_nil(obj
.in ()))
89 ACE_ERROR ((LM_ERROR
, "Nil Codec factory\n"));
93 codec_factory
= IOP::CodecFactory::_narrow (obj
.in ());
// Request a CDR-encapsulation codec, version 1.2, and keep it in codec_;
// the interceptor hooks use it to encode/decode the scheduling QoS.
96 IOP::Encoding encoding
;
97 encoding
.format
= IOP::ENCODING_CDR_ENCAPS
;
98 encoding
.major_version
= 1;
99 encoding
.minor_version
= 2;
101 codec_
= codec_factory
->create_codec (encoding
);
// Destructor. The explicit delete of kokyu_dispatcher_ is left commented
// out; the constructor fills that member by move-assigning the factory's
// auto-pointer, so it is presumably released automatically -- confirm.
104 MIF_Scheduler::~MIF_Scheduler ()
106 // delete kokyu_dispatcher_;
// Shuts down the Kokyu DSRT dispatcher and then logs a debug message
// saying so.
110 MIF_Scheduler::shutdown ()
112 kokyu_dispatcher_
->shutdown ();
113 ACE_DEBUG ((LM_DEBUG
, "kokyu DSRT dispatcher shutdown\n"));
// Creates a scheduling-parameter policy holding `value`.
// A MIF_Sched_Param_Policy is allocated with ACE_NEW_THROW_EX (its
// exception argument is built with CORBA::SystemException::_tao_minor_code
// and CORBA::COMPLETED_NO), `value` is stored in it through value(), and
// the pointer to the new policy is returned.
116 MIF_Scheduling::SchedulingParameterPolicy_ptr
117 MIF_Scheduler::create_scheduling_parameter (const MIF_Scheduling::SchedulingParameter
& value
)
119 MIF_Scheduling::SchedulingParameterPolicy_ptr sched_param_policy
;
120 ACE_NEW_THROW_EX (sched_param_policy
,
121 MIF_Sched_Param_Policy
,
123 CORBA::SystemException::_tao_minor_code (
126 CORBA::COMPLETED_NO
));
128 sched_param_policy
->value (value
);
130 return sched_param_policy
;
// Begins a new scheduling segment for `guid`.
// The importance carried by `sched_policy` is copied into a QoS descriptor
// and handed, together with `guid`, to the Kokyu dispatcher's schedule().
// Entry, the guid and exit are traced when KOKYU_DSRT_LOGGING is defined.
135 MIF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType
& guid
,
137 CORBA::Policy_ptr sched_policy
,
140 #ifdef KOKYU_DSRT_LOGGING
141 ACE_DEBUG ((LM_DEBUG
,
142 "(%t|%T):MIF_Scheduler::begin_new_scheduling_segment enter\n"));
145 #ifdef KOKYU_DSRT_LOGGING
147 ACE_OS::memcpy (&int_guid
,
150 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): guid is %d\n", int_guid
));
// NOTE(review): the narrowed policy is dereferenced with no nil check
// visible here -- confirm callers always pass a MIF scheduling policy.
153 MIF_Scheduler_Traits::QoSDescriptor_t qos
;
154 MIF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
155 MIF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
);
157 MIF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
158 CORBA::Short importance
= sched_param
->importance
;
159 qos
.importance_
= importance
;
160 kokyu_dispatcher_
->schedule (guid
, qos
);
162 #ifdef KOKYU_DSRT_LOGGING
163 ACE_DEBUG ((LM_DEBUG
,
164 "(%t|%T):MIF_Scheduler::begin_new_scheduling_segment exit\n"));
// Begins a nested scheduling segment by delegating to
// begin_new_scheduling_segment(); `guid` and `implicit_sched_param` are
// among the arguments forwarded.
169 MIF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType
&guid
,
171 CORBA::Policy_ptr sched_param
,
172 CORBA::Policy_ptr implicit_sched_param
)
174 this->begin_new_scheduling_segment (guid
,
177 implicit_sched_param
);
// Updates the scheduling segment of `guid` with the importance carried by
// `sched_policy`: the policy is narrowed, its importance is copied into a
// QoS descriptor, and the dispatcher's update_schedule() is called.
// `name` and `implicit_sched_param` are explicitly marked unused.
181 MIF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType
& guid
,
183 CORBA::Policy_ptr sched_policy
,
184 CORBA::Policy_ptr implicit_sched_param
)
186 ACE_UNUSED_ARG ((name
));
187 ACE_UNUSED_ARG ((implicit_sched_param
));
189 #ifdef KOKYU_DSRT_LOGGING
191 ACE_OS::memcpy (&int_guid
,
194 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): update_sched_seg::guid is %d\n", int_guid
));
// NOTE(review): the narrowed policy is dereferenced with no nil check
// visible here -- confirm callers always pass a MIF scheduling policy.
197 MIF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
198 MIF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
);
200 MIF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
201 CORBA::Short importance
= sched_param
->importance
;
203 MIF_Scheduler_Traits::QoSDescriptor_t qos
;
204 qos
.importance_
= importance
;
206 kokyu_dispatcher_
->update_schedule (guid
, qos
);
// Ends the scheduling segment of `guid` by cancelling its entry in the
// Kokyu dispatcher. The guid is traced when KOKYU_DSRT_LOGGING is defined.
210 MIF_Scheduler::end_scheduling_segment (
211 const RTScheduling::Current::IdType
&guid
,
214 #ifdef KOKYU_DSRT_LOGGING
216 ACE_OS::memcpy (&int_guid
,
219 ACE_DEBUG ((LM_DEBUG
, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid
));
222 kokyu_dispatcher_
->cancel_schedule (guid
);
// End-of-nested-segment hook; the IdType argument is left unnamed.
226 MIF_Scheduler::end_nested_scheduling_segment (
227 const RTScheduling::Current::IdType
&,
// Client-side request hook. Packs the caller's guid and importance into a
// Kokyu::Svc_Ctxt_DSRT_QoS, encodes it with codec_, attaches it to the
// outgoing request as the Client_Interceptor::SchedulingInfo service
// context, and then calls the dispatcher's update_schedule() for the guid.
// Progress is traced when KOKYU_DSRT_LOGGING is defined.
235 MIF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri
)
237 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos
;
239 CORBA::String_var operation
= ri
->operation ();
241 #ifdef KOKYU_DSRT_LOGGING
242 ACE_DEBUG ((LM_DEBUG
,
243 "(%t|%T): send_request "
248 // Make the context to send the context to the target
249 IOP::ServiceContext sc
;
250 sc
.context_id
= Client_Interceptor::SchedulingInfo
;
// Scheduling policy and guid are both read from current_.
252 CORBA::Policy_var sched_policy
=
253 this->current_
->scheduling_parameter();
255 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
257 ACE_OS::memcpy (&guid,
261 CORBA::Short importance
;
262 if (CORBA::is_nil (sched_policy
.in ()))
268 MIF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
269 MIF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
.in ());
271 MIF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
272 importance
= sched_param
->importance
;
274 #ifdef KOKYU_DSRT_LOGGING
276 ACE_OS::memcpy (&int_guid
,
279 ACE_DEBUG ((LM_DEBUG
,
280 "(%t|%T): send_request importance from current = %d, guid = %d\n",
281 importance
, int_guid
));
284 //Fill the guid in the SC Qos struct
285 sc_qos
.guid
.length (guid
->length ());
286 guid_copy (sc_qos
.guid
, guid
.in ());
287 sc_qos
.importance
= importance
;
// Wrap the QoS struct in an Any and encode it into the context payload.
288 CORBA::Any sc_qos_as_any
;
289 sc_qos_as_any
<<= sc_qos
;
291 CORBA::OctetSeq_var cdtmp
= codec_
->encode (sc_qos_as_any
);
292 sc
.context_data
= cdtmp
.in ();
294 #ifdef KOKYU_DSRT_LOGGING
295 ACE_DEBUG ((LM_DEBUG
,
296 ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
299 // Add this context to the service context list.
300 ri
->add_request_service_context (sc
, 0);
304 #ifdef KOKYU_DSRT_LOGGING
305 ACE_DEBUG ((LM_DEBUG
,
306 ACE_TEXT ("(%t|%T): send_request : ")
307 ACE_TEXT ("about to call scheduler to inform block\n")));
310 kokyu_dispatcher_
->update_schedule (guid
.in (),
313 #ifdef KOKYU_DSRT_LOGGING
314 ACE_DEBUG ((LM_DEBUG
,
315 ACE_TEXT ("(%t|%T): send_request interceptor done\n")));
// Server-side request hook. Reads the Server_Interceptor::SchedulingInfo
// service context from the incoming request, decodes the
// Kokyu::Svc_Ctxt_DSRT_QoS it carries, and uses the guid and importance to
//  - fill `guid_out` with a newly allocated copy of the guid,
//  - fill `sched_param_out` with a policy created from the importance, and
//  - schedule the guid with the Kokyu dispatcher.
// The name and implicit-policy out parameters are left unnamed.
320 MIF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri
,
321 RTScheduling::Current::IdType_out guid_out
,
322 CORBA::String_out
/*name*/,
323 CORBA::Policy_out sched_param_out
,
324 CORBA::Policy_out
/*implicit_sched_param_out*/)
326 const Kokyu::Svc_Ctxt_DSRT_QoS
* sc_qos_ptr
= 0;
328 #ifdef KOKYU_DSRT_LOGGING
329 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):entered MIF_Scheduler::receive_request\n"));
332 RTScheduling::Current::IdType guid
;
334 CORBA::String_var operation
= ri
->operation ();
336 #ifdef KOKYU_DSRT_LOGGING
337 ACE_DEBUG ((LM_DEBUG
,
338 "(%t|%T): receive_request from "
343 // Ignore the "_is_a" operation since it may have been invoked
344 // locally on the server side as a side effect of another call,
345 // meaning that the client hasn't added the service context yet.
346 if (ACE_OS::strcmp ("_is_a", operation
.in ()) == 0)
349 IOP::ServiceContext_var sc
=
350 ri
->get_request_service_context (Server_Interceptor::SchedulingInfo
);
352 CORBA::Short importance
;
// Build an octet sequence over the context payload (length and buffer of
// sc->context_data) so the codec can decode it.
360 CORBA::OctetSeq oc_seq
= CORBA::OctetSeq (sc
->context_data
.length (),
361 sc
->context_data
.length (),
362 sc
->context_data
.get_buffer (),
364 CORBA::Any sc_qos_as_any
;
365 CORBA::Any_var scqostmp
= codec_
->decode (oc_seq
);
366 sc_qos_as_any
= scqostmp
.in ();
367 //Don't store in a _var, since >>= returns a pointer to an
368 //internal buffer and we are not supposed to free it.
369 sc_qos_as_any
>>= sc_qos_ptr
;
// NOTE(review): the result of the >>= extraction above is not checked in
// the lines visible here before sc_qos_ptr is dereferenced -- confirm a
// failed extraction cannot leave it null.
371 importance
= sc_qos_ptr
->importance
;
372 guid
.length (sc_qos_ptr
->guid
.length ());
373 guid_copy (guid
, sc_qos_ptr
->guid
);
// Hand the caller its own heap-allocated copy of the guid.
375 ACE_NEW (guid_out
.ptr (),
376 RTScheduling::Current::IdType
);
377 guid_out
.ptr ()->length (guid
.length ());
378 *(guid_out
.ptr ()) = guid
;
380 #ifdef KOKYU_DSRT_LOGGING
382 ACE_OS::memcpy (&int_guid
,
385 ACE_DEBUG ((LM_DEBUG
,
386 "(%t|%T): Importance = %d, guid = %d in recvd service context\n",
387 importance
, int_guid
));
// Return the received importance to the caller as a scheduling policy.
390 MIF_Scheduling::SchedulingParameter sched_param
;
391 sched_param
.importance
= importance
;
392 sched_param_out
= this->create_scheduling_parameter (sched_param
);
// Schedule the guid with the received importance.
395 MIF_Scheduler_Traits::QoSDescriptor_t qos
;
396 qos
.importance_
= importance
;
397 this->kokyu_dispatcher_
->schedule (guid
, qos
);
399 #ifdef KOKYU_DSRT_LOGGING
400 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): receive_request interceptor done\n"));
// Server-side reply hook. Reads the scheduling policy and guid from
// current_; for a non-nil policy it packs the guid and the policy's
// importance into a Kokyu::Svc_Ctxt_DSRT_QoS, encodes it with codec_ and
// attaches it to the reply as the Server_Interceptor::SchedulingInfo
// service context. It then calls the dispatcher's update_schedule() for
// the guid. Progress is traced when KOKYU_DSRT_LOGGING is defined.
406 MIF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri
)
408 CORBA::Short importance
= 0;
409 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos
;
411 CORBA::String_var operation
= ri
->operation ();
413 #ifdef KOKYU_DSRT_LOGGING
414 ACE_DEBUG ((LM_DEBUG
,
415 "(%t|%T): send_reply from \"%s\"\n",
419 // Make the context to send the context to the target
420 IOP::ServiceContext sc
;
421 sc
.context_id
= Server_Interceptor::SchedulingInfo
;
423 CORBA::Policy_var sched_policy
=
424 this->current_
->scheduling_parameter();
426 RTScheduling::Current::IdType_var guid
= this->current_
->id ();
428 if (CORBA::is_nil (sched_policy
.in ()))
// The two halves of this message are adjacent string literals so that they
// form ONE format string; a comma between them would turn the second half
// into an unused argument and drop it (and its newline) from the log.
430 #ifdef KOKYU_DSRT_LOGGING
431 ACE_DEBUG ((LM_DEBUG
,
432 "(%t|%T): sched_policy nil. "
433 "importance not set in sched params\n"));
// Same as above: adjacent literals, no comma, so the whole text is logged.
439 #ifdef KOKYU_DSRT_LOGGING
440 ACE_DEBUG ((LM_DEBUG
,
441 "(%t|%T):sched_policy not nil. "
442 "importance set in sched params\n"));
444 MIF_Scheduling::SchedulingParameterPolicy_var sched_param_policy
=
445 MIF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy
.in ());
447 MIF_Scheduling::SchedulingParameter_var sched_param
= sched_param_policy
->value ();
449 importance
= sched_param
->importance
;
451 //Fill the guid in the SC Qos struct
452 sc_qos
.guid
.length (guid
->length ());
453 guid_copy (sc_qos
.guid
, guid
.in ());
454 sc_qos
.importance
= importance
;
// Wrap the QoS struct in an Any and encode it into the context payload.
455 CORBA::Any sc_qos_as_any
;
456 sc_qos_as_any
<<= sc_qos
;
458 CORBA::OctetSeq_var cdtmp
= codec_
->encode (sc_qos_as_any
);
459 sc
.context_data
= cdtmp
.in ();
461 // Add this context to the service context list.
462 ri
->add_reply_service_context (sc
, 1);
464 #ifdef KOKYU_DSRT_LOGGING
465 ACE_DEBUG ((LM_DEBUG
, "(%t|%T):reply sc added\n"));
469 kokyu_dispatcher_
->update_schedule (guid
.in (),
472 #ifdef KOKYU_DSRT_LOGGING
473 ACE_DEBUG ((LM_DEBUG
, "(%t|%T): send_reply interceptor done\n"));
// send_exception hook; receives the server-side request info.
478 MIF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri
)
// send_poll hook; the client-side request info argument is left unnamed.
484 MIF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr
)
// send_other hook; receives the server-side request info.
489 MIF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri
)
// Client-side reply hook. Reads the Client_Interceptor::SchedulingInfo
// service context from the reply, decodes the Kokyu::Svc_Ctxt_DSRT_QoS it
// carries, and schedules the received guid with the received importance
// through the Kokyu dispatcher.
495 MIF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri
)
497 RTScheduling::Current::IdType guid
;
498 CORBA::Short importance
=0;
500 CORBA::String_var operation
= ri
->operation ();
502 CORBA::Object_var target
= ri
->target ();
504 ACE_CString opname
= operation
.in ();
505 #ifdef KOKYU_DSRT_LOGGING
506 ACE_DEBUG ((LM_DEBUG
,
507 "(%t|%T):receive_reply from "
512 // Check that the reply service context was received as
514 IOP::ServiceContext_var sc
=
515 ri
->get_reply_service_context (Client_Interceptor::SchedulingInfo
);
// Build an octet sequence over the context payload (length and buffer of
// sc->context_data) so the codec can decode it.
523 CORBA::OctetSeq oc_seq
= CORBA::OctetSeq (sc
->context_data
.length (),
524 sc
->context_data
.length (),
525 sc
->context_data
.get_buffer (),
528 //Don't store in a _var, since >>= returns a pointer to an internal buffer
529 //and we are not supposed to free it.
530 const Kokyu::Svc_Ctxt_DSRT_QoS
* sc_qos_ptr
= 0;
531 CORBA::Any sc_qos_as_any
;
532 CORBA::Any_var scqostmp
= codec_
->decode (oc_seq
);
533 sc_qos_as_any
= scqostmp
.in ();
534 sc_qos_as_any
>>= sc_qos_ptr
;
// NOTE(review): the result of the >>= extraction above is not checked
// before sc_qos_ptr is dereferenced -- confirm a failed extraction cannot
// leave it null.
535 importance
= sc_qos_ptr
->importance
;
536 guid
.length (sc_qos_ptr
->guid
.length ());
537 guid_copy (guid
, sc_qos_ptr
->guid
);
539 #ifdef KOKYU_DSRT_LOGGING
540 ACE_DEBUG ((LM_DEBUG
,
541 "(%t|%T): Importance = %d in recvd service context\n",
// Schedule the guid with the received importance.
546 MIF_Scheduler_Traits::QoSDescriptor_t qos
;
547 qos
.importance_
= importance
;
548 this->kokyu_dispatcher_
->schedule (guid
, qos
);
// receive_exception hook; receives the client-side request info.
552 MIF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri
)
// receive_other hook; receives the client-side request info.
558 MIF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri
)
// cancel hook; the IdType argument is left unnamed.
564 MIF_Scheduler::cancel (const RTScheduling::Current::IdType
&)
// Accessor overload of scheduling_policies(): not supported by this
// scheduler -- throws CORBA::NO_IMPLEMENT.
569 MIF_Scheduler::scheduling_policies ()
571 throw CORBA::NO_IMPLEMENT ();
// Mutator overload of scheduling_policies(): the policy list argument is
// left unnamed and the call throws CORBA::NO_IMPLEMENT.
575 MIF_Scheduler::scheduling_policies (const CORBA::PolicyList
&)
577 throw CORBA::NO_IMPLEMENT ();
// Not supported by this scheduler -- throws CORBA::NO_IMPLEMENT.
581 MIF_Scheduler::poa_policies ()
583 throw CORBA::NO_IMPLEMENT ();
// Not supported by this scheduler -- throws CORBA::NO_IMPLEMENT.
587 MIF_Scheduler::scheduling_discipline_name ()
589 throw CORBA::NO_IMPLEMENT ();
// Declared to return an RTScheduling::ResourceManager_ptr, but not
// supported by this scheduler -- throws CORBA::NO_IMPLEMENT.
592 RTScheduling::ResourceManager_ptr
593 MIF_Scheduler::create_resource_manager (const char *,
596 throw CORBA::NO_IMPLEMENT ();
// Not supported by this scheduler: the servant argument is left unnamed
// and the call throws CORBA::NO_IMPLEMENT.
600 MIF_Scheduler::set_scheduling_parameter (PortableServer::Servant
&,
604 throw CORBA::NO_IMPLEMENT ();