Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / examples / RTScheduling / MIF_Scheduler / MIF_Scheduler.cpp
blobd1a9f4b6ba28e767f199235fb4d00b3f2ba0f557
1 #include "MIF_Scheduler.h"
2 #include "ace/Atomic_Op.h"
3 #include "tao/RTScheduling/Request_Interceptor.h"
4 #include "test.h"
6 ACE_Atomic_Op<TAO_SYNCH_MUTEX, long> server_guid_counter;
8 DT::DT (TAO_SYNCH_MUTEX &lock,
9 int guid)
10 : dt_cond_ (lock),
11 guid_ (guid),
12 eligible_ (0)
16 void
17 DT::suspend (void)
19 eligible_ = 0;
20 while (!eligible_)
21 this->dt_cond_.wait ();
24 void
25 DT::resume (void)
27 eligible_ = 1;
28 this->dt_cond_.signal ();
31 CORBA::Short
32 Segment_Sched_Param_Policy::importance (void)
34 return this->importance_;
37 void
38 Segment_Sched_Param_Policy::importance (CORBA::Short importance)
40 this->importance_ = importance;
43 CORBA::Policy_ptr
44 Segment_Sched_Param_Policy::copy (void)
46 Segment_Sched_Param_Policy *copy = 0;
47 ACE_NEW_THROW_EX (copy,
48 Segment_Sched_Param_Policy,
49 CORBA::NO_MEMORY ());
51 copy->importance (this->importance_);
53 return copy;
56 CORBA::PolicyType
57 Segment_Sched_Param_Policy::policy_type (void)
59 return 0;
62 void
63 Segment_Sched_Param_Policy::destroy (void)
67 MIF_Scheduler::MIF_Scheduler (CORBA::ORB_ptr orb)
68 : wait_cond_ (lock_),
69 wait_ (0)
71 try
73 CORBA::Object_var object =
74 orb->resolve_initial_references ("RTScheduler_Current");
76 this->current_ =
77 RTScheduling::Current::_narrow (object.in ());
79 object =
80 orb->resolve_initial_references ("PriorityMappingManager");
82 this->mapping_manager_ =
83 RTCORBA::PriorityMappingManager::_narrow (object.in ());
85 catch (const CORBA::Exception& ex)
87 ex._tao_print_exception ("Caught exception:");
91 MIF_Scheduler::~MIF_Scheduler (void)
93 while (free_que_.message_count () > 0)
95 DT *dt = 0;
96 ACE_Message_Block *msg = 0;
97 free_que_.dequeue_head (msg);
98 dt = dynamic_cast<DT*> (msg);
99 delete dt;
103 void
104 MIF_Scheduler::incr_thr_count (void)
106 lock_.acquire ();
107 wait_++;
108 lock_.release ();
111 void
112 MIF_Scheduler::wait (void)
114 lock_.acquire ();
115 while (wait_ > 0)
116 wait_cond_.wait ();
118 ACE_DEBUG ((LM_DEBUG,
119 "After Wait %d\n",
120 wait_));
122 lock_.release ();
125 void
126 MIF_Scheduler::resume_main (void)
128 wait_--;
129 wait_cond_.signal ();
132 MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr
133 MIF_Scheduler::create_segment_scheduling_parameter (CORBA::Short importance)
135 MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr segment_policy;
136 ACE_NEW_THROW_EX (segment_policy,
137 Segment_Sched_Param_Policy,
138 CORBA::NO_MEMORY (
139 CORBA::SystemException::_tao_minor_code (
140 TAO::VMCID,
141 ENOMEM),
142 CORBA::COMPLETED_NO));
144 segment_policy->importance (importance);
146 return segment_policy;
150 void
151 MIF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType &/*guid*/,
152 const char *,
153 CORBA::Policy_ptr sched_policy,
154 CORBA::Policy_ptr)
156 size_t count = 0;
157 RTScheduling::Current::IdType_var guid = this->current_->id ();
159 ACE_OS::memcpy (&count,
160 guid->get_buffer (),
161 guid->length ());
164 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param =
165 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
167 CORBA::Short desired_priority = sched_param->importance ();
169 if (TAO_debug_level > 0)
170 ACE_DEBUG ((LM_DEBUG,
171 "%t MIF_Scheduler::begin_scheduling_segment - Importance %d\n",
172 desired_priority));
175 if (desired_priority != 100)
177 //NOT Main Thread
178 DT* new_dt = 0;
179 ACE_NEW (new_dt,
180 DT (this->lock_,
181 count));
183 new_dt->msg_priority (desired_priority);
184 lock_.acquire ();
185 ready_que_.enqueue_prio (new_dt);
186 resume_main ();
187 new_dt->suspend ();
188 lock_.release ();
192 void
193 MIF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
194 const char *name,
195 CORBA::Policy_ptr sched_param,
196 CORBA::Policy_ptr implicit_sched_param)
198 this->begin_new_scheduling_segment (guid,
199 name,
200 sched_param,
201 implicit_sched_param);
204 void
205 MIF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType &/*guid*/,
206 const char* /*name*/,
207 CORBA::Policy_ptr sched_policy,
208 CORBA::Policy_ptr /*implicit_sched_param*/)
210 size_t count = 0;
211 RTScheduling::Current::IdType_var guid = this->current_->id ();
213 ACE_OS::memcpy (&count,
214 guid->get_buffer (),
215 guid->length ());
217 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param =
218 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
220 CORBA::Short desired_priority = sched_param->importance ();
222 if (TAO_debug_level > 0)
223 ACE_DEBUG ((LM_DEBUG,
224 "%t MIF_Scheduler::update_scheduling_segment - Importance %d\n",
225 desired_priority));
227 DT* new_dt = 0;
228 ACE_NEW (new_dt,
229 DT (this->lock_,
230 count));
232 new_dt->msg_priority (desired_priority);
234 if (ready_que_.message_count () > 0)
236 DT* run_dt;
237 ACE_Message_Block* msg = 0;
238 ready_que_.dequeue_head (msg);
239 run_dt = dynamic_cast<DT*> (msg);
240 if ((desired_priority == 100) || run_dt->msg_priority () >= (unsigned int)desired_priority)
242 ready_que_.enqueue_prio (new_dt);
243 lock_.acquire ();
244 run_dt->resume ();
245 new_dt->suspend ();
246 lock_.release ();
247 free_que_.enqueue_prio (run_dt);
249 else
251 ready_que_.enqueue_prio (run_dt);
252 delete new_dt;
255 else delete new_dt;
258 void
259 MIF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
260 const char *)
262 size_t count = 0;
263 ACE_OS::memcpy (&count,
264 guid.get_buffer (),
265 guid.length ());
267 ACE_DEBUG ((LM_DEBUG,
268 "MIF_Scheduler::end_scheduling_segment %d\n",
269 count));
271 if (ready_que_.message_count () > 0)
273 DT* run_dt = 0;
274 ACE_Message_Block* msg = 0;
275 ready_que_.dequeue_head (msg);
276 run_dt = dynamic_cast<DT*> (msg);
277 lock_.acquire ();
278 run_dt->resume ();
279 lock_.release ();
280 free_que_.enqueue_prio (run_dt);
284 void
285 MIF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
286 const char *,
287 CORBA::Policy_ptr)
291 void
292 MIF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr request_info)
294 CORBA::Policy_var sched_param = current_->scheduling_parameter ();
296 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
297 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param.in ());
299 IOP::ServiceContext srv_con;
300 srv_con.context_id = Client_Interceptor::SchedulingInfo;
302 RTScheduling::Current::IdType_var guid = current_->id ();
304 int guid_length = guid->length ();
306 CORBA::OctetSeq seq_buf (guid_length);
307 seq_buf.length (seq_buf.maximum ());
308 ACE_OS::memcpy (seq_buf.get_buffer (),
309 guid->get_buffer (),
310 guid_length);
312 int cxt_data_length = sizeof (int) + guid_length;
313 srv_con.context_data.length (cxt_data_length);
315 int i = 0;
316 for (;i < guid_length;i++)
318 srv_con.context_data [i] = seq_buf [i];
321 int importance = sched_param_var->importance ();
322 CORBA::OctetSeq int_buf (sizeof (importance));
323 int_buf.length (int_buf.maximum ());
324 ACE_OS::memcpy (int_buf.get_buffer (),
325 &importance,
326 sizeof (importance));
328 int j = 0;
329 for (;i < cxt_data_length;i++)
331 srv_con.context_data [i] = int_buf [j++];
334 request_info->add_request_service_context (srv_con,
337 lock_.acquire ();
338 if (ready_que_.message_count () > 0)
340 int priority;
341 ACE_hthread_t current;
342 ACE_Thread::self (current);
343 if (ACE_Thread::getprio (current, priority) == -1)
344 return;
346 ACE_DEBUG ((LM_DEBUG,
347 "Initial thread priority is %d %d\n",
348 priority,
349 ACE_DEFAULT_THREAD_PRIORITY));
351 RTCORBA::Priority rtpriority;
352 RTCORBA::PriorityMapping* pm = this->mapping_manager_->mapping ();
353 if (pm->to_CORBA(priority + 1, rtpriority))
355 current_->the_priority (rtpriority);
357 ACE_Thread::self (current);
358 if (ACE_Thread::getprio (current, priority) == -1)
359 return;
361 ACE_DEBUG ((LM_DEBUG,
362 "Bumped thread priority is %d\n",
363 priority));
366 DT* run_dt = 0;
367 ACE_Message_Block* msg = 0;
368 ready_que_.dequeue_head (msg);
369 run_dt = dynamic_cast<DT*> (msg);
370 run_dt->resume ();
371 free_que_.enqueue_prio (run_dt);
373 lock_.release ();
377 void
378 MIF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr request_info,
379 RTScheduling::Current::IdType_out guid_out,
380 CORBA::String_out,
381 CORBA::Policy_out sched_param_out,
382 CORBA::Policy_out /*implicit_sched_param*/)
385 if (TAO_debug_level > 0)
386 ACE_DEBUG ((LM_DEBUG,
387 "MIF_Scheduler::receive_request\n"));
389 IOP::ServiceContext_var serv_cxt =
390 request_info->get_request_service_context (Server_Interceptor::SchedulingInfo);
392 if (serv_cxt != 0)
394 ACE_DEBUG ((LM_DEBUG,
395 "Got scheduling info\n"));
397 RTScheduling::Current::IdType* guid;
398 ACE_NEW (guid,
399 RTScheduling::Current::IdType);
401 guid->length (sizeof(size_t));
402 ACE_OS::memcpy (guid->get_buffer (),
403 serv_cxt->context_data.get_buffer (),
404 sizeof (size_t));
406 size_t gu_id;
407 ACE_OS::memcpy (&gu_id,
408 guid->get_buffer (),
409 guid->length ());
411 ACE_DEBUG ((LM_DEBUG,
412 "MIF_Scheduler::receive_request %d\n",
413 gu_id));
416 CORBA::OctetSeq int_buf (sizeof (long));
417 int_buf.length (int_buf.maximum ());
418 int i = sizeof (long);
419 for (unsigned int j = 0;j < sizeof (int);j++)
421 int_buf [j] = serv_cxt->context_data [i++];
424 int importance = 0;
425 ACE_OS::memcpy (&importance,
426 int_buf.get_buffer (),
427 sizeof (importance));
429 guid_out = guid;
430 sched_param_out = DT_TEST::instance ()->scheduler ()->create_segment_scheduling_parameter (importance);
432 if (TAO_debug_level > 0)
433 ACE_DEBUG ((LM_DEBUG,
434 "%t The Guid is %d Importance is %d\n",
435 gu_id,
436 importance));
438 DT* new_dt;
439 ACE_NEW (new_dt,
440 DT (this->lock_,
441 gu_id));
443 new_dt->msg_priority (importance);
444 lock_.acquire ();
445 ready_que_.enqueue_prio (new_dt);
446 new_dt->suspend ();
447 lock_.release ();
451 void
452 MIF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr)
455 RTScheduling::Current::IdType_var guid = current_->id ();
457 size_t count;
458 ACE_OS::memcpy (&count,
459 guid->get_buffer (),
460 guid->length ());
462 ACE_DEBUG ((LM_DEBUG,
463 "MIF_Scheduler::send_reply %d\n",
464 count));
466 if (ready_que_.message_count () > 0)
468 DT* run_dt = 0;
469 ACE_Message_Block* msg = 0;
470 ready_que_.dequeue_head (msg);
471 run_dt = dynamic_cast<DT*> (msg);
472 lock_.acquire ();
473 run_dt->resume ();
474 lock_.release ();
475 free_que_.enqueue_prio (run_dt);
479 void
480 MIF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr)
482 if (ready_que_.message_count () > 0)
484 DT* run_dt = 0;
485 ACE_Message_Block* msg = 0;
486 ready_que_.dequeue_head (msg);
487 run_dt = dynamic_cast<DT*> (msg);
488 lock_.acquire ();
489 run_dt->resume ();
490 lock_.release ();
491 free_que_.enqueue_prio (run_dt);
495 void
496 MIF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr)
498 if (TAO_debug_level > 0)
500 RTScheduling::Current::IdType_var guid = current_->id ();
502 size_t count;
503 ACE_OS::memcpy (&count,
504 guid->get_buffer (),
505 guid->length ());
508 ACE_DEBUG ((LM_DEBUG,
509 "MIF_Scheduler::send_other %d\n",
510 count));
513 if (ready_que_.message_count () > 0)
515 DT* run_dt;
516 ACE_Message_Block* msg;
517 ready_que_.dequeue_head (msg);
518 run_dt = dynamic_cast<DT*> (msg);
519 lock_.acquire ();
520 run_dt->resume ();
521 lock_.release ();
522 free_que_.enqueue_prio (run_dt);
526 void
527 MIF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr)
531 void
532 MIF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr)
534 CORBA::Policy_var sched_param = current_->scheduling_parameter ();
536 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
537 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param.in ());
539 int importance = sched_param_var->importance ();
541 RTScheduling::Current::IdType_var guid = current_->id ();
543 size_t gu_id;
544 ACE_OS::memcpy (&gu_id,
545 guid->get_buffer (),
546 guid->length ());
548 if (TAO_debug_level > 0)
549 ACE_DEBUG ((LM_DEBUG,
550 "MIF_Scheduler::receive_reply Guid = %d Imp = %d\n",
551 gu_id,
552 importance));
555 DT* new_dt;
556 ACE_NEW (new_dt,
557 DT (this->lock_,
558 gu_id));
560 new_dt->msg_priority (importance);
562 lock_.acquire ();
563 ready_que_.enqueue_prio (new_dt);
564 int priority;
565 ACE_hthread_t current;
566 ACE_Thread::self (current);
567 if (ACE_Thread::getprio (current, priority) == -1)
568 return;
570 RTCORBA::Priority rtpriority;
571 RTCORBA::PriorityMapping* pm = this->mapping_manager_->mapping ();
572 if (pm->to_CORBA(priority - 1, rtpriority))
574 current_->the_priority (rtpriority);
577 new_dt->suspend ();
578 lock_.release ();
581 void
582 MIF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr)
584 CORBA::Policy_var sched_param = current_->scheduling_parameter ();
586 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
587 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param.in ());
589 int importance = sched_param_var->importance ();
591 RTScheduling::Current::IdType_var guid = current_->id ();
593 size_t gu_id;
594 ACE_OS::memcpy (&gu_id,
595 guid->get_buffer (),
596 guid->length ());
598 DT* new_dt;
599 ACE_NEW (new_dt,
600 DT (this->lock_,
601 gu_id));
603 new_dt->msg_priority (importance);
605 lock_.acquire ();
606 ready_que_.enqueue_prio (new_dt);
608 int priority;
609 ACE_hthread_t current;
610 ACE_Thread::self (current);
611 if (ACE_Thread::getprio (current, priority) == -1)
612 return;
614 RTCORBA::Priority rtpriority;
615 RTCORBA::PriorityMapping* pm = this->mapping_manager_->mapping ();
616 if (pm->to_CORBA(priority - 1, rtpriority))
618 current_->the_priority (rtpriority);
621 new_dt->suspend ();
622 lock_.release ();
625 void
626 MIF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr)
628 CORBA::Policy_var sched_param = current_->scheduling_parameter ();
630 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
631 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param.in ());
633 int importance = sched_param_var->importance ();
635 RTScheduling::Current::IdType_var guid = current_->id ();
637 size_t gu_id;
638 ACE_OS::memcpy (&gu_id,
639 guid->get_buffer (),
640 guid->length ());
642 DT* new_dt = 0;
643 ACE_NEW (new_dt,
644 DT (this->lock_,
645 gu_id));
647 new_dt->msg_priority (importance);
649 lock_.acquire ();
650 ready_que_.enqueue_prio (new_dt);
652 int priority;
653 ACE_hthread_t current;
654 ACE_Thread::self (current);
655 if (ACE_Thread::getprio (current, priority) == -1)
656 return;
658 RTCORBA::Priority rtpriority;
659 RTCORBA::PriorityMapping* pm = this->mapping_manager_->mapping ();
660 if (pm->to_CORBA(priority - 1, rtpriority))
662 current_->the_priority (rtpriority);
665 new_dt->suspend ();
666 lock_.release ();
669 void
670 MIF_Scheduler::cancel (const RTScheduling::Current::IdType &)
674 CORBA::PolicyList*
675 MIF_Scheduler::scheduling_policies (void)
677 return 0;
680 void
681 MIF_Scheduler::scheduling_policies (const CORBA::PolicyList &)
685 CORBA::PolicyList*
686 MIF_Scheduler::poa_policies (void)
688 return 0;
691 char *
692 MIF_Scheduler::scheduling_discipline_name (void)
694 return 0;
697 RTScheduling::ResourceManager_ptr
698 MIF_Scheduler::create_resource_manager (const char *,
699 CORBA::Policy_ptr)
701 return 0;
704 void
705 MIF_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
706 const char *,
707 CORBA::Policy_ptr)