Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / TAO / performance-tests / RTCorba / Thread_Pool / client.cpp
blobd49019b3a7ad5d9e9eb8fc2a61de7b2c89968043
1 #include "ace/Get_Opt.h"
2 #include "ace/High_Res_Timer.h"
3 #include "ace/Stats.h"
4 #include "ace/Throughput_Stats.h"
5 #include "ace/Sample_History.h"
6 #include "ace/Read_Buffer.h"
7 #include "ace/Array_Base.h"
8 #include "ace/Task.h"
9 #include "ace/OS_NS_unistd.h"
10 #include "tao/ORB_Core.h"
11 #include "tao/debug.h"
12 #include "tao/RTCORBA/RTCORBA.h"
13 #include "tao/RTCORBA/Priority_Mapping_Manager.h"
14 #include "testC.h"
15 #include "tests/RTCORBA/common_args.cpp"
16 #include "tests/RTCORBA/check_supported_priorities.cpp"
17 #include "ace/Event.h"
19 enum Priority_Setting
21 AT_THREAD_CREATION = 0,
22 AFTER_THREAD_CREATION = 1
25 static const ACE_TCHAR *ior = ACE_TEXT("file://ior");
26 static const ACE_TCHAR *rates_file = ACE_TEXT("rates");
27 static const ACE_TCHAR *invocation_priorities_file = ACE_TEXT("empty-file");
28 static int shutdown_server = 0;
29 static int do_dump_history = 0;
30 static ACE_High_Res_Timer::global_scale_factor_type gsf = 0;
31 static CORBA::ULong continuous_workers = 0;
32 static int done = 0;
33 static CORBA::ULong time_for_test = 10;
34 static CORBA::ULong work = 10;
35 static CORBA::ULong max_throughput_timeout = 5;
36 static CORBA::ULong continuous_worker_priority = 0;
37 static int set_priority = 1;
38 static Priority_Setting priority_setting = AFTER_THREAD_CREATION;
39 static int individual_continuous_worker_stats = 0;
40 static int print_missed_invocations = 0;
41 static ACE_hrtime_t test_start;
42 static CORBA::ULong prime_number = 9619;
43 static int count_missed_end_deadlines = 0;
45 struct Synchronizers
47 Synchronizers ()
48 : worker_lock_ (),
49 workers_ (1),
50 workers_ready_ (0),
51 number_of_workers_ (0)
55 TAO_SYNCH_MUTEX worker_lock_;
56 ACE_Event workers_;
57 CORBA::ULong workers_ready_;
58 CORBA::ULong number_of_workers_;
61 int
62 parse_args (int argc, ACE_TCHAR *argv[])
64 ACE_Get_Opt get_opts (argc, argv,
65 "c:e:g:hi:k:m:p:q:r:t:u:v:w:x:y:z:" //client options
66 "b:f:hl:n:o:s:" // server options
68 int c;
70 while ((c = get_opts ()) != -1)
71 switch (c)
73 case 'c':
74 continuous_workers =
75 ACE_OS::atoi (get_opts.opt_arg ());
76 break;
78 case 'e':
79 count_missed_end_deadlines =
80 ACE_OS::atoi (get_opts.opt_arg ());
81 break;
83 case 'g':
84 do_dump_history =
85 ACE_OS::atoi (get_opts.opt_arg ());
86 break;
88 case 'i':
89 individual_continuous_worker_stats =
90 ACE_OS::atoi (get_opts.opt_arg ());
91 break;
93 case 'k':
94 ior =
95 get_opts.opt_arg ();
96 break;
98 case 'm':
99 print_missed_invocations =
100 ACE_OS::atoi (get_opts.opt_arg ());
101 break;
103 case 'p':
104 invocation_priorities_file =
105 get_opts.opt_arg ();
106 break;
108 case 'q':
109 prime_number =
110 ACE_OS::atoi (get_opts.opt_arg ());
111 break;
113 case 'r':
114 rates_file =
115 get_opts.opt_arg ();
116 break;
118 case 't':
119 time_for_test =
120 ACE_OS::atoi (get_opts.opt_arg ());
121 break;
123 case 'u':
124 continuous_worker_priority =
125 ACE_OS::atoi (get_opts.opt_arg ());
126 break;
128 case 'v':
129 priority_setting =
130 Priority_Setting (ACE_OS::atoi (get_opts.opt_arg ()));
131 break;
133 case 'w':
134 work =
135 ACE_OS::atoi (get_opts.opt_arg ());
136 break;
138 case 'x':
139 shutdown_server =
140 ACE_OS::atoi (get_opts.opt_arg ());
141 break;
143 case 'y':
144 set_priority =
145 ACE_OS::atoi (get_opts.opt_arg ());
146 break;
148 case 'z':
149 max_throughput_timeout =
150 ACE_OS::atoi (get_opts.opt_arg ());
151 break;
153 case 'b':
154 case 'f':
155 case 'l':
156 case 'n':
157 case 'o':
158 case 's':
159 // server options: ignored.
160 break;
162 case 'h':
163 case '?':
164 default:
165 ACE_ERROR_RETURN ((LM_ERROR,
166 "usage: %s\n"
167 "\t-c <number of continuous workers> (defaults to %d)\n"
168 "\t-e <count missed end deadlines> (defaults to %d)\n"
169 "\t-g <show history> (defaults to %d)\n"
170 "\t-h <help: shows options menu>\n"
171 "\t-i <print stats of individual continuous workers> (defaults to %d)\n"
172 "\t-k <ior> (defaults to %s)\n"
173 "\t-m <print missed invocations for paced workers> (defaults to %d)\n"
174 "\t-p <invocation priorities file> (defaults to %s)\n"
175 "\t-q <prime number> (defaults to %d)\n"
176 "\t-r <rates file> (defaults to %s)\n"
177 "\t-t <time for test> (defaults to %d)\n"
178 "\t-u <continuous worker priority> (defaults to %d)\n"
179 "\t-v <priority setting: AT_THREAD_CREATION = 0, AFTER_THREAD_CREATION = 1> (defaults to %s)\n"
180 "\t-w <work> (defaults to %d)\n"
181 "\t-x <shutdown server> (defaults to %d)\n"
182 "\t-y <set invocation priorities> (defaults to %d)\n"
183 "\t-z <timeout for max throughput measurement> (defaults to %d)\n"
184 "\n",
185 argv [0],
186 continuous_workers,
187 count_missed_end_deadlines,
188 do_dump_history,
189 individual_continuous_worker_stats,
190 ior,
191 print_missed_invocations,
192 invocation_priorities_file,
193 prime_number,
194 rates_file,
195 time_for_test,
196 continuous_worker_priority,
197 priority_setting == 0 ? "AT_THREAD_CREATION" : "AFTER_THREAD_CREATION",
198 work,
199 shutdown_server,
200 set_priority,
201 max_throughput_timeout),
202 -1);
205 return 0;
208 double
209 to_seconds (ACE_UINT64 hrtime,
210 ACE_High_Res_Timer::global_scale_factor_type sf)
212 double seconds =
213 static_cast<double> (ACE_UINT64_DBLCAST_ADAPTER (hrtime / sf));
214 seconds /= ACE_HR_SCALE_CONVERSION;
216 return seconds;
219 ACE_UINT64
220 to_hrtime (double seconds,
221 ACE_High_Res_Timer::global_scale_factor_type sf)
223 return ACE_UINT64 (seconds * sf * ACE_HR_SCALE_CONVERSION);
227 start_synchronization (test_ptr test,
228 Synchronizers &synchronizers)
230 CORBA::ULong synchronization_iterations = 1;
233 for (CORBA::ULong i = 0;
234 i < synchronization_iterations;
235 ++i)
237 test->method (work,
238 prime_number);
241 catch (const CORBA::Exception& ex)
243 ex._tao_print_exception ("Exception caught:");
244 return -1;
248 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
249 mon,
250 synchronizers.worker_lock_,
251 -1);
253 if (synchronizers.workers_ready_ == 0)
255 if (TAO_debug_level > 0)
256 ACE_DEBUG ((LM_DEBUG,
257 "\n"));
260 ++synchronizers.workers_ready_;
262 if (TAO_debug_level > 0)
263 ACE_DEBUG ((LM_DEBUG,
264 "%d worker ready\n",
265 synchronizers.workers_ready_));
267 if (synchronizers.workers_ready_ ==
268 synchronizers.number_of_workers_)
270 if (TAO_debug_level > 0)
271 ACE_DEBUG ((LM_DEBUG,
272 "\n"));
274 test_start =
275 ACE_OS::gethrtime ();
277 synchronizers.workers_.signal ();
279 return 0;
283 synchronizers.workers_.wait ();
285 return 0;
289 end_synchronization (Synchronizers &synchronizers)
292 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX,
293 mon,
294 synchronizers.worker_lock_,
295 -1);
297 if (synchronizers.workers_ready_ ==
298 synchronizers.number_of_workers_)
300 if (TAO_debug_level > 0)
301 ACE_DEBUG ((LM_DEBUG,
302 "\n"));
304 synchronizers.workers_.reset ();
307 if (TAO_debug_level > 0)
308 ACE_DEBUG ((LM_DEBUG,
309 "%d worker completed\n",
310 synchronizers.workers_ready_));
312 --synchronizers.workers_ready_;
314 if (synchronizers.workers_ready_ == 0)
316 if (TAO_debug_level > 0)
317 ACE_DEBUG ((LM_DEBUG,
318 "\n"));
320 synchronizers.workers_.signal ();
322 return 0;
326 synchronizers.workers_.wait ();
328 return 0;
332 max_throughput (test_ptr test,
333 RTCORBA::Current_ptr current,
334 RTCORBA::PriorityMapping &priority_mapping,
335 CORBA::ULong &max_rate)
337 CORBA::ULong calls_made = 0;
338 CORBA::Short CORBA_priority = 0;
339 CORBA::Short native_priority = 0;
343 CORBA_priority =
344 current->the_priority ();
346 CORBA::Boolean result =
347 priority_mapping.to_native (CORBA_priority,
348 native_priority);
349 if (!result)
350 ACE_ERROR_RETURN ((LM_ERROR,
351 "Error in converting CORBA priority %d to native priority\n",
352 CORBA_priority),
353 -1);
355 ACE_hrtime_t start =
356 ACE_OS::gethrtime ();
358 ACE_hrtime_t end =
359 start +
360 to_hrtime (max_throughput_timeout, gsf);
362 for (;;)
364 ACE_hrtime_t now =
365 ACE_OS::gethrtime ();
367 if (now > end)
368 break;
370 test->method (work,
371 prime_number);
373 ++calls_made;
376 catch (const CORBA::Exception& ex)
378 ex._tao_print_exception ("Exception caught:");
379 return -1;
382 max_rate =
383 calls_made / max_throughput_timeout;
385 ACE_DEBUG ((LM_DEBUG,
386 "\nPriority = %d/%d; Max rate calculations => %d calls in %d seconds; Max rate = %.2f\n",
387 CORBA_priority,
388 native_priority,
389 calls_made,
390 max_throughput_timeout,
391 calls_made / (double) max_throughput_timeout));
393 return 0;
396 class Paced_Worker :
397 public ACE_Task_Base
399 public:
400 Paced_Worker (ACE_Thread_Manager &thread_manager,
401 test_ptr test,
402 CORBA::Short rate,
403 CORBA::ULong iterations,
404 CORBA::Short priority,
405 RTCORBA::Current_ptr current,
406 RTCORBA::PriorityMapping &priority_mapping,
407 Synchronizers &synchronizers);
409 int svc ();
410 ACE_hrtime_t deadline_for_current_call (CORBA::ULong i);
411 void reset_priority ();
412 void print_stats (ACE_hrtime_t test_end);
413 int setup ();
414 void missed_start_deadline (CORBA::ULong invocation);
415 void missed_end_deadline (CORBA::ULong invocation);
417 test_var test_;
418 int rate_;
419 ACE_Sample_History history_;
420 CORBA::Short priority_;
421 RTCORBA::Current_var current_;
422 RTCORBA::PriorityMapping &priority_mapping_;
423 Synchronizers &synchronizers_;
424 CORBA::Short CORBA_priority_;
425 CORBA::Short native_priority_;
426 ACE_hrtime_t interval_between_calls_;
427 CORBA::ULong missed_start_deadlines_;
428 CORBA::ULong missed_end_deadlines_;
430 typedef ACE_Array_Base<CORBA::ULong> Missed_Invocations;
431 Missed_Invocations missed_start_invocations_;
432 Missed_Invocations missed_end_invocations_;
435 Paced_Worker::Paced_Worker (ACE_Thread_Manager &thread_manager,
436 test_ptr test,
437 CORBA::Short rate,
438 CORBA::ULong iterations,
439 CORBA::Short priority,
440 RTCORBA::Current_ptr current,
441 RTCORBA::PriorityMapping &priority_mapping,
442 Synchronizers &synchronizers)
443 : ACE_Task_Base (&thread_manager),
444 test_ (test::_duplicate (test)),
445 rate_ (rate),
446 history_ (iterations),
447 priority_ (priority),
448 current_ (RTCORBA::Current::_duplicate (current)),
449 priority_mapping_ (priority_mapping),
450 synchronizers_ (synchronizers),
451 CORBA_priority_ (0),
452 native_priority_ (0),
453 interval_between_calls_ (),
454 missed_start_deadlines_ (0),
455 missed_end_deadlines_ (0),
456 missed_start_invocations_ (iterations),
457 missed_end_invocations_ (iterations)
459 this->interval_between_calls_ =
460 to_hrtime (1 / double (this->rate_), gsf);
463 void
464 Paced_Worker::reset_priority ()
466 if (set_priority)
468 this->current_->the_priority (this->priority_);
470 else
472 this->current_->the_priority (continuous_worker_priority);
476 ACE_hrtime_t
477 Paced_Worker::deadline_for_current_call (CORBA::ULong i)
479 ACE_hrtime_t deadline_for_current_call =
480 this->interval_between_calls_;
482 deadline_for_current_call *= i;
484 deadline_for_current_call += test_start;
486 return deadline_for_current_call;
489 void
490 Paced_Worker::print_stats (ACE_hrtime_t test_end)
492 ACE_GUARD (TAO_SYNCH_MUTEX,
493 mon,
494 this->synchronizers_.worker_lock_);
496 CORBA::ULong missed_total_deadlines =
497 this->missed_start_deadlines_ + this->missed_end_deadlines_;
499 CORBA::ULong made_total_deadlines =
500 this->history_.max_samples () - missed_total_deadlines;
502 ACE_DEBUG ((LM_DEBUG,
503 "\n************ Statistics for thread %t ************\n\n"));
505 ACE_DEBUG ((LM_DEBUG,
506 "Priority = %d/%d; Rate = %d/sec; Iterations = %d; ",
507 this->CORBA_priority_,
508 this->native_priority_,
509 this->rate_,
510 this->history_.max_samples ()));
512 if (count_missed_end_deadlines)
513 ACE_DEBUG ((LM_DEBUG,
514 "Deadlines made/missed[start,end]/%% = %d/%d[%d,%d]/%.2f%%; Effective Rate = %.2f\n",
515 made_total_deadlines,
516 missed_total_deadlines,
517 this->missed_start_deadlines_,
518 this->missed_end_deadlines_,
519 made_total_deadlines * 100 / (double) this->history_.max_samples (),
520 made_total_deadlines / to_seconds (test_end - test_start, gsf)));
521 else
522 ACE_DEBUG ((LM_DEBUG,
523 "Deadlines made/missed/%% = %d/%d/%.2f%%; Effective Rate = %.2f\n",
524 made_total_deadlines,
525 missed_total_deadlines,
526 made_total_deadlines * 100 / (double) this->history_.max_samples (),
527 made_total_deadlines / to_seconds (test_end - test_start, gsf)));
530 if (do_dump_history)
532 this->history_.dump_samples (ACE_TEXT("HISTORY"), gsf);
535 ACE_Basic_Stats stats;
536 this->history_.collect_basic_stats (stats);
537 stats.dump_results (ACE_TEXT("Total"), gsf);
539 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Total"), gsf,
540 test_end - test_start,
541 stats.samples_count ());
543 if (print_missed_invocations)
545 ACE_DEBUG ((LM_DEBUG, "\nMissed start invocations are: "));
547 for (CORBA::ULong j = 0;
548 j != this->missed_start_deadlines_;
549 ++j)
551 ACE_DEBUG ((LM_DEBUG,
552 "%d ",
553 this->missed_start_invocations_[j]));
556 ACE_DEBUG ((LM_DEBUG, "\n"));
558 if (count_missed_end_deadlines)
560 ACE_DEBUG ((LM_DEBUG, "\nMissed end invocations are: "));
562 for (CORBA::ULong j = 0;
563 j != this->missed_end_deadlines_;
564 ++j)
566 ACE_DEBUG ((LM_DEBUG,
567 "%d ",
568 this->missed_end_invocations_[j]));
571 ACE_DEBUG ((LM_DEBUG, "\n"));
577 Paced_Worker::setup ()
579 if (priority_setting == AFTER_THREAD_CREATION)
581 this->reset_priority ();
584 this->CORBA_priority_ =
585 this->current_->the_priority ();
587 CORBA::Boolean result =
588 this->priority_mapping_.to_native (this->CORBA_priority_,
589 this->native_priority_);
590 if (!result)
591 ACE_ERROR_RETURN ((LM_ERROR,
592 "Error in converting CORBA priority %d to native priority\n",
593 this->CORBA_priority_),
594 -1);
596 return
597 start_synchronization (this->test_.in (),
598 this->synchronizers_);
601 void
602 Paced_Worker::missed_start_deadline (CORBA::ULong invocation)
604 this->missed_start_invocations_[this->missed_start_deadlines_++] =
605 invocation;
608 void
609 Paced_Worker::missed_end_deadline (CORBA::ULong invocation)
611 if (count_missed_end_deadlines)
612 this->missed_end_invocations_[this->missed_end_deadlines_++] =
613 invocation;
617 Paced_Worker::svc ()
621 int result =
622 this->setup ();
624 if (result != 0)
625 return result;
627 for (CORBA::ULong i = 0;
628 i != this->history_.max_samples ();
629 ++i)
631 ACE_hrtime_t deadline_for_current_call =
632 this->deadline_for_current_call (i);
634 ACE_hrtime_t time_before_call =
635 ACE_OS::gethrtime ();
637 if (time_before_call > deadline_for_current_call)
639 this->missed_start_deadline (i + 1);
640 continue;
643 this->test_->method (work,
644 prime_number);
646 ACE_hrtime_t time_after_call =
647 ACE_OS::gethrtime ();
648 this->history_.sample (time_after_call - time_before_call);
650 if (time_after_call > deadline_for_current_call)
652 this->missed_end_deadline (i + 1);
653 continue;
656 ACE_hrtime_t sleep_time =
657 deadline_for_current_call - time_after_call;
659 ACE_OS::sleep (ACE_Time_Value (0,
660 long (to_seconds (sleep_time, gsf) *
661 ACE_ONE_SECOND_IN_USECS)));
664 ACE_hrtime_t test_end = ACE_OS::gethrtime ();
666 done = 1;
668 end_synchronization (this->synchronizers_);
670 this->print_stats (test_end);
672 catch (const CORBA::Exception& ex)
674 ex._tao_print_exception ("Exception caught:");
675 return -1;
678 return 0;
681 class Continuous_Worker :
682 public ACE_Task_Base
684 public:
685 Continuous_Worker (ACE_Thread_Manager &thread_manager,
686 test_ptr test,
687 CORBA::ULong iterations,
688 RTCORBA::Current_ptr current,
689 RTCORBA::PriorityMapping &priority_mapping,
690 Synchronizers &synchronizers);
692 int svc ();
693 void print_stats (ACE_Sample_History &history,
694 ACE_hrtime_t test_end);
695 int setup ();
696 void print_collective_stats ();
698 test_var test_;
699 CORBA::ULong iterations_;
700 RTCORBA::Current_var current_;
701 RTCORBA::PriorityMapping &priority_mapping_;
702 Synchronizers &synchronizers_;
703 CORBA::Short CORBA_priority_;
704 CORBA::Short native_priority_;
705 ACE_Basic_Stats collective_stats_;
706 ACE_hrtime_t time_for_test_;
709 Continuous_Worker::Continuous_Worker (ACE_Thread_Manager &thread_manager,
710 test_ptr test,
711 CORBA::ULong iterations,
712 RTCORBA::Current_ptr current,
713 RTCORBA::PriorityMapping &priority_mapping,
714 Synchronizers &synchronizers)
715 : ACE_Task_Base (&thread_manager),
716 test_ (test::_duplicate (test)),
717 iterations_ (iterations),
718 current_ (RTCORBA::Current::_duplicate (current)),
719 priority_mapping_ (priority_mapping),
720 synchronizers_ (synchronizers),
721 CORBA_priority_ (0),
722 native_priority_ (0),
723 collective_stats_ (),
724 time_for_test_ (0)
728 void
729 Continuous_Worker::print_stats (ACE_Sample_History &history,
730 ACE_hrtime_t test_end)
732 ACE_GUARD (TAO_SYNCH_MUTEX,
733 mon,
734 this->synchronizers_.worker_lock_);
736 if (individual_continuous_worker_stats)
738 ACE_DEBUG ((LM_DEBUG,
739 "\n************ Statistics for thread %t ************\n\n"));
741 ACE_DEBUG ((LM_DEBUG,
742 "Iterations = %d\n",
743 history.sample_count ()));
745 if (do_dump_history)
747 history.dump_samples (ACE_TEXT("HISTORY"), gsf);
750 ACE_Basic_Stats stats;
751 history.collect_basic_stats (stats);
752 stats.dump_results (ACE_TEXT("Total"), gsf);
754 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Total"), gsf,
755 test_end - test_start,
756 stats.samples_count ());
759 history.collect_basic_stats (this->collective_stats_);
760 ACE_hrtime_t elapsed_time_for_current_thread =
761 test_end - test_start;
762 if (elapsed_time_for_current_thread > this->time_for_test_)
763 this->time_for_test_ = elapsed_time_for_current_thread;
766 void
767 Continuous_Worker::print_collective_stats ()
769 if (continuous_workers > 0)
771 ACE_DEBUG ((LM_DEBUG,
772 "\n************ Statistics for continuous workers ************\n\n"));
774 ACE_DEBUG ((LM_DEBUG,
775 "Priority = %d/%d; Collective iterations = %d; Workers = %d; Average = %d\n",
776 this->CORBA_priority_,
777 this->native_priority_,
778 this->collective_stats_.samples_count (),
779 continuous_workers,
780 this->collective_stats_.samples_count () /
781 continuous_workers));
783 this->collective_stats_.dump_results (ACE_TEXT("Collective"), gsf);
785 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Individual"), gsf,
786 this->time_for_test_,
787 this->collective_stats_.samples_count () /
788 continuous_workers);
790 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Collective"), gsf,
791 this->time_for_test_,
792 this->collective_stats_.samples_count ());
797 Continuous_Worker::setup ()
799 if (priority_setting == AFTER_THREAD_CREATION)
801 this->current_->the_priority (continuous_worker_priority);
804 this->CORBA_priority_ =
805 this->current_->the_priority ();
807 CORBA::Boolean result =
808 this->priority_mapping_.to_native (this->CORBA_priority_,
809 this->native_priority_);
810 if (!result)
811 ACE_ERROR_RETURN ((LM_ERROR,
812 "Error in converting CORBA priority %d to native priority\n",
813 this->CORBA_priority_),
814 -1);
816 return
817 start_synchronization (this->test_.in (),
818 this->synchronizers_);
822 Continuous_Worker::svc ()
826 ACE_Sample_History history (this->iterations_);
828 int result =
829 this->setup ();
831 if (result != 0)
832 return result;
834 for (CORBA::ULong i = 0;
835 i != history.max_samples () && !done;
836 ++i)
838 ACE_hrtime_t start = ACE_OS::gethrtime ();
840 this->test_->method (work,
841 prime_number);
843 ACE_hrtime_t end = ACE_OS::gethrtime ();
844 history.sample (end - start);
847 ACE_hrtime_t test_end = ACE_OS::gethrtime ();
849 end_synchronization (this->synchronizers_);
851 this->print_stats (history,
852 test_end);
854 catch (const CORBA::Exception& ex)
856 ex._tao_print_exception ("Exception caught:");
857 return -1;
860 return 0;
863 class Task : public ACE_Task_Base
865 public:
866 Task (ACE_Thread_Manager &thread_manager,
867 CORBA::ORB_ptr orb);
869 int svc ();
871 CORBA::ORB_var orb_;
874 Task::Task (ACE_Thread_Manager &thread_manager,
875 CORBA::ORB_ptr orb)
876 : ACE_Task_Base (&thread_manager),
877 orb_ (CORBA::ORB::_duplicate (orb))
882 Task::svc ()
884 Synchronizers synchronizers;
886 gsf = ACE_High_Res_Timer::global_scale_factor ();
890 CORBA::Object_var object =
891 this->orb_->string_to_object (ior);
893 test_var test =
894 test::_narrow (object.in ());
896 object =
897 this->orb_->resolve_initial_references ("RTCurrent");
899 RTCORBA::Current_var current =
900 RTCORBA::Current::_narrow (object.in ());
902 object =
903 this->orb_->resolve_initial_references ("PriorityMappingManager");
905 RTCORBA::PriorityMappingManager_var mapping_manager =
906 RTCORBA::PriorityMappingManager::_narrow (object.in ());
908 RTCORBA::PriorityMapping &priority_mapping =
909 *mapping_manager->mapping ();
911 ULong_Array rates;
912 int result =
913 get_values ("client",
914 rates_file,
915 "rates",
916 rates,
918 if (result != 0)
919 return result;
921 ULong_Array invocation_priorities;
922 result =
923 get_values ("client",
924 invocation_priorities_file,
925 "invocation priorities",
926 invocation_priorities,
928 if (result != 0)
929 return result;
931 if (invocation_priorities.size () != 0 &&
932 invocation_priorities.size () != rates.size ())
933 ACE_ERROR_RETURN ((LM_ERROR,
934 "Number of invocation priorities (%d) != Number of rates (%d)\n",
935 invocation_priorities.size (),
936 rates.size ()),
937 -1);
939 synchronizers.number_of_workers_ =
940 rates.size () + continuous_workers;
942 CORBA::ULong max_rate = 0;
943 result =
944 max_throughput (test.in (),
945 current.in (),
946 priority_mapping,
947 max_rate);
948 if (result != 0)
949 return result;
951 CORBA::Short priority_range =
952 RTCORBA::maxPriority - RTCORBA::minPriority;
954 ACE_Thread_Manager paced_workers_manager;
956 CORBA::ULong i = 0;
957 Paced_Worker **paced_workers =
958 new Paced_Worker *[rates.size ()];
960 for (i = 0;
961 i < rates.size ();
962 ++i)
964 CORBA::Short priority = 0;
966 if (invocation_priorities.size () == 0)
967 priority =
968 CORBA::Short ((priority_range /
969 double (rates.size ())) *
970 (i + 1));
971 else
972 priority =
973 invocation_priorities[i];
975 paced_workers[i] =
976 new Paced_Worker (paced_workers_manager,
977 test.in (),
978 rates[i],
979 time_for_test * rates[i],
980 priority,
981 current.in (),
982 priority_mapping,
983 synchronizers);
986 ACE_Thread_Manager continuous_workers_manager;
987 Continuous_Worker continuous_worker (continuous_workers_manager,
988 test.in (),
989 max_rate * time_for_test,
990 current.in (),
991 priority_mapping,
992 synchronizers);
993 long flags =
994 THR_NEW_LWP |
995 THR_JOINABLE |
996 this->orb_->orb_core ()->orb_params ()->thread_creation_flags ();
998 CORBA::Short CORBA_priority =
999 continuous_worker_priority;
1000 CORBA::Short native_priority;
1001 CORBA::Boolean convert_result =
1002 priority_mapping.to_native (CORBA_priority,
1003 native_priority);
1004 if (!convert_result)
1005 ACE_ERROR_RETURN ((LM_ERROR,
1006 "Error in converting CORBA priority %d to native priority\n",
1007 CORBA_priority),
1008 -1);
1010 int force_active = 0;
1012 if (priority_setting == AT_THREAD_CREATION)
1014 result =
1015 continuous_worker.activate (flags,
1016 continuous_workers,
1017 force_active,
1018 native_priority);
1019 if (result != 0)
1020 ACE_ERROR_RETURN ((LM_ERROR,
1021 "Continuous_Worker::activate failed\n"),
1022 result);
1024 else
1026 result =
1027 continuous_worker.activate (flags,
1028 continuous_workers);
1029 if (result != 0)
1030 ACE_ERROR_RETURN ((LM_ERROR,
1031 "Continuous_Worker::activate failed\n"),
1032 result);
1035 flags =
1036 THR_NEW_LWP |
1037 THR_JOINABLE |
1038 this->orb_->orb_core ()->orb_params ()->thread_creation_flags ();
1040 for (i = 0;
1041 i < rates.size ();
1042 ++i)
1044 if (priority_setting == AT_THREAD_CREATION)
1046 if (set_priority)
1048 CORBA_priority =
1049 paced_workers[i]->priority_;
1051 convert_result =
1052 priority_mapping.to_native (CORBA_priority,
1053 native_priority);
1054 if (!convert_result)
1055 ACE_ERROR_RETURN ((LM_ERROR,
1056 "Error in converting CORBA priority %d to native priority\n",
1057 CORBA_priority),
1058 -1);
1061 result =
1062 paced_workers[i]->activate (flags,
1064 force_active,
1065 native_priority);
1066 if (result != 0)
1067 ACE_ERROR_RETURN ((LM_ERROR,
1068 "Paced_Worker::activate failed\n"),
1069 result);
1071 else
1073 result =
1074 paced_workers[i]->activate (flags);
1075 if (result != 0)
1076 ACE_ERROR_RETURN ((LM_ERROR,
1077 "Paced_Worker::activate failed\n"),
1078 result);
1082 if (rates.size () != 0)
1084 paced_workers_manager.wait ();
1087 continuous_workers_manager.wait ();
1089 continuous_worker.print_collective_stats ();
1091 for (i = 0;
1092 i < rates.size ();
1093 ++i)
1095 delete paced_workers[i];
1097 delete[] paced_workers;
1099 if (shutdown_server)
1101 test->shutdown ();
1104 catch (const CORBA::Exception& ex)
1106 ex._tao_print_exception ("Exception caught:");
1107 return -1;
1110 return 0;
1114 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
1118 CORBA::ORB_var orb =
1119 CORBA::ORB_init (argc, argv);
1121 int result =
1122 parse_args (argc, argv);
1123 if (result != 0)
1124 return result;
1126 // Make sure we can support multiple priorities that are required
1127 // for this test.
1128 check_supported_priorities (orb.in ());
1130 // Thread Manager for managing task.
1131 ACE_Thread_Manager thread_manager;
1133 // Create task.
1134 Task task (thread_manager,
1135 orb.in ());
1137 // Task activation flags.
1138 long flags =
1139 THR_NEW_LWP |
1140 THR_JOINABLE |
1141 orb->orb_core ()->orb_params ()->thread_creation_flags ();
1143 // Activate task.
1144 result =
1145 task.activate (flags);
1146 ACE_ASSERT (result != -1);
1147 ACE_UNUSED_ARG (result);
1149 // Wait for task to exit.
1150 result =
1151 thread_manager.wait ();
1152 ACE_ASSERT (result != -1);
1154 catch (const CORBA::Exception& ex)
1156 ex._tao_print_exception ("Exception caught:");
1157 return -1;
1160 return 0;