Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / TAO / examples / Kokyu_dsrt_schedulers / MIF_Scheduler.cpp
blob2e1a8e98a6dcd6872f7c8b91187315485e77e3f7
1 #include "MIF_Scheduler.h"
2 #include "Kokyu_qosC.h"
3 #include "utils.h"
4 #include "tao/RTScheduling/Request_Interceptor.h"
5 #include "tao/CodecFactory/CodecFactory.h"
6 #include "tao/ORB_Constants.h"
8 MIF_Sched_Param_Policy::MIF_Sched_Param_Policy ()
12 MIF_Sched_Param_Policy::MIF_Sched_Param_Policy (
13 const MIF_Sched_Param_Policy &rhs
15 : CORBA::Object (),
16 CORBA::Policy (),
17 MIF_Scheduling::SchedulingParameterPolicy (),
18 CORBA::LocalObject (),
19 value_ (rhs.value_)
23 MIF_Scheduling::SchedulingParameter
24 MIF_Sched_Param_Policy::value ()
26 return this->value_;
29 void
30 MIF_Sched_Param_Policy::value (const MIF_Scheduling::SchedulingParameter& value)
32 this->value_ = value;
35 CORBA::Policy_ptr
36 MIF_Sched_Param_Policy::copy ()
38 MIF_Sched_Param_Policy* tmp = 0;
39 ACE_NEW_THROW_EX (tmp,
40 MIF_Sched_Param_Policy (*this),
41 CORBA::NO_MEMORY (TAO::VMCID,
42 CORBA::COMPLETED_NO));
44 return tmp;
47 CORBA::PolicyType
48 MIF_Sched_Param_Policy::policy_type ()
50 return 0;
53 void
54 MIF_Sched_Param_Policy::destroy ()
58 MIF_Scheduler::MIF_Scheduler (CORBA::ORB_ptr orb,
59 Kokyu::DSRT_Dispatcher_Impl_t disp_impl_type,
60 int ace_sched_policy,
61 int ace_sched_scope)
62 : orb_ (CORBA::ORB::_duplicate (orb)),
63 disp_impl_type_ (disp_impl_type),
64 ace_sched_policy_ (ace_sched_policy),
65 ace_sched_scope_ (ace_sched_scope)
67 Kokyu::DSRT_ConfigInfo config;
69 config.impl_type_ = this->disp_impl_type_;
70 config.sched_policy_ = ace_sched_policy_;
71 config.sched_scope_ = ace_sched_scope_;
73 Kokyu::DSRT_Dispatcher_Factory<MIF_Scheduler_Traits>::DSRT_Dispatcher_Auto_Ptr
74 tmp( Kokyu::DSRT_Dispatcher_Factory<MIF_Scheduler_Traits>::
75 create_DSRT_dispatcher (config) );
76 kokyu_dispatcher_ = std::move(tmp);
78 CORBA::Object_var object =
79 orb->resolve_initial_references ("RTScheduler_Current");
81 this->current_ =
82 RTScheduling::Current::_narrow (object.in ());
84 IOP::CodecFactory_var codec_factory;
85 CORBA::Object_var obj = orb->resolve_initial_references ("CodecFactory");
87 if (CORBA::is_nil(obj.in ()))
89 ACE_ERROR ((LM_ERROR, "Nil Codec factory\n"));
91 else
93 codec_factory = IOP::CodecFactory::_narrow (obj.in ());
96 IOP::Encoding encoding;
97 encoding.format = IOP::ENCODING_CDR_ENCAPS;
98 encoding.major_version = 1;
99 encoding.minor_version = 2;
101 codec_ = codec_factory->create_codec (encoding);
104 MIF_Scheduler::~MIF_Scheduler ()
106 // delete kokyu_dispatcher_;
109 void
110 MIF_Scheduler::shutdown ()
112 kokyu_dispatcher_->shutdown ();
113 ACE_DEBUG ((LM_DEBUG, "kokyu DSRT dispatcher shutdown\n"));
116 MIF_Scheduling::SchedulingParameterPolicy_ptr
117 MIF_Scheduler::create_scheduling_parameter (const MIF_Scheduling::SchedulingParameter & value)
119 MIF_Scheduling::SchedulingParameterPolicy_ptr sched_param_policy;
120 ACE_NEW_THROW_EX (sched_param_policy,
121 MIF_Sched_Param_Policy,
122 CORBA::NO_MEMORY (
123 CORBA::SystemException::_tao_minor_code (
124 TAO::VMCID,
125 ENOMEM),
126 CORBA::COMPLETED_NO));
128 sched_param_policy->value (value);
130 return sched_param_policy;
134 void
135 MIF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType& guid,
136 const char *,
137 CORBA::Policy_ptr sched_policy,
138 CORBA::Policy_ptr)
140 #ifdef KOKYU_DSRT_LOGGING
141 ACE_DEBUG ((LM_DEBUG,
142 "(%t|%T):MIF_Scheduler::begin_new_scheduling_segment enter\n"));
143 #endif
145 #ifdef KOKYU_DSRT_LOGGING
146 int int_guid;
147 ACE_OS::memcpy (&int_guid,
148 guid.get_buffer (),
149 guid.length ());
150 ACE_DEBUG ((LM_DEBUG, "(%t|%T): guid is %d\n", int_guid));
151 #endif
153 MIF_Scheduler_Traits::QoSDescriptor_t qos;
154 MIF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
155 MIF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy);
157 MIF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
158 CORBA::Short importance = sched_param->importance;
159 qos.importance_ = importance;
160 kokyu_dispatcher_->schedule (guid, qos);
162 #ifdef KOKYU_DSRT_LOGGING
163 ACE_DEBUG ((LM_DEBUG,
164 "(%t|%T):MIF_Scheduler::begin_new_scheduling_segment exit\n"));
165 #endif
168 void
169 MIF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
170 const char *name,
171 CORBA::Policy_ptr sched_param,
172 CORBA::Policy_ptr implicit_sched_param)
174 this->begin_new_scheduling_segment (guid,
175 name,
176 sched_param,
177 implicit_sched_param);
180 void
181 MIF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType& guid,
182 const char* name,
183 CORBA::Policy_ptr sched_policy,
184 CORBA::Policy_ptr implicit_sched_param)
186 ACE_UNUSED_ARG ((name));
187 ACE_UNUSED_ARG ((implicit_sched_param));
189 #ifdef KOKYU_DSRT_LOGGING
190 int int_guid ;
191 ACE_OS::memcpy (&int_guid,
192 guid.get_buffer (),
193 guid.length ());
194 ACE_DEBUG ((LM_DEBUG, "(%t|%T): update_sched_seg::guid is %d\n", int_guid));
195 #endif
197 MIF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
198 MIF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy);
200 MIF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
201 CORBA::Short importance = sched_param->importance;
203 MIF_Scheduler_Traits::QoSDescriptor_t qos;
204 qos.importance_ = importance;
206 kokyu_dispatcher_->update_schedule (guid, qos);
209 void
210 MIF_Scheduler::end_scheduling_segment (
211 const RTScheduling::Current::IdType &guid,
212 const char *)
214 #ifdef KOKYU_DSRT_LOGGING
215 int int_guid;
216 ACE_OS::memcpy (&int_guid,
217 guid.get_buffer (),
218 guid.length ());
219 ACE_DEBUG ((LM_DEBUG, "(%t|%T) call to end_sched_segment for guid %d\n", int_guid));
220 #endif
222 kokyu_dispatcher_->cancel_schedule (guid);
225 void
226 MIF_Scheduler::end_nested_scheduling_segment (
227 const RTScheduling::Current::IdType &,
228 const char *,
229 CORBA::Policy_ptr)
234 void
235 MIF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr ri)
237 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
239 CORBA::String_var operation = ri->operation ();
241 #ifdef KOKYU_DSRT_LOGGING
242 ACE_DEBUG ((LM_DEBUG,
243 "(%t|%T): send_request "
244 "from \"%s\"\n",
245 operation.in ()));
246 #endif
248 // Make the context to send the context to the target
249 IOP::ServiceContext sc;
250 sc.context_id = Client_Interceptor::SchedulingInfo;
252 CORBA::Policy_var sched_policy =
253 this->current_->scheduling_parameter();
255 RTScheduling::Current::IdType_var guid = this->current_->id ();
257 ACE_OS::memcpy (&guid,
258 guid->get_buffer (),
259 guid->length ());
261 CORBA::Short importance;
262 if (CORBA::is_nil (sched_policy.in ()))
264 importance = 0;
266 else
268 MIF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
269 MIF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy.in ());
271 MIF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
272 importance = sched_param->importance;
274 #ifdef KOKYU_DSRT_LOGGING
275 int int_guid;
276 ACE_OS::memcpy (&int_guid,
277 guid->get_buffer (),
278 guid->length ());
279 ACE_DEBUG ((LM_DEBUG,
280 "(%t|%T): send_request importance from current = %d, guid = %d\n",
281 importance, int_guid));
282 #endif
284 //Fill the guid in the SC Qos struct
285 sc_qos.guid.length (guid->length ());
286 guid_copy (sc_qos.guid, guid.in ());
287 sc_qos.importance = importance;
288 CORBA::Any sc_qos_as_any;
289 sc_qos_as_any <<= sc_qos;
291 CORBA::OctetSeq_var cdtmp = codec_->encode (sc_qos_as_any);
292 sc.context_data = cdtmp.in ();
294 #ifdef KOKYU_DSRT_LOGGING
295 ACE_DEBUG ((LM_DEBUG,
296 ACE_TEXT ("(%t|%T): send_request : about to add sched SC\n")));
297 #endif
299 // Add this context to the service context list.
300 ri->add_request_service_context (sc, 0);
304 #ifdef KOKYU_DSRT_LOGGING
305 ACE_DEBUG ((LM_DEBUG,
306 ACE_TEXT ("(%t|%T): send_request : ")
307 ACE_TEXT ("about to call scheduler to inform block\n")));
308 #endif
310 kokyu_dispatcher_->update_schedule (guid.in (),
311 Kokyu::BLOCK);
313 #ifdef KOKYU_DSRT_LOGGING
314 ACE_DEBUG ((LM_DEBUG,
315 ACE_TEXT ("(%t|%T): send_request interceptor done\n")));
316 #endif
319 void
320 MIF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr ri,
321 RTScheduling::Current::IdType_out guid_out,
322 CORBA::String_out /*name*/,
323 CORBA::Policy_out sched_param_out,
324 CORBA::Policy_out /*implicit_sched_param_out*/)
326 const Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
328 #ifdef KOKYU_DSRT_LOGGING
329 ACE_DEBUG ((LM_DEBUG, "(%t|%T):entered MIF_Scheduler::receive_request\n"));
330 #endif
332 RTScheduling::Current::IdType guid;
334 CORBA::String_var operation = ri->operation ();
336 #ifdef KOKYU_DSRT_LOGGING
337 ACE_DEBUG ((LM_DEBUG,
338 "(%t|%T): receive_request from "
339 "\"%s\"\n",
340 operation.in ()));
341 #endif
343 // Ignore the "_is_a" operation since it may have been invoked
344 // locally on the server side as a side effect of another call,
345 // meaning that the client hasn't added the service context yet.
346 if (ACE_OS::strcmp ("_is_a", operation.in ()) == 0)
347 return;
349 IOP::ServiceContext_var sc =
350 ri->get_request_service_context (Server_Interceptor::SchedulingInfo);
352 CORBA::Short importance;
354 if (sc.ptr () == 0)
356 importance = 0;
358 else
360 CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
361 sc->context_data.length (),
362 sc->context_data.get_buffer (),
364 CORBA::Any sc_qos_as_any;
365 CORBA::Any_var scqostmp = codec_->decode (oc_seq);
366 sc_qos_as_any = scqostmp.in ();
367 //Don't store in a _var, since >>= returns a pointer to an
368 //internal buffer and we are not supposed to free it.
369 sc_qos_as_any >>= sc_qos_ptr;
371 importance = sc_qos_ptr->importance;
372 guid.length (sc_qos_ptr->guid.length ());
373 guid_copy (guid, sc_qos_ptr->guid);
375 ACE_NEW (guid_out.ptr (),
376 RTScheduling::Current::IdType);
377 guid_out.ptr ()->length (guid.length ());
378 *(guid_out.ptr ()) = guid;
380 #ifdef KOKYU_DSRT_LOGGING
381 int int_guid;
382 ACE_OS::memcpy (&int_guid,
383 guid.get_buffer (),
384 guid.length ());
385 ACE_DEBUG ((LM_DEBUG,
386 "(%t|%T): Importance = %d, guid = %d in recvd service context\n",
387 importance, int_guid));
388 #endif
390 MIF_Scheduling::SchedulingParameter sched_param;
391 sched_param.importance = importance;
392 sched_param_out = this->create_scheduling_parameter (sched_param);
395 MIF_Scheduler_Traits::QoSDescriptor_t qos;
396 qos.importance_ = importance;
397 this->kokyu_dispatcher_->schedule (guid, qos);
399 #ifdef KOKYU_DSRT_LOGGING
400 ACE_DEBUG ((LM_DEBUG, "(%t|%T): receive_request interceptor done\n"));
401 #endif
405 void
406 MIF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr ri)
408 CORBA::Short importance = 0;
409 Kokyu::Svc_Ctxt_DSRT_QoS sc_qos;
411 CORBA::String_var operation = ri->operation ();
413 #ifdef KOKYU_DSRT_LOGGING
414 ACE_DEBUG ((LM_DEBUG,
415 "(%t|%T): send_reply from \"%s\"\n",
416 ri->operation ()));
417 #endif
419 // Make the context to send the context to the target
420 IOP::ServiceContext sc;
421 sc.context_id = Server_Interceptor::SchedulingInfo;
423 CORBA::Policy_var sched_policy =
424 this->current_->scheduling_parameter();
426 RTScheduling::Current::IdType_var guid = this->current_->id ();
428 if (CORBA::is_nil (sched_policy.in ()))
430 #ifdef KOKYU_DSRT_LOGGING
431 ACE_DEBUG ((LM_DEBUG,
432 "(%t|%T): sched_policy nil. ",
433 "importance not set in sched params\n"));
434 #endif
435 importance = 0;
437 else
439 #ifdef KOKYU_DSRT_LOGGING
440 ACE_DEBUG ((LM_DEBUG,
441 "(%t|%T):sched_policy not nil. ",
442 "importance set in sched params\n"));
443 #endif
444 MIF_Scheduling::SchedulingParameterPolicy_var sched_param_policy =
445 MIF_Scheduling::SchedulingParameterPolicy::_narrow (sched_policy.in ());
447 MIF_Scheduling::SchedulingParameter_var sched_param = sched_param_policy->value ();
449 importance = sched_param->importance;
451 //Fill the guid in the SC Qos struct
452 sc_qos.guid.length (guid->length ());
453 guid_copy (sc_qos.guid, guid.in ());
454 sc_qos.importance = importance;
455 CORBA::Any sc_qos_as_any;
456 sc_qos_as_any <<= sc_qos;
458 CORBA::OctetSeq_var cdtmp = codec_->encode (sc_qos_as_any);
459 sc.context_data = cdtmp.in ();
461 // Add this context to the service context list.
462 ri->add_reply_service_context (sc, 1);
464 #ifdef KOKYU_DSRT_LOGGING
465 ACE_DEBUG ((LM_DEBUG, "(%t|%T):reply sc added\n"));
466 #endif
469 kokyu_dispatcher_->update_schedule (guid.in (),
470 Kokyu::BLOCK);
472 #ifdef KOKYU_DSRT_LOGGING
473 ACE_DEBUG ((LM_DEBUG, "(%t|%T): send_reply interceptor done\n"));
474 #endif
477 void
478 MIF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr ri)
480 send_reply (ri);
483 void
484 MIF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr)
488 void
489 MIF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr ri)
491 send_reply (ri);
494 void
495 MIF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr ri)
497 RTScheduling::Current::IdType guid;
498 CORBA::Short importance=0;
500 CORBA::String_var operation = ri->operation ();
502 CORBA::Object_var target = ri->target ();
504 ACE_CString opname = operation.in ();
505 #ifdef KOKYU_DSRT_LOGGING
506 ACE_DEBUG ((LM_DEBUG,
507 "(%t|%T):receive_reply from "
508 "\"%s\"\n",
509 opname.c_str ()));
510 #endif
512 // Check that the reply service context was received as
513 // expected.
514 IOP::ServiceContext_var sc =
515 ri->get_reply_service_context (Client_Interceptor::SchedulingInfo);
517 if (sc.ptr () == 0)
519 importance = 0;
521 else
523 CORBA::OctetSeq oc_seq = CORBA::OctetSeq (sc->context_data.length (),
524 sc->context_data.length (),
525 sc->context_data.get_buffer (),
528 //Don't store in a _var, since >>= returns a pointer to an internal buffer
529 //and we are not supposed to free it.
530 const Kokyu::Svc_Ctxt_DSRT_QoS* sc_qos_ptr = 0;
531 CORBA::Any sc_qos_as_any;
532 CORBA::Any_var scqostmp = codec_->decode (oc_seq);
533 sc_qos_as_any = scqostmp.in ();
534 sc_qos_as_any >>= sc_qos_ptr;
535 importance = sc_qos_ptr->importance;
536 guid.length (sc_qos_ptr->guid.length ());
537 guid_copy (guid, sc_qos_ptr->guid);
539 #ifdef KOKYU_DSRT_LOGGING
540 ACE_DEBUG ((LM_DEBUG,
541 "(%t|%T): Importance = %d in recvd service context\n",
542 importance ));
543 #endif
546 MIF_Scheduler_Traits::QoSDescriptor_t qos;
547 qos.importance_ = importance;
548 this->kokyu_dispatcher_->schedule (guid, qos);
551 void
552 MIF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr ri)
554 receive_reply (ri);
557 void
558 MIF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr ri)
560 receive_reply (ri);
563 void
564 MIF_Scheduler::cancel (const RTScheduling::Current::IdType &)
568 CORBA::PolicyList*
569 MIF_Scheduler::scheduling_policies ()
571 throw CORBA::NO_IMPLEMENT ();
574 void
575 MIF_Scheduler::scheduling_policies (const CORBA::PolicyList &)
577 throw CORBA::NO_IMPLEMENT ();
580 CORBA::PolicyList*
581 MIF_Scheduler::poa_policies ()
583 throw CORBA::NO_IMPLEMENT ();
586 char *
587 MIF_Scheduler::scheduling_discipline_name ()
589 throw CORBA::NO_IMPLEMENT ();
592 RTScheduling::ResourceManager_ptr
593 MIF_Scheduler::create_resource_manager (const char *,
594 CORBA::Policy_ptr)
596 throw CORBA::NO_IMPLEMENT ();
599 void
600 MIF_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
601 const char *,
602 CORBA::Policy_ptr)
604 throw CORBA::NO_IMPLEMENT ();