Merge pull request #2301 from sonndinh/remove-dup-reactor-functions
[ACE_TAO.git] / TAO / examples / Kokyu_dsrt_schedulers / MUF_Scheduler.cpp
blobfe0b50123ccd571c6b14b6f68a3e6a2f5a6a73b7
1 #include "MUF_Scheduler.h"
2 #include "Kokyu_qosC.h"
3 #include "utils.h"
4 #include "tao/RTScheduling/Request_Interceptor.h"
5 #include "tao/CodecFactory/CodecFactory.h"
6 #include "tao/ORB_Constants.h"
8 MUF_Sched_Param_Policy::MUF_Sched_Param_Policy ()
12 MUF_Sched_Param_Policy::MUF_Sched_Param_Policy (
13 const MUF_Sched_Param_Policy &rhs
15 : CORBA::Object (),
16 CORBA::Policy (),
17 MUF_Scheduling::SchedulingParameterPolicy (),
18 CORBA::LocalObject (),
19 value_ (rhs.value_)
23 MUF_Scheduling::SchedulingParameter
24 MUF_Sched_Param_Policy::value ()
26 return this->value_;
29 void
30 MUF_Sched_Param_Policy::value (const MUF_Scheduling::SchedulingParameter& value)
32 this->value_ = value;
35 CORBA::Policy_ptr
36 MUF_Sched_Param_Policy::copy ()
38 MUF_Sched_Param_Policy* tmp = 0;
39 ACE_NEW_THROW_EX (tmp,
40 MUF_Sched_Param_Policy (*this),
41 CORBA::NO_MEMORY (TAO::VMCID,
42 CORBA::COMPLETED_NO));
44 return tmp;
47 CORBA::PolicyType
48 MUF_Sched_Param_Policy::policy_type ()
50 return 0;
53 void
54 MUF_Sched_Param_Policy::destroy ()
58 MUF_Scheduler::MUF_Scheduler (CORBA::ORB_ptr orb,
59 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type,
60 int ace_sched_policy,
61 int ace_sched_scope)
62 : orb_ (CORBA::ORB::_duplicate (orb)),
63 disp_impl_type_ (disp_impl_type),
64 ace_sched_policy_ (ace_sched_policy),
65 ace_sched_scope_ (ace_sched_scope)
67 Kokyu::DSRT_ConfigInfo config;
69 config.impl_type_ = this->disp_impl_type_;
70 config.sched_policy_ = ace_sched_policy_;
71 config.sched_scope_ = ace_sched_scope_;
73 Kokyu::DSRT_Dispatcher_Factory<MUF_Scheduler_Traits>::DSRT_Dispatcher_Auto_Ptr
74 tmp( Kokyu::DSRT_Dispatcher_Factory<MUF_Scheduler_Traits>::
75 create_DSRT_dispatcher (config) );
76 kokyu_dispatcher_ = std::move(tmp);
78 CORBA::Object_var object =
79 orb->resolve_initial_references ("RTScheduler_Current");
81 this->current_ =
82 RTScheduling::Current::_narrow (object.in ());
84 IOP::CodecFactory_var codec_factory;
85 CORBA::Object_var obj =
86 orb->resolve_initial_references ("CodecFactory");
88 if (CORBA::is_nil(obj.in ()))
90 ACE_ERROR ((LM_ERROR, "Nil Codec factory\n"));
92 else
94 codec_factory = IOP::CodecFactory::_narrow (obj.in ());
97 IOP::Encoding encoding;
98 encoding.format = IOP::ENCODING_CDR_ENCAPS;
99 encoding.major_version = 1;
100 encoding.minor_version = 2;
102 codec_ = codec_factory->create_codec (encoding);
105 MUF_Scheduler::~MUF_Scheduler ()
109 void
110 MUF_Scheduler::shutdown ()
112 kokyu_dispatcher_->shutdown ();
113 ACE_DEBUG ((LM_DEBUG, "kokyu DSRT dispatcher shutdown\n"));
116 MUF_Scheduling::SchedulingParameterPolicy_ptr
117 MUF_Scheduler::create_scheduling_parameter (const MUF_Scheduling::SchedulingParameter & value)
119 MUF_Scheduling::SchedulingParameterPolicy_ptr sched_param_policy;
120 ACE_NEW_THROW_EX (sched_param_policy,
121 MUF_Sched_Param_Policy,
122 CORBA::NO_MEMORY (
123 CORBA::SystemException::_tao_minor_code (
124 TAO::VMCID,
125 ENOMEM),
126 CORBA::COMPLETED_NO));
128 sched_param_policy->value (value);
130 return sched_param_policy;
134 void
135 MUF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType &guid,
136 const char *,
137 CORBA::Policy_ptr sched_policy,
138 CORBA::Policy_ptr)
140 #ifdef KOKYU_DSRT_LOGGING
141 ACE_DEBUG ((LM_DEBUG,
142 "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment enter\n"));
143 #endif
145 #ifdef KOKYU_DSRT_LOGGING
146 int int_guid;
147 ACE_OS::memcpy (&int_guid,
148 guid.get_buffer (),
149 guid.length ());
150 ACE_DEBUG ((LM_DEBUG, "(%t|%T): guid is %d\n", int_guid));
151 #endif
153 MUF_Scheduler_Traits::QoSDescriptor_t qos;
154 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
155 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy);
157 MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
159 qos.deadline_ = sched_param->deadline;
160 qos.exec_time_ = sched_param->estimated_initial_execution_time;
161 qos.criticality_ = sched_param->criticality;
163 kokyu_dispatcher_->schedule (guid, qos);
165 #ifdef KOKYU_DSRT_LOGGING
166 ACE_DEBUG ((LM_DEBUG,
167 "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment exit\n"));
168 #endif
172 void
173 MUF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
174 const char *name,
175 CORBA::Policy_ptr sched_param,
176 CORBA::Policy_ptr implicit_sched_param)
178 this->begin_new_scheduling_segment (guid,
179 name,
180 sched_param,
181 implicit_sched_param);
184 void
185 MUF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType& guid,
186 const char* name,
187 CORBA::Policy_ptr sched_policy,
188 CORBA::Policy_ptr implicit_sched_param)
190 ACE_UNUSED_ARG ((name));
191 ACE_UNUSED_ARG ((implicit_sched_param));
193 #ifdef KOKYU_DSRT_LOGGING
194 int int_guid ;
195 ACE_OS::memcpy (&int_guid,
196 guid.get_buffer (),
197 guid.length ());
198 ACE_DEBUG ((LM_DEBUG, "(%t|%T): update_sched_seg::guid is %d\n", int_guid));
199 #endif
201 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
202 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy);
204 MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
205 MUF_Scheduler_Traits::QoSDescriptor_t qos;
207 qos.deadline_ = sched_param->deadline;
208 qos.exec_time_ = sched_param->estimated_initial_execution_time;
209 qos.criticality_ = sched_param->criticality;
211 kokyu_dispatcher_->update_schedule (guid, qos);
214 void
215 MUF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
216 const char *)
218 #ifdef KOKYU_DSRT_LOGGING
219 int int_guid;
220 ACE_OS::memcpy (&int_guid,
221 guid.get_buffer (),
222 guid.length ());
223 ACE_DEBUG ((LM_DEBUG, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid));
224 #endif
226 kokyu_dispatcher_->cancel_schedule (guid);
229 void
230 MUF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
231 const char *,
232 CORBA::Policy_ptr)
237 void
238 MUF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri)
240 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
242 CORBA::String_var operation = ri->operation ();
244 #ifdef KOKYU_DSRT_LOGGING
245 ACE_DEBUG ((LM_DEBUG,
246 "(%t|%T): send_request "
247 "from \"%s\"\n",
248 operation.in ()));
249 #endif
251 // Make the context to send the context to the target
252 IOP::ServiceContext sc;
253 sc.context_id = Client_Interceptor::SchedulingInfo;
255 CORBA::Policy_var sched_policy =
256 this->current_->scheduling_parameter();
258 RTScheduling::Current::IdType_var guid = this->current_->id ();
261 ACE_OS::memcpy (&guid,
262 guid->get_buffer (),
263 guid->length ());
266 CORBA::Long criticality;
267 TimeBase::TimeT deadline,exec_time;
269 if (CORBA::is_nil (sched_policy.in ()))
271 //24 hrs from now - infinity
272 ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
273 deadline = deadline_tv.sec () * 1000000 + deadline_tv.usec () * 10; //100s of nanoseconds for TimeBase::TimeT
274 exec_time = 0;
275 criticality = 0;
277 else
279 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
280 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy.in ());
282 MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
283 deadline = sched_param->deadline;
284 exec_time = sched_param->estimated_initial_execution_time;
285 criticality = sched_param->criticality;
287 #ifdef KOKYU_DSRT_LOGGING
288 int int_guid;
289 ACE_OS::memcpy (&int_guid,
290 guid->get_buffer (),
291 guid->length ());
292 ACE_DEBUG ((LM_DEBUG,
293 "(%t|%T): send_request guid = %d\n",
294 int_guid));
295 #endif
297 //Fill the guid in the SC Qos struct
298 sc_qos.guid.length (guid->length ());
299 guid_copy (sc_qos.guid, guid.in ());
300 sc_qos.deadline = deadline;
301 sc_qos.estimated_initial_execution_time = exec_time;
302 sc_qos.criticality = criticality;
303 CORBA::Any sc_qos_as_any;
304 sc_qos_as_any <<= sc_qos;
306 CORBA::OctetSeq_var cdtmp = codec_->encode (sc_qos_as_any);
307 sc.context_data = cdtmp.in ();
309 #ifdef KOKYU_DSRT_LOGGING
310 ACE_DEBUG ((LM_DEBUG,
311 ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
312 #endif
314 // Add this context to the service context list.
315 ri->add_request_service_context (sc, 0);
319 #ifdef KOKYU_DSRT_LOGGING
320 ACE_DEBUG ((LM_DEBUG,
321 ACE_TEXT ("(%t|%T): send_request : ")
322 ACE_TEXT ("about to call scheduler to inform block\n")));
323 #endif
325 kokyu_dispatcher_->update_schedule (guid.in (),
326 Kokyu::BLOCK);
328 #ifdef KOKYU_DSRT_LOGGING
329 ACE_DEBUG ((LM_DEBUG,
330 ACE_TEXT ("(%t|%T): send_request interceptor done\n")));
331 #endif
334 void
335 MUF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri,
336 RTScheduling::Current::IdType_out guid_out,
337 CORBA::String_out /*name*/,
338 CORBA::Policy_out sched_param_out,
339 CORBA::Policy_out /*implicit_sched_param_out*/)
341 const Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
343 #ifdef KOKYU_DSRT_LOGGING
344 ACE_DEBUG ((LM_DEBUG, "(%t|%T):entered MUF_Scheduler::receive_request\n"));
345 #endif
347 RTScheduling::Current::IdType guid;
349 CORBA::String_var operation = ri->operation ();
351 #ifdef KOKYU_DSRT_LOGGING
352 ACE_DEBUG ((LM_DEBUG,
353 "(%t|%T): receive_request from "
354 "\"%s\"\n",
355 operation.in ()));
356 #endif
358 // Ignore the "_is_a" operation since it may have been invoked
359 // locally on the server side as a side effect of another call,
360 // meaning that the client hasn't added the service context yet.
361 if (ACE_OS::strcmp ("_is_a", operation.in ()) == 0)
362 return;
364 IOP::ServiceContext_var sc =
365 ri->get_request_service_context (Server_Interceptor::SchedulingInfo);
367 CORBA::Long criticality;
368 TimeBase::TimeT deadline,exec_time;
370 if (sc.ptr () == 0)
372 //24 hrs from now - infinity
373 ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
374 deadline = deadline_tv.sec ()*1000000 + deadline_tv.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
375 exec_time = 0;
376 criticality = 0;
378 else
380 CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
381 sc->context_data.length (),
382 sc->context_data.get_buffer (),
384 CORBA::Any sc_qos_as_any;
385 CORBA::Any_var scqostmp = codec_->decode (oc_seq);
386 sc_qos_as_any = scqostmp.in ();
387 //Don't store in a _var, since >>= returns a pointer to an
388 //internal buffer and we are not supposed to free it.
389 sc_qos_as_any >>= sc_qos_ptr;
391 deadline = sc_qos_ptr->deadline;
392 criticality = sc_qos_ptr->criticality;
393 exec_time = sc_qos_ptr->estimated_initial_execution_time;
395 guid.length (sc_qos_ptr->guid.length ());
396 guid_copy (guid, sc_qos_ptr->guid);
398 ACE_NEW (guid_out.ptr (),
399 RTScheduling::Current::IdType);
400 guid_out.ptr ()->length (guid.length ());
401 *(guid_out.ptr ()) = guid;
403 #ifdef KOKYU_DSRT_LOGGING
404 int int_guid;
405 ACE_OS::memcpy (&int_guid,
406 guid.get_buffer (),
407 guid.length ());
408 ACE_DEBUG ((LM_DEBUG,
409 "(%t|%T): Criticality = %d, guid = %d "
410 "in recvd service context\n",
411 criticality,
412 int_guid));
413 #endif
414 MUF_Scheduling::SchedulingParameter sched_param;
415 sched_param.criticality = criticality;
416 sched_param.deadline = deadline;
417 sched_param_out = this->create_scheduling_parameter (sched_param);
420 MUF_Scheduler_Traits::QoSDescriptor_t qos;
421 qos.criticality_ = criticality;
422 qos.deadline_ = deadline;
423 qos.exec_time_ = exec_time;
425 this->kokyu_dispatcher_->schedule (guid, qos);
427 #ifdef KOKYU_DSRT_LOGGING
428 ACE_DEBUG ((LM_DEBUG, "(%t|%T): receive_request interceptor done\n"));
429 #endif
433 void
434 MUF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr)
438 void
439 MUF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri)
441 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
443 CORBA::String_var operation = ri->operation ();
445 #ifdef KOKYU_DSRT_LOGGING
446 ACE_DEBUG ((LM_DEBUG,
447 "(%t|%T): send_reply from \"%s\"\n",
448 ri->operation ()));
449 #endif
451 // Make the context to send the context to the target
452 IOP::ServiceContext sc;
453 sc.context_id = Server_Interceptor::SchedulingInfo;
456 CORBA::Long criticality;
457 TimeBase::TimeT deadline,exec_time;
459 CORBA::Policy_var sched_policy =
460 this->current_->scheduling_parameter();
462 RTScheduling::Current::IdType_var guid = this->current_->id ();
464 if (CORBA::is_nil (sched_policy.in ()))
466 #ifdef KOKYU_DSRT_LOGGING
467 ACE_DEBUG ((LM_DEBUG,
468 "(%t|%T): sched_policy nil.\n"));
469 #endif
470 //24 hrs from now - infinity
471 ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
472 deadline = deadline_tv.sec ()*1000000 + deadline_tv.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
473 exec_time = 0;
474 criticality = 0;
476 else
478 #ifdef KOKYU_DSRT_LOGGING
479 ACE_DEBUG ((LM_DEBUG,
480 "(%t|%T):sched_policy not nil. ",
481 "sched params set\n"));
482 #endif
483 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
484 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy.in ());
485 MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
488 sc_qos.guid.length (guid->length ());
489 guid_copy (sc_qos.guid, guid.in ());
491 deadline = sched_param->deadline;
492 exec_time = sched_param->estimated_initial_execution_time;
493 criticality = sched_param->criticality;
494 sc_qos.deadline = deadline;
495 sc_qos.estimated_initial_execution_time = exec_time;
496 sc_qos.criticality = criticality;
498 CORBA::Any sc_qos_as_any;
499 sc_qos_as_any <<= sc_qos;
501 CORBA::OctetSeq_var cdtmp = codec_->encode (sc_qos_as_any);
502 sc.context_data = cdtmp.in ();
504 // Add this context to the service context list.
505 ri->add_reply_service_context (sc, 1);
507 #ifdef KOKYU_DSRT_LOGGING
508 ACE_DEBUG ((LM_DEBUG, "(%t|%T):reply sc added\n"));
509 #endif
512 kokyu_dispatcher_->update_schedule (guid.in (),
513 Kokyu::BLOCK);
515 #ifdef KOKYU_DSRT_LOGGING
516 ACE_DEBUG ((LM_DEBUG, "(%t|%T): send_reply interceptor done\n"));
517 #endif
520 void
521 MUF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri)
523 send_reply (ri);
526 void
527 MUF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri)
529 send_reply (ri);
532 void
533 MUF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri)
535 RTScheduling::Current::IdType guid;
537 CORBA::String_var operation = ri->operation ();
539 CORBA::Object_var target = ri->target ();
541 ACE_CString opname = operation.in ();
542 #ifdef KOKYU_DSRT_LOGGING
543 ACE_DEBUG ((LM_DEBUG,
544 "(%t|%T):receive_reply from "
545 "\"%s\"\n",
546 opname.c_str ()));
547 #endif
549 // Check that the reply service context was received as
550 // expected.
551 IOP::ServiceContext_var sc =
552 ri->get_reply_service_context (Client_Interceptor::SchedulingInfo);
554 CORBA::Long criticality;
555 TimeBase::TimeT deadline,exec_time;
557 if (sc.ptr () == 0)
559 ACE_DEBUG ((LM_DEBUG, "service context was not filled\n"));
560 //24 hrs from now - infinity
561 ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
562 deadline = deadline_tv.sec ()*1000000 + deadline_tv.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
563 exec_time = 0;
564 criticality = 0;
566 else
568 CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
569 sc->context_data.length (),
570 sc->context_data.get_buffer (),
573 //Don't store in a _var, since >>= returns a pointer to an internal buffer
574 //and we are not supposed to free it.
575 const Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
576 CORBA::Any sc_qos_as_any;
577 CORBA::Any_var scqostmp = codec_->decode (oc_seq);
578 sc_qos_as_any = scqostmp.in ();
579 sc_qos_as_any >>= sc_qos_ptr;
581 deadline = sc_qos_ptr->deadline;
582 criticality = sc_qos_ptr->criticality;
583 exec_time = sc_qos_ptr->estimated_initial_execution_time;
585 guid.length (sc_qos_ptr->guid.length ());
586 guid_copy (guid, sc_qos_ptr->guid);
588 ACE_DEBUG ((LM_DEBUG,
589 "(%t|%T): Criticality = %d in recvd service context\n",
590 criticality));
593 MUF_Scheduler_Traits::QoSDescriptor_t qos;
594 qos.deadline_ = qos.criticality_ = criticality;
595 qos.deadline_ = deadline;
596 qos.exec_time_ = exec_time;
597 this->kokyu_dispatcher_->schedule (guid, qos);
600 void
601 MUF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri)
603 receive_reply (ri);
606 void
607 MUF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri)
609 receive_reply (ri);
612 void
613 MUF_Scheduler::cancel (const RTScheduling::Current::IdType &)
615 throw CORBA::NO_IMPLEMENT ();
618 CORBA::PolicyList*
619 MUF_Scheduler::scheduling_policies ()
621 throw CORBA::NO_IMPLEMENT ();
624 void
625 MUF_Scheduler::scheduling_policies (const CORBA::PolicyList &)
627 throw CORBA::NO_IMPLEMENT ();
630 CORBA::PolicyList*
631 MUF_Scheduler::poa_policies ()
633 throw CORBA::NO_IMPLEMENT ();
636 char *
637 MUF_Scheduler::scheduling_discipline_name ()
639 throw CORBA::NO_IMPLEMENT ();
642 RTScheduling::ResourceManager_ptr
643 MUF_Scheduler::create_resource_manager (const char *,
644 CORBA::Policy_ptr)
646 throw CORBA::NO_IMPLEMENT ();
649 void
650 MUF_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
651 const char *,
652 CORBA::Policy_ptr)
654 throw CORBA::NO_IMPLEMENT ();