Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / TAO / examples / Kokyu_dsrt_schedulers / MUF_Scheduler.cpp
blob5193fbbae924743a81aedc6eac6c41d3bdcc8929
1 #include "MUF_Scheduler.h"
2 #include "Kokyu_qosC.h"
3 #include "utils.h"
4 #include "tao/RTScheduling/Request_Interceptor.h"
5 #include "tao/CodecFactory/CodecFactory.h"
6 #include "tao/ORB_Constants.h"
8 MUF_Sched_Param_Policy::MUF_Sched_Param_Policy ()
12 MUF_Sched_Param_Policy::MUF_Sched_Param_Policy (
13 const MUF_Sched_Param_Policy &rhs
15 : CORBA::Object (),
16 CORBA::Policy (),
17 MUF_Scheduling::SchedulingParameterPolicy (),
18 CORBA::LocalObject (),
19 value_ (rhs.value_)
23 MUF_Scheduling::SchedulingParameter
24 MUF_Sched_Param_Policy::value (void)
26 return this->value_;
29 void
30 MUF_Sched_Param_Policy::value (const MUF_Scheduling::SchedulingParameter& value)
32 this->value_ = value;
35 CORBA::Policy_ptr
36 MUF_Sched_Param_Policy::copy (void)
38 MUF_Sched_Param_Policy* tmp = 0;
39 ACE_NEW_THROW_EX (tmp,
40 MUF_Sched_Param_Policy (*this),
41 CORBA::NO_MEMORY (TAO::VMCID,
42 CORBA::COMPLETED_NO));
44 return tmp;
47 CORBA::PolicyType
48 MUF_Sched_Param_Policy::policy_type (void)
50 return 0;
53 void
54 MUF_Sched_Param_Policy::destroy (void)
58 MUF_Scheduler::MUF_Scheduler (CORBA::ORB_ptr orb,
59 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type,
60 int ace_sched_policy,
61 int ace_sched_scope)
62 : orb_ (CORBA::ORB::_duplicate (orb)),
63 disp_impl_type_ (disp_impl_type),
64 ace_sched_policy_ (ace_sched_policy),
65 ace_sched_scope_ (ace_sched_scope)
68 Kokyu::DSRT_ConfigInfo config;
70 config.impl_type_ = this->disp_impl_type_;
71 config.sched_policy_ = ace_sched_policy_;
72 config.sched_scope_ = ace_sched_scope_;
74 Kokyu::DSRT_Dispatcher_Factory<MUF_Scheduler_Traits>::DSRT_Dispatcher_Auto_Ptr
75 tmp( Kokyu::DSRT_Dispatcher_Factory<MUF_Scheduler_Traits>::
76 create_DSRT_dispatcher (config) );
77 kokyu_dispatcher_ = tmp;
79 CORBA::Object_var object =
80 orb->resolve_initial_references ("RTScheduler_Current");
82 this->current_ =
83 RTScheduling::Current::_narrow (object.in ());
85 IOP::CodecFactory_var codec_factory;
86 CORBA::Object_var obj =
87 orb->resolve_initial_references ("CodecFactory");
89 if (CORBA::is_nil(obj.in ()))
91 ACE_ERROR ((LM_ERROR, "Nil Codec factory\n"));
93 else
95 codec_factory = IOP::CodecFactory::_narrow (obj.in ());
98 IOP::Encoding encoding;
99 encoding.format = IOP::ENCODING_CDR_ENCAPS;
100 encoding.major_version = 1;
101 encoding.minor_version = 2;
103 codec_ = codec_factory->create_codec (encoding);
106 MUF_Scheduler::~MUF_Scheduler (void)
110 void
111 MUF_Scheduler::shutdown (void)
113 kokyu_dispatcher_->shutdown ();
114 ACE_DEBUG ((LM_DEBUG, "kokyu DSRT dispatcher shutdown\n"));
117 MUF_Scheduling::SchedulingParameterPolicy_ptr
118 MUF_Scheduler::create_scheduling_parameter (const MUF_Scheduling::SchedulingParameter & value)
120 MUF_Scheduling::SchedulingParameterPolicy_ptr sched_param_policy;
121 ACE_NEW_THROW_EX (sched_param_policy,
122 MUF_Sched_Param_Policy,
123 CORBA::NO_MEMORY (
124 CORBA::SystemException::_tao_minor_code (
125 TAO::VMCID,
126 ENOMEM),
127 CORBA::COMPLETED_NO));
129 sched_param_policy->value (value);
131 return sched_param_policy;
135 void
136 MUF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType &guid,
137 const char *,
138 CORBA::Policy_ptr sched_policy,
139 CORBA::Policy_ptr)
141 #ifdef KOKYU_DSRT_LOGGING
142 ACE_DEBUG ((LM_DEBUG,
143 "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment enter\n"));
144 #endif
146 #ifdef KOKYU_DSRT_LOGGING
147 int int_guid;
148 ACE_OS::memcpy (&int_guid,
149 guid.get_buffer (),
150 guid.length ());
151 ACE_DEBUG ((LM_DEBUG, "(%t|%T): guid is %d\n", int_guid));
152 #endif
154 MUF_Scheduler_Traits::QoSDescriptor_t qos;
155 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
156 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy);
158 MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
160 qos.deadline_ = sched_param->deadline;
161 qos.exec_time_ = sched_param->estimated_initial_execution_time;
162 qos.criticality_ = sched_param->criticality;
164 kokyu_dispatcher_->schedule (guid, qos);
166 #ifdef KOKYU_DSRT_LOGGING
167 ACE_DEBUG ((LM_DEBUG,
168 "(%t|%T):MUF_Scheduler::begin_new_scheduling_segment exit\n"));
169 #endif
173 void
174 MUF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
175 const char *name,
176 CORBA::Policy_ptr sched_param,
177 CORBA::Policy_ptr implicit_sched_param)
179 this->begin_new_scheduling_segment (guid,
180 name,
181 sched_param,
182 implicit_sched_param);
185 void
186 MUF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType& guid,
187 const char* name,
188 CORBA::Policy_ptr sched_policy,
189 CORBA::Policy_ptr implicit_sched_param)
191 ACE_UNUSED_ARG ((name));
192 ACE_UNUSED_ARG ((implicit_sched_param));
194 #ifdef KOKYU_DSRT_LOGGING
195 int int_guid ;
196 ACE_OS::memcpy (&int_guid,
197 guid.get_buffer (),
198 guid.length ());
199 ACE_DEBUG ((LM_DEBUG, "(%t|%T): update_sched_seg::guid is %d\n", int_guid));
200 #endif
202 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
203 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy);
205 MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
206 MUF_Scheduler_Traits::QoSDescriptor_t qos;
208 qos.deadline_ = sched_param->deadline;
209 qos.exec_time_ = sched_param->estimated_initial_execution_time;
210 qos.criticality_ = sched_param->criticality;
212 kokyu_dispatcher_->update_schedule (guid, qos);
215 void
216 MUF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
217 const char *)
219 #ifdef KOKYU_DSRT_LOGGING
220 int int_guid;
221 ACE_OS::memcpy (&int_guid,
222 guid.get_buffer (),
223 guid.length ());
224 ACE_DEBUG ((LM_DEBUG, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid));
225 #endif
227 kokyu_dispatcher_->cancel_schedule (guid);
230 void
231 MUF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
232 const char *,
233 CORBA::Policy_ptr)
238 void
239 MUF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri)
241 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
243 CORBA::String_var operation = ri->operation ();
245 #ifdef KOKYU_DSRT_LOGGING
246 ACE_DEBUG ((LM_DEBUG,
247 "(%t|%T): send_request "
248 "from \"%s\"\n",
249 operation.in ()));
250 #endif
252 // Make the context to send the context to the target
253 IOP::ServiceContext sc;
254 sc.context_id = Client_Interceptor::SchedulingInfo;
256 CORBA::Policy_var sched_policy =
257 this->current_->scheduling_parameter();
259 RTScheduling::Current::IdType_var guid = this->current_->id ();
262 ACE_OS::memcpy (&guid,
263 guid->get_buffer (),
264 guid->length ());
267 CORBA::Long criticality;
268 TimeBase::TimeT deadline,exec_time;
270 if (CORBA::is_nil (sched_policy.in ()))
272 //24 hrs from now - infinity
273 ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
274 deadline = deadline_tv.sec () * 1000000 + deadline_tv.usec () * 10; //100s of nanoseconds for TimeBase::TimeT
275 exec_time = 0;
276 criticality = 0;
278 else
280 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
281 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy.in ());
283 MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
284 deadline = sched_param->deadline;
285 exec_time = sched_param->estimated_initial_execution_time;
286 criticality = sched_param->criticality;
288 #ifdef KOKYU_DSRT_LOGGING
289 int int_guid;
290 ACE_OS::memcpy (&int_guid,
291 guid->get_buffer (),
292 guid->length ());
293 ACE_DEBUG ((LM_DEBUG,
294 "(%t|%T): send_request guid = %d\n",
295 int_guid));
296 #endif
298 //Fill the guid in the SC Qos struct
299 sc_qos.guid.length (guid->length ());
300 guid_copy (sc_qos.guid, guid.in ());
301 sc_qos.deadline = deadline;
302 sc_qos.estimated_initial_execution_time = exec_time;
303 sc_qos.criticality = criticality;
304 CORBA::Any sc_qos_as_any;
305 sc_qos_as_any <<= sc_qos;
307 CORBA::OctetSeq_var cdtmp = codec_->encode (sc_qos_as_any);
308 sc.context_data = cdtmp.in ();
310 #ifdef KOKYU_DSRT_LOGGING
311 ACE_DEBUG ((LM_DEBUG,
312 ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
313 #endif
315 // Add this context to the service context list.
316 ri->add_request_service_context (sc, 0);
320 #ifdef KOKYU_DSRT_LOGGING
321 ACE_DEBUG ((LM_DEBUG,
322 ACE_TEXT ("(%t|%T): send_request : ")
323 ACE_TEXT ("about to call scheduler to inform block\n")));
324 #endif
326 kokyu_dispatcher_->update_schedule (guid.in (),
327 Kokyu::BLOCK);
329 #ifdef KOKYU_DSRT_LOGGING
330 ACE_DEBUG ((LM_DEBUG,
331 ACE_TEXT ("(%t|%T): send_request interceptor done\n")));
332 #endif
335 void
336 MUF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri,
337 RTScheduling::Current::IdType_out guid_out,
338 CORBA::String_out /*name*/,
339 CORBA::Policy_out sched_param_out,
340 CORBA::Policy_out /*implicit_sched_param_out*/)
342 const Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
344 #ifdef KOKYU_DSRT_LOGGING
345 ACE_DEBUG ((LM_DEBUG, "(%t|%T):entered MUF_Scheduler::receive_request\n"));
346 #endif
348 RTScheduling::Current::IdType guid;
350 CORBA::String_var operation = ri->operation ();
352 #ifdef KOKYU_DSRT_LOGGING
353 ACE_DEBUG ((LM_DEBUG,
354 "(%t|%T): receive_request from "
355 "\"%s\"\n",
356 operation.in ()));
357 #endif
359 // Ignore the "_is_a" operation since it may have been invoked
360 // locally on the server side as a side effect of another call,
361 // meaning that the client hasn't added the service context yet.
362 if (ACE_OS::strcmp ("_is_a", operation.in ()) == 0)
363 return;
365 IOP::ServiceContext_var sc =
366 ri->get_request_service_context (Server_Interceptor::SchedulingInfo);
368 CORBA::Long criticality;
369 TimeBase::TimeT deadline,exec_time;
371 if (sc.ptr () == 0)
373 //24 hrs from now - infinity
374 ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
375 deadline = deadline_tv.sec ()*1000000 + deadline_tv.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
376 exec_time = 0;
377 criticality = 0;
379 else
381 CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
382 sc->context_data.length (),
383 sc->context_data.get_buffer (),
385 CORBA::Any sc_qos_as_any;
386 CORBA::Any_var scqostmp = codec_->decode (oc_seq);
387 sc_qos_as_any = scqostmp.in ();
388 //Don't store in a _var, since >>= returns a pointer to an
389 //internal buffer and we are not supposed to free it.
390 sc_qos_as_any >>= sc_qos_ptr;
392 deadline = sc_qos_ptr->deadline;
393 criticality = sc_qos_ptr->criticality;
394 exec_time = sc_qos_ptr->estimated_initial_execution_time;
396 guid.length (sc_qos_ptr->guid.length ());
397 guid_copy (guid, sc_qos_ptr->guid);
399 ACE_NEW (guid_out.ptr (),
400 RTScheduling::Current::IdType);
401 guid_out.ptr ()->length (guid.length ());
402 *(guid_out.ptr ()) = guid;
404 #ifdef KOKYU_DSRT_LOGGING
405 int int_guid;
406 ACE_OS::memcpy (&int_guid,
407 guid.get_buffer (),
408 guid.length ());
409 ACE_DEBUG ((LM_DEBUG,
410 "(%t|%T): Criticality = %d, guid = %d "
411 "in recvd service context\n",
412 criticality,
413 int_guid));
414 #endif
415 MUF_Scheduling::SchedulingParameter sched_param;
416 sched_param.criticality = criticality;
417 sched_param.deadline = deadline;
418 sched_param_out = this->create_scheduling_parameter (sched_param);
421 MUF_Scheduler_Traits::QoSDescriptor_t qos;
422 qos.criticality_ = criticality;
423 qos.deadline_ = deadline;
424 qos.exec_time_ = exec_time;
426 this->kokyu_dispatcher_->schedule (guid, qos);
428 #ifdef KOKYU_DSRT_LOGGING
429 ACE_DEBUG ((LM_DEBUG, "(%t|%T): receive_request interceptor done\n"));
430 #endif
434 void
435 MUF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr)
439 void
440 MUF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri)
443 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
445 CORBA::String_var operation = ri->operation ();
447 #ifdef KOKYU_DSRT_LOGGING
448 ACE_DEBUG ((LM_DEBUG,
449 "(%t|%T): send_reply from \"%s\"\n",
450 ri->operation ()));
451 #endif
453 // Make the context to send the context to the target
454 IOP::ServiceContext sc;
455 sc.context_id = Server_Interceptor::SchedulingInfo;
458 CORBA::Long criticality;
459 TimeBase::TimeT deadline,exec_time;
461 CORBA::Policy_var sched_policy =
462 this->current_->scheduling_parameter();
464 RTScheduling::Current::IdType_var guid = this->current_->id ();
466 if (CORBA::is_nil (sched_policy.in ()))
468 #ifdef KOKYU_DSRT_LOGGING
469 ACE_DEBUG ((LM_DEBUG,
470 "(%t|%T): sched_policy nil.\n"));
471 #endif
472 //24 hrs from now - infinity
473 ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
474 deadline = deadline_tv.sec ()*1000000 + deadline_tv.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
475 exec_time = 0;
476 criticality = 0;
478 else
480 #ifdef KOKYU_DSRT_LOGGING
481 ACE_DEBUG ((LM_DEBUG,
482 "(%t|%T):sched_policy not nil. ",
483 "sched params set\n"));
484 #endif
485 MUF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
486 MUF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy.in ());
487 MUF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
490 sc_qos.guid.length (guid->length ());
491 guid_copy (sc_qos.guid, guid.in ());
493 deadline = sched_param->deadline;
494 exec_time = sched_param->estimated_initial_execution_time;
495 criticality = sched_param->criticality;
496 sc_qos.deadline = deadline;
497 sc_qos.estimated_initial_execution_time = exec_time;
498 sc_qos.criticality = criticality;
500 CORBA::Any sc_qos_as_any;
501 sc_qos_as_any <<= sc_qos;
503 CORBA::OctetSeq_var cdtmp = codec_->encode (sc_qos_as_any);
504 sc.context_data = cdtmp.in ();
506 // Add this context to the service context list.
507 ri->add_reply_service_context (sc, 1);
509 #ifdef KOKYU_DSRT_LOGGING
510 ACE_DEBUG ((LM_DEBUG, "(%t|%T):reply sc added\n"));
511 #endif
514 kokyu_dispatcher_->update_schedule (guid.in (),
515 Kokyu::BLOCK);
517 #ifdef KOKYU_DSRT_LOGGING
518 ACE_DEBUG ((LM_DEBUG, "(%t|%T): send_reply interceptor done\n"));
519 #endif
522 void
523 MUF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri)
525 send_reply (ri);
528 void
529 MUF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri)
531 send_reply (ri);
534 void
535 MUF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri)
537 RTScheduling::Current::IdType guid;
539 CORBA::String_var operation = ri->operation ();
541 CORBA::Object_var target = ri->target ();
543 ACE_CString opname = operation.in ();
544 #ifdef KOKYU_DSRT_LOGGING
545 ACE_DEBUG ((LM_DEBUG,
546 "(%t|%T):receive_reply from "
547 "\"%s\"\n",
548 opname.c_str ()));
549 #endif
551 // Check that the reply service context was received as
552 // expected.
553 IOP::ServiceContext_var sc =
554 ri->get_reply_service_context (Client_Interceptor::SchedulingInfo);
556 CORBA::Long criticality;
557 TimeBase::TimeT deadline,exec_time;
559 if (sc.ptr () == 0)
561 ACE_DEBUG ((LM_DEBUG, "service context was not filled\n"));
562 //24 hrs from now - infinity
563 ACE_Time_Value deadline_tv = ACE_OS::gettimeofday () + ACE_Time_Value (24*60*60,0);
564 deadline = deadline_tv.sec ()*1000000 + deadline_tv.usec ()*10; //100s of nanoseconds for TimeBase::TimeT
565 exec_time = 0;
566 criticality = 0;
568 else
570 CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
571 sc->context_data.length (),
572 sc->context_data.get_buffer (),
575 //Don't store in a _var, since >>= returns a pointer to an internal buffer
576 //and we are not supposed to free it.
577 const Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
578 CORBA::Any sc_qos_as_any;
579 CORBA::Any_var scqostmp = codec_->decode (oc_seq);
580 sc_qos_as_any = scqostmp.in ();
581 sc_qos_as_any >>= sc_qos_ptr;
583 deadline = sc_qos_ptr->deadline;
584 criticality = sc_qos_ptr->criticality;
585 exec_time = sc_qos_ptr->estimated_initial_execution_time;
587 guid.length (sc_qos_ptr->guid.length ());
588 guid_copy (guid, sc_qos_ptr->guid);
590 ACE_DEBUG ((LM_DEBUG,
591 "(%t|%T): Criticality = %d in recvd service context\n",
592 criticality));
595 MUF_Scheduler_Traits::QoSDescriptor_t qos;
596 qos.deadline_ = qos.criticality_ = criticality;
597 qos.deadline_ = deadline;
598 qos.exec_time_ = exec_time;
599 this->kokyu_dispatcher_->schedule (guid, qos);
602 void
603 MUF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri)
605 receive_reply (ri);
608 void
609 MUF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri)
611 receive_reply (ri);
614 void
615 MUF_Scheduler::cancel (const RTScheduling::Current::IdType &)
617 throw CORBA::NO_IMPLEMENT ();
620 CORBA::PolicyList*
621 MUF_Scheduler::scheduling_policies (void)
623 throw CORBA::NO_IMPLEMENT ();
626 void
627 MUF_Scheduler::scheduling_policies (const CORBA::PolicyList &)
629 throw CORBA::NO_IMPLEMENT ();
632 CORBA::PolicyList*
633 MUF_Scheduler::poa_policies (void)
635 throw CORBA::NO_IMPLEMENT ();
638 char *
639 MUF_Scheduler::scheduling_discipline_name (void)
641 throw CORBA::NO_IMPLEMENT ();
644 RTScheduling::ResourceManager_ptr
645 MUF_Scheduler::create_resource_manager (const char *,
646 CORBA::Policy_ptr)
648 throw CORBA::NO_IMPLEMENT ();
651 void
652 MUF_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
653 const char *,
654 CORBA::Policy_ptr)
656 throw CORBA::NO_IMPLEMENT ();