=default for generated implementation copy ctor
[ACE_TAO.git] / TAO / examples / RTScheduling / MIF_Scheduler / MIF_Scheduler.cpp
blob406ec0cccad76b11ae8a5a6153d999d362ea6456
1 #include "MIF_Scheduler.h"
2 #include "tao/RTScheduling/Request_Interceptor.h"
3 #include "test.h"
4 #include <atomic>
// File-scope atomic counter.  Nothing in the visible part of this file reads
// or writes it.  NOTE(review): the name suggests it feeds server-side GUID
// generation elsewhere -- confirm against the other sources of this example.
6 std::atomic<long> server_guid_counter;
8 DT::DT (TAO_SYNCH_MUTEX &lock,
9 int guid)
10 : dt_cond_ (lock),
11 guid_ (guid),
12 eligible_ (0)
16 void
17 DT::suspend ()
19 eligible_ = 0;
20 while (!eligible_)
21 this->dt_cond_.wait ();
24 void
25 DT::resume ()
27 eligible_ = 1;
28 this->dt_cond_.signal ();
31 CORBA::Short
32 Segment_Sched_Param_Policy::importance ()
34 return this->importance_;
37 void
38 Segment_Sched_Param_Policy::importance (CORBA::Short importance)
40 this->importance_ = importance;
43 CORBA::Policy_ptr
44 Segment_Sched_Param_Policy::copy ()
46 Segment_Sched_Param_Policy *copy = 0;
47 ACE_NEW_THROW_EX (copy,
48 Segment_Sched_Param_Policy,
49 CORBA::NO_MEMORY ());
51 copy->importance (this->importance_);
53 return copy;
56 CORBA::PolicyType
57 Segment_Sched_Param_Policy::policy_type ()
59 return 0;
62 void
63 Segment_Sched_Param_Policy::destroy ()
67 MIF_Scheduler::MIF_Scheduler (CORBA::ORB_ptr orb)
68 : wait_cond_ (lock_),
69 wait_ (0)
71 try
73 CORBA::Object_var object =
74 orb->resolve_initial_references ("RTScheduler_Current");
76 this->current_ =
77 RTScheduling::Current::_narrow (object.in ());
79 object =
80 orb->resolve_initial_references ("PriorityMappingManager");
82 this->mapping_manager_ =
83 RTCORBA::PriorityMappingManager::_narrow (object.in ());
85 catch (const CORBA::Exception& ex)
87 ex._tao_print_exception ("Caught exception:");
91 MIF_Scheduler::~MIF_Scheduler ()
93 while (free_que_.message_count () > 0)
95 DT *dt = 0;
96 ACE_Message_Block *msg = 0;
97 free_que_.dequeue_head (msg);
98 dt = dynamic_cast<DT*> (msg);
99 delete dt;
103 void
104 MIF_Scheduler::incr_thr_count ()
106 lock_.acquire ();
107 wait_++;
108 lock_.release ();
111 void
112 MIF_Scheduler::wait ()
114 lock_.acquire ();
115 while (wait_ > 0)
116 wait_cond_.wait ();
118 ACE_DEBUG ((LM_DEBUG,
119 "After Wait %d\n",
120 wait_));
122 lock_.release ();
125 void
126 MIF_Scheduler::resume_main ()
128 wait_--;
129 wait_cond_.signal ();
132 MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr
133 MIF_Scheduler::create_segment_scheduling_parameter (CORBA::Short importance)
135 MIF_Scheduling::SegmentSchedulingParameterPolicy_ptr segment_policy;
136 ACE_NEW_THROW_EX (segment_policy,
137 Segment_Sched_Param_Policy,
138 CORBA::NO_MEMORY (
139 CORBA::SystemException::_tao_minor_code (
140 TAO::VMCID,
141 ENOMEM),
142 CORBA::COMPLETED_NO));
144 segment_policy->importance (importance);
146 return segment_policy;
150 void
151 MIF_Scheduler::begin_new_scheduling_segment (const RTScheduling::Current::IdType &/*guid*/,
152 const char *,
153 CORBA::Policy_ptr sched_policy,
154 CORBA::Policy_ptr)
156 size_t count = 0;
157 RTScheduling::Current::IdType_var guid = this->current_->id ();
159 ACE_OS::memcpy (&count,
160 guid->get_buffer (),
161 guid->length ());
164 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param =
165 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
167 CORBA::Short desired_priority = sched_param->importance ();
169 if (TAO_debug_level > 0)
170 ACE_DEBUG ((LM_DEBUG,
171 "%t MIF_Scheduler::begin_scheduling_segment - Importance %d\n",
172 desired_priority));
175 if (desired_priority != 100)
177 //NOT Main Thread
178 DT* new_dt = 0;
179 ACE_NEW (new_dt,
180 DT (this->lock_,
181 count));
183 new_dt->msg_priority (desired_priority);
184 lock_.acquire ();
185 ready_que_.enqueue_prio (new_dt);
186 resume_main ();
187 new_dt->suspend ();
188 lock_.release ();
192 void
193 MIF_Scheduler::begin_nested_scheduling_segment (const RTScheduling::Current::IdType &guid,
194 const char *name,
195 CORBA::Policy_ptr sched_param,
196 CORBA::Policy_ptr implicit_sched_param)
198 this->begin_new_scheduling_segment (guid,
199 name,
200 sched_param,
201 implicit_sched_param);
204 void
205 MIF_Scheduler::update_scheduling_segment (const RTScheduling::Current::IdType &/*guid*/,
206 const char* /*name*/,
207 CORBA::Policy_ptr sched_policy,
208 CORBA::Policy_ptr /*implicit_sched_param*/)
210 size_t count = 0;
211 RTScheduling::Current::IdType_var guid = this->current_->id ();
213 ACE_OS::memcpy (&count,
214 guid->get_buffer (),
215 guid->length ());
217 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param =
218 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_policy);
220 CORBA::Short desired_priority = sched_param->importance ();
222 if (TAO_debug_level > 0)
223 ACE_DEBUG ((LM_DEBUG,
224 "%t MIF_Scheduler::update_scheduling_segment - Importance %d\n",
225 desired_priority));
227 DT* new_dt = 0;
228 ACE_NEW (new_dt,
229 DT (this->lock_,
230 count));
232 new_dt->msg_priority (desired_priority);
234 if (ready_que_.message_count () > 0)
236 DT* run_dt;
237 ACE_Message_Block* msg = 0;
238 ready_que_.dequeue_head (msg);
239 run_dt = dynamic_cast<DT*> (msg);
240 if ((desired_priority == 100) || run_dt->msg_priority () >= (unsigned int)desired_priority)
242 ready_que_.enqueue_prio (new_dt);
243 lock_.acquire ();
244 run_dt->resume ();
245 new_dt->suspend ();
246 lock_.release ();
247 free_que_.enqueue_prio (run_dt);
249 else
251 ready_que_.enqueue_prio (run_dt);
252 delete new_dt;
255 else delete new_dt;
258 void
259 MIF_Scheduler::end_scheduling_segment (const RTScheduling::Current::IdType &guid,
260 const char *)
262 size_t count = 0;
263 ACE_OS::memcpy (&count,
264 guid.get_buffer (),
265 guid.length ());
267 ACE_DEBUG ((LM_DEBUG,
268 "MIF_Scheduler::end_scheduling_segment %d\n",
269 count));
271 if (ready_que_.message_count () > 0)
273 DT* run_dt = 0;
274 ACE_Message_Block* msg = 0;
275 ready_que_.dequeue_head (msg);
276 run_dt = dynamic_cast<DT*> (msg);
277 lock_.acquire ();
278 run_dt->resume ();
279 lock_.release ();
280 free_que_.enqueue_prio (run_dt);
284 void
285 MIF_Scheduler::end_nested_scheduling_segment (const RTScheduling::Current::IdType &,
286 const char *,
287 CORBA::Policy_ptr)
291 void
292 MIF_Scheduler::send_request (PortableInterceptor::ClientRequestInfo_ptr request_info)
294 CORBA::Policy_var sched_param = current_->scheduling_parameter ();
296 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
297 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param.in ());
299 IOP::ServiceContext srv_con;
300 srv_con.context_id = Client_Interceptor::SchedulingInfo;
302 RTScheduling::Current::IdType_var guid = current_->id ();
304 int guid_length = guid->length ();
306 CORBA::OctetSeq seq_buf (guid_length);
307 seq_buf.length (seq_buf.maximum ());
308 ACE_OS::memcpy (seq_buf.get_buffer (),
309 guid->get_buffer (),
310 guid_length);
312 int cxt_data_length = sizeof (int) + guid_length;
313 srv_con.context_data.length (cxt_data_length);
315 int i = 0;
316 for (;i < guid_length;i++)
318 srv_con.context_data [i] = seq_buf [i];
321 int importance = sched_param_var->importance ();
322 CORBA::OctetSeq int_buf (sizeof (importance));
323 int_buf.length (int_buf.maximum ());
324 ACE_OS::memcpy (int_buf.get_buffer (),
325 &importance,
326 sizeof (importance));
328 int j = 0;
329 for (;i < cxt_data_length;i++)
331 srv_con.context_data [i] = int_buf [j++];
334 request_info->add_request_service_context (srv_con,
337 lock_.acquire ();
338 if (ready_que_.message_count () > 0)
340 int priority;
341 ACE_hthread_t current;
342 ACE_Thread::self (current);
343 if (ACE_Thread::getprio (current, priority) == -1)
344 return;
346 ACE_DEBUG ((LM_DEBUG,
347 "Initial thread priority is %d %d\n",
348 priority,
349 ACE_DEFAULT_THREAD_PRIORITY));
351 RTCORBA::Priority rtpriority;
352 RTCORBA::PriorityMapping* pm = this->mapping_manager_->mapping ();
353 if (pm->to_CORBA(priority + 1, rtpriority))
355 current_->the_priority (rtpriority);
357 ACE_Thread::self (current);
358 if (ACE_Thread::getprio (current, priority) == -1)
359 return;
361 ACE_DEBUG ((LM_DEBUG,
362 "Bumped thread priority is %d\n",
363 priority));
366 DT* run_dt = 0;
367 ACE_Message_Block* msg = 0;
368 ready_que_.dequeue_head (msg);
369 run_dt = dynamic_cast<DT*> (msg);
370 run_dt->resume ();
371 free_que_.enqueue_prio (run_dt);
373 lock_.release ();
376 void
377 MIF_Scheduler::receive_request (PortableInterceptor::ServerRequestInfo_ptr request_info,
378 RTScheduling::Current::IdType_out guid_out,
379 CORBA::String_out,
380 CORBA::Policy_out sched_param_out,
381 CORBA::Policy_out /*implicit_sched_param*/)
383 if (TAO_debug_level > 0)
384 ACE_DEBUG ((LM_DEBUG,
385 "MIF_Scheduler::receive_request\n"));
387 IOP::ServiceContext_var serv_cxt =
388 request_info->get_request_service_context (Server_Interceptor::SchedulingInfo);
390 if (serv_cxt != 0)
392 ACE_DEBUG ((LM_DEBUG,
393 "Got scheduling info\n"));
395 RTScheduling::Current::IdType* guid;
396 ACE_NEW (guid,
397 RTScheduling::Current::IdType);
399 guid->length (sizeof(size_t));
400 ACE_OS::memcpy (guid->get_buffer (),
401 serv_cxt->context_data.get_buffer (),
402 sizeof (size_t));
404 size_t gu_id;
405 ACE_OS::memcpy (&gu_id,
406 guid->get_buffer (),
407 guid->length ());
409 ACE_DEBUG ((LM_DEBUG,
410 "MIF_Scheduler::receive_request %d\n",
411 gu_id));
414 CORBA::OctetSeq int_buf (sizeof (long));
415 int_buf.length (int_buf.maximum ());
416 int i = sizeof (long);
417 for (unsigned int j = 0;j < sizeof (int);j++)
419 int_buf [j] = serv_cxt->context_data [i++];
422 int importance = 0;
423 ACE_OS::memcpy (&importance,
424 int_buf.get_buffer (),
425 sizeof (importance));
427 guid_out = guid;
428 sched_param_out = DT_TEST::instance ()->scheduler ()->create_segment_scheduling_parameter (importance);
430 if (TAO_debug_level > 0)
431 ACE_DEBUG ((LM_DEBUG,
432 "%t The Guid is %d Importance is %d\n",
433 gu_id,
434 importance));
436 DT* new_dt;
437 ACE_NEW (new_dt,
438 DT (this->lock_,
439 gu_id));
441 new_dt->msg_priority (importance);
442 lock_.acquire ();
443 ready_que_.enqueue_prio (new_dt);
444 new_dt->suspend ();
445 lock_.release ();
449 void
450 MIF_Scheduler::send_reply (PortableInterceptor::ServerRequestInfo_ptr)
452 RTScheduling::Current::IdType_var guid = current_->id ();
454 size_t count;
455 ACE_OS::memcpy (&count,
456 guid->get_buffer (),
457 guid->length ());
459 ACE_DEBUG ((LM_DEBUG,
460 "MIF_Scheduler::send_reply %d\n",
461 count));
463 if (ready_que_.message_count () > 0)
465 DT* run_dt = 0;
466 ACE_Message_Block* msg = 0;
467 ready_que_.dequeue_head (msg);
468 run_dt = dynamic_cast<DT*> (msg);
469 lock_.acquire ();
470 run_dt->resume ();
471 lock_.release ();
472 free_que_.enqueue_prio (run_dt);
476 void
477 MIF_Scheduler::send_exception (PortableInterceptor::ServerRequestInfo_ptr)
479 if (ready_que_.message_count () > 0)
481 DT* run_dt = 0;
482 ACE_Message_Block* msg = 0;
483 ready_que_.dequeue_head (msg);
484 run_dt = dynamic_cast<DT*> (msg);
485 lock_.acquire ();
486 run_dt->resume ();
487 lock_.release ();
488 free_que_.enqueue_prio (run_dt);
492 void
493 MIF_Scheduler::send_other (PortableInterceptor::ServerRequestInfo_ptr)
495 if (TAO_debug_level > 0)
497 RTScheduling::Current::IdType_var guid = current_->id ();
499 size_t count;
500 ACE_OS::memcpy (&count,
501 guid->get_buffer (),
502 guid->length ());
505 ACE_DEBUG ((LM_DEBUG,
506 "MIF_Scheduler::send_other %d\n",
507 count));
510 if (ready_que_.message_count () > 0)
512 DT* run_dt;
513 ACE_Message_Block* msg;
514 ready_que_.dequeue_head (msg);
515 run_dt = dynamic_cast<DT*> (msg);
516 lock_.acquire ();
517 run_dt->resume ();
518 lock_.release ();
519 free_que_.enqueue_prio (run_dt);
523 void
524 MIF_Scheduler::send_poll (PortableInterceptor::ClientRequestInfo_ptr)
528 void
529 MIF_Scheduler::receive_reply (PortableInterceptor::ClientRequestInfo_ptr)
531 CORBA::Policy_var sched_param = current_->scheduling_parameter ();
533 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
534 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param.in ());
536 int importance = sched_param_var->importance ();
538 RTScheduling::Current::IdType_var guid = current_->id ();
540 size_t gu_id;
541 ACE_OS::memcpy (&gu_id,
542 guid->get_buffer (),
543 guid->length ());
545 if (TAO_debug_level > 0)
546 ACE_DEBUG ((LM_DEBUG,
547 "MIF_Scheduler::receive_reply Guid = %d Imp = %d\n",
548 gu_id,
549 importance));
552 DT* new_dt;
553 ACE_NEW (new_dt,
554 DT (this->lock_,
555 gu_id));
557 new_dt->msg_priority (importance);
559 lock_.acquire ();
560 ready_que_.enqueue_prio (new_dt);
561 int priority;
562 ACE_hthread_t current;
563 ACE_Thread::self (current);
564 if (ACE_Thread::getprio (current, priority) == -1)
565 return;
567 RTCORBA::Priority rtpriority;
568 RTCORBA::PriorityMapping* pm = this->mapping_manager_->mapping ();
569 if (pm->to_CORBA(priority - 1, rtpriority))
571 current_->the_priority (rtpriority);
574 new_dt->suspend ();
575 lock_.release ();
578 void
579 MIF_Scheduler::receive_exception (PortableInterceptor::ClientRequestInfo_ptr)
581 CORBA::Policy_var sched_param = current_->scheduling_parameter ();
583 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
584 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param.in ());
586 int importance = sched_param_var->importance ();
588 RTScheduling::Current::IdType_var guid = current_->id ();
590 size_t gu_id;
591 ACE_OS::memcpy (&gu_id,
592 guid->get_buffer (),
593 guid->length ());
595 DT* new_dt;
596 ACE_NEW (new_dt,
597 DT (this->lock_,
598 gu_id));
600 new_dt->msg_priority (importance);
602 lock_.acquire ();
603 ready_que_.enqueue_prio (new_dt);
605 int priority;
606 ACE_hthread_t current;
607 ACE_Thread::self (current);
608 if (ACE_Thread::getprio (current, priority) == -1)
609 return;
611 RTCORBA::Priority rtpriority;
612 RTCORBA::PriorityMapping* pm = this->mapping_manager_->mapping ();
613 if (pm->to_CORBA(priority - 1, rtpriority))
615 current_->the_priority (rtpriority);
618 new_dt->suspend ();
619 lock_.release ();
622 void
623 MIF_Scheduler::receive_other (PortableInterceptor::ClientRequestInfo_ptr)
625 CORBA::Policy_var sched_param = current_->scheduling_parameter ();
627 MIF_Scheduling::SegmentSchedulingParameterPolicy_var sched_param_var =
628 MIF_Scheduling::SegmentSchedulingParameterPolicy::_narrow (sched_param.in ());
630 int importance = sched_param_var->importance ();
632 RTScheduling::Current::IdType_var guid = current_->id ();
634 size_t gu_id;
635 ACE_OS::memcpy (&gu_id,
636 guid->get_buffer (),
637 guid->length ());
639 DT* new_dt = 0;
640 ACE_NEW (new_dt,
641 DT (this->lock_,
642 gu_id));
644 new_dt->msg_priority (importance);
646 lock_.acquire ();
647 ready_que_.enqueue_prio (new_dt);
649 int priority;
650 ACE_hthread_t current;
651 ACE_Thread::self (current);
652 if (ACE_Thread::getprio (current, priority) == -1)
653 return;
655 RTCORBA::Priority rtpriority;
656 RTCORBA::PriorityMapping* pm = this->mapping_manager_->mapping ();
657 if (pm->to_CORBA(priority - 1, rtpriority))
659 current_->the_priority (rtpriority);
662 new_dt->suspend ();
663 lock_.release ();
666 void
667 MIF_Scheduler::cancel (const RTScheduling::Current::IdType &)
671 CORBA::PolicyList*
672 MIF_Scheduler::scheduling_policies ()
674 return 0;
677 void
678 MIF_Scheduler::scheduling_policies (const CORBA::PolicyList &)
682 CORBA::PolicyList*
683 MIF_Scheduler::poa_policies ()
685 return 0;
688 char *
689 MIF_Scheduler::scheduling_discipline_name ()
691 return 0;
694 RTScheduling::ResourceManager_ptr
695 MIF_Scheduler::create_resource_manager (const char *,
696 CORBA::Policy_ptr)
698 return 0;
701 void
702 MIF_Scheduler::set_scheduling_parameter (PortableServer::Servant &,
703 const char *,
704 CORBA::Policy_ptr)