Merge pull request #2303 from jwillemsen/jwi-803
[ACE_TAO.git] / TAO / examples / Kokyu_dsrt_schedulers / FP_Scheduler.cpp
blobe2b8374f60121eabeee8e61f915184f630bd27a3
1 #include "FP_Scheduler.h"
2 #include "Kokyu_qosC.h"
3 #include "utils.h"
4 #include "tao/ORB_Constants.h"
5 #include "tao/CodecFactory/CodecFactory.h"
6 #include "tao/RTScheduling/Request_Interceptor.h"
8 FP_Segment_Sched_Param_Policy::FP_Segment_Sched_Param_Policy ()
12 FP_Segment_Sched_Param_Policy::FP_Segment_Sched_Param_Policy (
13 const FP_Segment_Sched_Param_Policy &rhs)
14 : CORBA::Object (),
15 CORBA::Policy (),
16 FP_Scheduling::SegmentSchedulingParameterPolicy (),
17 CORBA::LocalObject (),
18 value_ (rhs.value_)
22 FP_Scheduling::SegmentSchedulingParameter
23 FP_Segment_Sched_Param_Policy::value ()
25 return value_;
28 void
29 FP_Segment_Sched_Param_Policy::value (
30 const FP_Scheduling::SegmentSchedulingParameter & value)
32 this->value_ = value;
35 CORBA::Policy_ptr
36 FP_Segment_Sched_Param_Policy::copy ()
38 FP_Segment_Sched_Param_Policy* tmp = 0;
39 ACE_NEW_THROW_EX (tmp, FP_Segment_Sched_Param_Policy (*this),
40 CORBA::NO_MEMORY (TAO::VMCID,
41 CORBA::COMPLETED_NO));
43 return tmp;
46 CORBA::PolicyType
47 FP_Segment_Sched_Param_Policy::policy_type ()
49 return 0;
52 void
53 FP_Segment_Sched_Param_Policy::destroy ()
57 Fixed_Priority_Scheduler::Fixed_Priority_Scheduler (
58 CORBA::ORB_ptr orb,
59 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type,
60 int ace_sched_policy,
61 int ace_sched_scope)
62 : orb_ (CORBA::ORB::_duplicate (orb)),
63 disp_impl_type_ (disp_impl_type),
64 ace_sched_policy_ (ace_sched_policy),
65 ace_sched_scope_ (ace_sched_scope)
67 Kokyu::DSRT_ConfigInfo config;
69 config.impl_type_ = this->disp_impl_type_;
70 config.sched_policy_ = ace_sched_policy_;
71 config.sched_scope_ = ace_sched_scope_;
73 Kokyu::DSRT_Dispatcher_Factory<FP_Scheduler_Traits>::DSRT_Dispatcher_Auto_Ptr
74 tmp( Kokyu::DSRT_Dispatcher_Factory<FP_Scheduler_Traits>::
75 create_DSRT_dispatcher (config) );
76 kokyu_dispatcher_ = std::move(tmp);
78 CORBA::Object_var object =
79 orb->resolve_initial_references ("RTScheduler_Current");
81 this->current_ =
82 RTScheduling::Current::_narrow (object.in ());
84 IOP::CodecFactory_var codec_factory;
85 CORBA::Object_var obj =
86 orb->resolve_initial_references ("CodecFactory");
88 if (CORBA::is_nil(obj.in ()))
90 ACE_ERROR ((LM_ERROR, "Nil Codec factory\n"));
92 else
94 codec_factory = IOP::CodecFactory::_narrow (obj.in ());
97 IOP::Encoding encoding;
98 encoding.format = IOP::ENCODING_CDR_ENCAPS;
99 encoding.major_version = 1;
100 encoding.minor_version = 2;
102 codec_ = codec_factory->create_codec (encoding);
105 Fixed_Priority_Scheduler::~Fixed_Priority_Scheduler ()
107 // delete kokyu_dispatcher_;
110 void
111 Fixed_Priority_Scheduler::shutdown ()
113 kokyu_dispatcher_->shutdown ();
114 ACE_DEBUG ((LM_DEBUG, "kokyu DSRT dispatcher shutdown\n"));
117 FP_Scheduling::SegmentSchedulingParameterPolicy_ptr
118 Fixed_Priority_Scheduler::create_segment_scheduling_parameter (
119 const FP_Scheduling::SegmentSchedulingParameter & value
122 FP_Scheduling::SegmentSchedulingParameterPolicy_ptr
123 segment_sched_param_policy;
124 ACE_NEW_THROW_EX (segment_sched_param_policy,
125 FP_Segment_Sched_Param_Policy,
126 CORBA::NO_MEMORY (
127 CORBA::SystemException::_tao_minor_code (
128 TAO::VMCID,
129 ENOMEM),
130 CORBA::COMPLETED_NO));
132 segment_sched_param_policy->value (value);
134 return segment_sched_param_policy;
138 void
139 Fixed_Priority_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType& guid,
140 const char *,
141 CORBA::Policy_ptr sched_policy,
142 CORBA::Policy_ptr)
144 #ifdef KOKYU_DSRT_LOGGING
145 ACE_DEBUG ((LM_DEBUG,
146 "(%t|%T):FP_Scheduler::begin_new_scheduling_segment enter\n"));
147 #endif
149 #ifdef KOKYU_DSRT_LOGGING
150 int int_guid;
151 ACE_OS::memcpy (&int_guid,
152 guid.get_buffer (),
153 guid.length ());
154 ACE_DEBUG ((LM_DEBUG, "(%t|%T): guid is %d\n", int_guid));
155 #endif
157 FP_Scheduler_Traits::QoSDescriptor_t qos;
158 FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy =
159 FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
161 FP_Scheduling::SegmentSchedulingParameter sched_param = sched_param_policy->value ();
162 RTCORBA::Priority desired_priority = sched_param.base_priority;
163 qos.priority_ = desired_priority;
164 kokyu_dispatcher_->schedule (guid, qos);
166 #ifdef KOKYU_DSRT_LOGGING
167 ACE_DEBUG ((LM_DEBUG,
168 "(%t|%T):FP_Scheduler::begin_new_scheduling_segment exit\n"));
169 #endif
172 void
173 Fixed_Priority_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
174 const char *name,
175 CORBA::Policy_ptr sched_param,
176 CORBA::Policy_ptr implicit_sched_param)
178 this->begin_new_scheduling_segment (guid,
179 name,
180 sched_param,
181 implicit_sched_param);
184 void
185 Fixed_Priority_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType &guid,
186 const char* name,
187 CORBA::Policy_ptr sched_policy,
188 CORBA::Policy_ptr implicit_sched_param)
190 ACE_UNUSED_ARG ((name));
191 ACE_UNUSED_ARG ((implicit_sched_param));
193 #ifdef KOKYU_DSRT_LOGGING
194 int int_guid ;
195 ACE_OS::memcpy (&int_guid,
196 guid.get_buffer (),
197 guid.length ());
198 ACE_DEBUG ((LM_DEBUG, "(%t|%T): update_sched_seg::guid is %d\n", int_guid));
199 #endif
201 FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy =
202 FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
204 FP_Scheduling::SegmentSchedulingParameter sched_param =
205 sched_param_policy->value ();
207 RTCORBA::Priority desired_priority = sched_param.base_priority;
209 FP_Scheduler_Traits::QoSDescriptor_t qos;
210 qos.priority_ = desired_priority;
212 kokyu_dispatcher_->update_schedule (guid, qos);
215 void
216 Fixed_Priority_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
217 const char *)
219 #ifdef KOKYU_DSRT_LOGGING
220 int int_guid;
221 ACE_OS::memcpy (&int_guid,
222 guid.get_buffer (),
223 guid.length ());
224 ACE_DEBUG ((LM_DEBUG, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid));
225 #endif
227 kokyu_dispatcher_->cancel_schedule (guid);
230 void
231 Fixed_Priority_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
232 const char *,
233 CORBA::Policy_ptr)
238 void
239 Fixed_Priority_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri)
241 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
243 CORBA::String_var operation = ri->operation ();
245 #ifdef KOKYU_DSRT_LOGGING
246 ACE_DEBUG ((LM_DEBUG,
247 "(%t|%T): send_request "
248 "from \"%s\"\n",
249 operation.in ()));
250 #endif
251 // Make the context to send the context to the target
252 IOP::ServiceContext sc;
253 sc.context_id = Client_Interceptor::SchedulingInfo;
255 CORBA::Policy_var sched_policy =
256 this->current_->scheduling_parameter();
258 RTScheduling::Current::IdType_var guid = this->current_->id ();
260 ACE_OS::memcpy (&guid,
261 guid->get_buffer (),
262 guid->length ());
264 RTCORBA::Priority desired_priority;
265 if (CORBA::is_nil (sched_policy.in ()))
267 desired_priority = 0;
269 else
271 FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy =
272 FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy.in ());
274 FP_Scheduling::SegmentSchedulingParameter sched_param =
275 sched_param_policy->value ();
277 desired_priority = sched_param.base_priority;
279 #ifdef KOKYU_DSRT_LOGGING
280 int int_guid;
281 ACE_OS::memcpy (&int_guid,
282 guid->get_buffer (),
283 guid->length ());
284 ACE_DEBUG ((LM_DEBUG,
285 ACE_TEXT ("(%t): send_request desired_priority from current = %d, guid = %d\n"),
286 desired_priority, int_guid));
287 #endif
289 //Fill the guid in the SC Qos struct
290 sc_qos.guid.length (guid->length ());
291 guid_copy (sc_qos.guid, guid.in ());
292 sc_qos.desired_priority = desired_priority;
293 CORBA::Any sc_qos_as_any;
294 sc_qos_as_any <<= sc_qos;
296 CORBA::OctetSeq_var cdtmp = codec_->encode (sc_qos_as_any);
297 sc.context_data = cdtmp.in ();
299 #ifdef KOKYU_DSRT_LOGGING
300 ACE_DEBUG ((LM_DEBUG,
301 ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
302 #endif
304 // Add this context to the service context list.
305 ri->add_request_service_context (sc, 0);
309 #ifdef KOKYU_DSRT_LOGGING
310 ACE_DEBUG ((LM_DEBUG,
311 ACE_TEXT ("(%t|%T): send_request : ")
312 ACE_TEXT ("about to call scheduler to inform block\n")));
313 #endif
315 kokyu_dispatcher_->update_schedule (guid.in (),
316 Kokyu::BLOCK);
318 #ifdef KOKYU_DSRT_LOGGING
319 ACE_DEBUG ((LM_DEBUG,
320 ACE_TEXT ("(%t|%T): send_request interceptor done\n")));
321 #endif
324 void
325 Fixed_Priority_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri,
326 RTScheduling::Current::IdType_out guid_out,
327 CORBA::String_out /*name*/,
328 CORBA::Policy_out sched_param_out,
329 CORBA::Policy_out /*implicit_sched_param_out*/)
331 const Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
333 #ifdef KOKYU_DSRT_LOGGING
334 ACE_DEBUG ((LM_DEBUG, "(%t|%T):entered FP_Scheduler::receive_request\n"));
335 #endif
337 RTScheduling::Current::IdType guid;
339 CORBA::String_var operation = ri->operation ();
341 #ifdef KOKYU_DSRT_LOGGING
342 ACE_DEBUG ((LM_DEBUG,
343 "(%t|%T): receive_request from "
344 "\"%s\"\n",
345 operation.in ()));
346 #endif
348 // Ignore the "_is_a" operation since it may have been invoked
349 // locally on the server side as a side effect of another call,
350 // meaning that the client hasn't added the service context yet.
351 if (ACE_OS::strcmp ("_is_a", operation.in ()) == 0)
352 return;
354 IOP::ServiceContext_var sc =
355 ri->get_request_service_context (Server_Interceptor::SchedulingInfo);
357 RTCORBA::Priority desired_priority;
359 if (sc.ptr () == 0)
361 desired_priority = 0;
363 else
365 CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
366 sc->context_data.length (),
367 sc->context_data.get_buffer (),
369 CORBA::Any sc_qos_as_any;
370 CORBA::Any_var scqostmp = codec_->decode (oc_seq);
371 sc_qos_as_any = scqostmp.in ();
372 //Don't store in a _var, since >>= returns a pointer to an
373 //internal buffer and we are not supposed to free it.
374 sc_qos_as_any >>= sc_qos_ptr;
376 desired_priority = sc_qos_ptr->desired_priority;
377 guid.length (sc_qos_ptr->guid.length ());
378 guid_copy (guid, sc_qos_ptr->guid);
380 ACE_NEW (guid_out.ptr (),
381 RTScheduling::Current::IdType);
382 guid_out.ptr ()->length (guid.length ());
383 *(guid_out.ptr ()) = guid;
385 #ifdef KOKYU_DSRT_LOGGING
386 int int_guid;
387 ACE_OS::memcpy (&int_guid,
388 guid.get_buffer (),
389 guid.length ());
390 ACE_DEBUG ((LM_DEBUG,
391 "(%t|%T): Desired_Priority = %d, guid = %d in recvd service context\n",
392 desired_priority,
393 int_guid));
394 #endif
396 FP_Scheduling::SegmentSchedulingParameter sched_param;
397 sched_param.base_priority = desired_priority;
398 sched_param_out = this->create_segment_scheduling_parameter (sched_param);
401 FP_Scheduler_Traits::QoSDescriptor_t qos;
402 qos.priority_ = desired_priority;
403 this->kokyu_dispatcher_->schedule (guid, qos);
405 #ifdef KOKYU_DSRT_LOGGING
406 ACE_DEBUG ((LM_DEBUG, "(%t|%T): receive_request interceptor done\n"));
407 #endif
411 void
412 Fixed_Priority_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr)
416 void
417 Fixed_Priority_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri)
419 RTCORBA::Priority desired_priority = 0;
420 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
422 CORBA::String_var operation = ri->operation ();
424 #ifdef KOKYU_DSRT_LOGGING
425 ACE_DEBUG ((LM_DEBUG,
426 "(%t|%T): send_reply from \"%s\"\n",
427 ri->operation ()));
428 #endif
430 // Make the context to send the context to the target
431 IOP::ServiceContext sc;
432 sc.context_id = Server_Interceptor::SchedulingInfo;
434 ACE_DEBUG ((LM_DEBUG, "in send_reply: before accessing current_->sched_param\n"));
435 CORBA::Policy_var sched_policy =
436 this->current_->scheduling_parameter();
438 RTScheduling::Current::IdType_var guid = this->current_->id ();
440 if (CORBA::is_nil (sched_policy.in ()))
442 ACE_DEBUG ((LM_DEBUG, "sched_policy nil. desired_priority not set in sched params\n"));
443 desired_priority = 0;
445 else
447 ACE_DEBUG ((LM_DEBUG, "sched_policy not nil. desired_priority set in sched params\n"));
449 FP_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_policy =
450 FP_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy.in ());
452 FP_Scheduling::SegmentSchedulingParameter sched_param =
453 sched_param_policy->value ();
455 desired_priority = sched_param.base_priority;
457 //Fill the guid in the SC Qos struct
458 sc_qos.guid.length (guid->length ());
459 guid_copy (sc_qos.guid, guid.in ());
460 sc_qos.desired_priority = desired_priority;
461 CORBA::Any sc_qos_as_any;
462 sc_qos_as_any <<= sc_qos;
464 CORBA::OctetSeq_var cdtmp = codec_->encode (sc_qos_as_any);
465 sc.context_data = cdtmp.in ();
467 // Add this context to the service context list.
468 ri->add_reply_service_context (sc, 1);
470 #ifdef KOKYU_DSRT_LOGGING
471 ACE_DEBUG ((LM_DEBUG, "(%t|%T):reply sc added\n"));
472 #endif
475 kokyu_dispatcher_->update_schedule (guid.in (),
476 Kokyu::BLOCK);
478 #ifdef KOKYU_DSRT_LOGGING
479 ACE_DEBUG ((LM_DEBUG, "(%t|%T): send_reply interceptor done\n"));
480 #endif
483 void
484 Fixed_Priority_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri)
486 send_reply (ri);
489 void
490 Fixed_Priority_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri)
492 send_reply (ri);
495 void
496 Fixed_Priority_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri)
498 RTScheduling::Current::IdType guid;
499 RTCORBA::Priority desired_priority=0;
501 CORBA::String_var operation = ri->operation ();
503 CORBA::Object_var target = ri->target ();
505 ACE_CString opname = operation.in ();
506 #ifdef KOKYU_DSRT_LOGGING
507 ACE_DEBUG ((LM_DEBUG,
508 "(%t|%T):receive_reply from "
509 "\"%s\"\n",
510 opname.c_str ()));
511 #endif
513 // Check that the reply service context was received as
514 // expected.
515 IOP::ServiceContext_var sc =
516 ri->get_reply_service_context (Client_Interceptor::SchedulingInfo);
518 if (sc.ptr () == 0)
520 desired_priority = 0;
522 else
524 CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
525 sc->context_data.length (),
526 sc->context_data.get_buffer (),
529 //Don't store in a _var, since >>= returns a pointer to an internal buffer
530 //and we are not supposed to free it.
531 const Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
532 CORBA::Any sc_qos_as_any;
533 CORBA::Any_var scqostmp = codec_->decode (oc_seq);
534 sc_qos_as_any = scqostmp.in ();
535 sc_qos_as_any >>= sc_qos_ptr;
537 desired_priority = sc_qos_ptr->desired_priority;
538 guid.length (sc_qos_ptr->guid.length ());
539 guid_copy (guid, sc_qos_ptr->guid);
541 #ifdef KOKYU_DSRT_LOGGING
542 ACE_DEBUG ((LM_DEBUG,
543 "(%t): Desired_Priority = %d in recvd service context\n",
544 desired_priority));
545 #endif
548 FP_Scheduler_Traits::QoSDescriptor_t qos;
549 qos.priority_ = desired_priority;
550 this->kokyu_dispatcher_->schedule (guid, qos);
553 void
554 Fixed_Priority_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri)
556 receive_reply (ri);
559 void
560 Fixed_Priority_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri)
562 receive_reply (ri);
565 void
566 Fixed_Priority_Scheduler::cancel (const RTScheduling::Current::IdType &)
568 throw CORBA::NO_IMPLEMENT ();
571 CORBA::PolicyList*
572 Fixed_Priority_Scheduler::scheduling_policies ()
574 throw CORBA::NO_IMPLEMENT ();
577 void
578 Fixed_Priority_Scheduler::scheduling_policies (const CORBA::PolicyList &)
580 throw CORBA::NO_IMPLEMENT ();
583 CORBA::PolicyList*
584 Fixed_Priority_Scheduler::poa_policies ()
586 throw CORBA::NO_IMPLEMENT ();
589 char *
590 Fixed_Priority_Scheduler::scheduling_discipline_name ()
592 throw CORBA::NO_IMPLEMENT ();
595 RTScheduling::ResourceManager_ptr
596 Fixed_Priority_Scheduler::create_resource_manager (const char *,
597 CORBA::Policy_ptr)
599 throw CORBA::NO_IMPLEMENT ();
602 void
603 Fixed_Priority_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
604 const char *,
605 CORBA::Policy_ptr)
607 throw CORBA::NO_IMPLEMENT ();