3 #include "tao/Messaging/Messaging.h"
4 #include "tao/AnyTypeCode/TAOA.h"
5 #include "tao/AnyTypeCode/Any.h"
7 #include "ace/Get_Opt.h"
8 #include "ace/Sched_Params.h"
10 #include "ace/Throughput_Stats.h"
11 #include "ace/High_Res_Timer.h"
12 #include "ace/OS_NS_errno.h"
13 #include "ace/OS_NS_string.h"
// Optional Quantify profiling hooks. Everything in this section is compiled
// only when USING_QUANTIFY is defined; the define below is commented out, so
// the hooks are disabled unless it is re-enabled or supplied by the build.
16 //#define USING_QUANTIFY
18 #if defined (USING_QUANTIFY)
20 #if defined (ACE_WIN32)
// NOTE(review): the contents of the ACE_WIN32 branch are not visible in this
// view of the file.
24 #else /* !ACE_WIN32 */
// Inline wrappers that forward to the lower-case quantify_* functions and
// return their result, so the test routines below can call
// QuantifyClearData (), QuantifyStartRecordingData () and
// QuantifyStopRecordingData () under one set of names.
28 inline int QuantifyClearData ()
30 return quantify_clear_data ();
33 inline int QuantifyStartRecordingData ()
35 return quantify_start_recording_data ();
38 inline int QuantifyStopRecordingData ()
40 return quantify_stop_recording_data ();
43 #endif /* ACE_WIN32 */
45 #endif /* USING_QUANTIFY */
// Stringified object reference of the server. Defaults to the file-based
// IOR below; parse_args can replace it with a command-line argument, and
// ACE_TMAIN hands it to string_to_object to obtain the server reference.
48 static const ACE_TCHAR
*ior
= ACE_TEXT("file://test.ior");
50 // Levels at which syncscope policy can be set.
// ACE_TMAIN contains one set_policy_overrides call per level: on the ORB
// policy manager, on the thread's policy current, and on the server object
// reference.
51 enum LEVEL
{ORB_LEVEL
, THREAD_LEVEL
, OBJECT_LEVEL
};
53 // Default is OBJECT level.
// parse_args overwrites this when a level argument ("thread", "object", ...)
// is given on the command line.
54 static LEVEL level
= OBJECT_LEVEL
;
// File-level test settings. parse_args overwrites them from the command
// line; the test routines and ACE_TMAIN read them.

56 // Default iterations.
// Number of invocations each test loop performs.
57 static CORBA::ULong iterations
= 100;
59 // Default amount of work.
// Passed as the argument of the twoway_work_test / oneway_work_test calls.
60 static CORBA::ULong work
= 0;
62 // Benchmark payload based operations?
63 static int payload_test
= 0;
65 // Default payload size.
// Used as both the constructor argument and the length of the Test::data
// value sent by oneway_payload_test.
66 static CORBA::ULong payload_size
= 0;
68 // Default number of invocations to buffer before flushing.
// NOTE: this is evaluated once, from the initial value of iterations
// (100 / 2). The assignment to iterations in parse_args does not recompute
// it, so it only changes through its own command-line option.
69 static CORBA::ULong buffering_queue_size
= iterations
/ 2;
71 // Benchmark the twoway operation?
72 static int test_twoway
= 0;
74 // Shut down server after test?
75 static int shutdown_server
= 0;
77 // Default SyncScope value.
// parse_args can replace it with SYNC_NONE, SYNC_WITH_SERVER or
// SYNC_WITH_TARGET.
78 static Messaging::SyncScope sync_scope
= Messaging::SYNC_WITH_TRANSPORT
;
80 // Global scale factor.
// Assigned in ACE_TMAIN from ACE_High_Res_Timer::global_scale_factor () and
// passed to dump_results by the timed test loops.
81 static ACE_High_Res_Timer::global_scale_factor_type gsf
= 0;
// Body fragment of the routine that logs the selected test configuration
// through ACE_DEBUG.
// NOTE(review): the function header and the conditions that select between
// the branches below are on lines not visible in this view.
89 "\nTesting twoway requests\n"));
// Translate the configured sync_scope into a printable name. The pointer
// stays null if sync_scope equals none of the four values tested here.
93 const char *one_way_style
= 0;
94 if (sync_scope
== Messaging::SYNC_NONE
)
95 one_way_style
= "SYNC_NONE";
96 else if (sync_scope
== Messaging::SYNC_WITH_TRANSPORT
)
97 one_way_style
= "SYNC_WITH_TRANSPORT";
98 else if (sync_scope
== Messaging::SYNC_WITH_SERVER
)
99 one_way_style
= "SYNC_WITH_SERVER";
100 else if (sync_scope
== Messaging::SYNC_WITH_TARGET
)
101 one_way_style
= "SYNC_WITH_TARGET";
// Label for the flavour of oneway test being run.
// NOTE(review): the condition choosing between the two assignments is not
// visible here -- presumably payload_test; confirm.
103 const char *payload_style
= 0;
105 payload_style
= "Payload based";
107 payload_style
= "Work based";
109 ACE_DEBUG ((LM_DEBUG
,
110 "\nTesting oneway requests: %C : %C\n",
// The request queue limit is reported only for SYNC_NONE.
// NOTE(review): buffering_queue_size is a CORBA::ULong but is formatted with
// %d -- confirm whether an unsigned conversion is intended.
114 if (sync_scope
== Messaging::SYNC_NONE
)
116 ACE_DEBUG ((LM_DEBUG
,
117 "Request queue limited to %d messages\n",
118 buffering_queue_size
));
123 ACE_DEBUG ((LM_DEBUG
,
124 "Payload size %d bytes\n",
129 ACE_DEBUG ((LM_DEBUG
,
// Calls server->twoway_work_test (work) `iterations` times.
//
// With USING_TIMERS defined, timestamps are taken with ACE_OS::gethrtime
// around each call, fed to an ACE_Throughput_Stats accumulator, and the
// results are dumped under the label "Twoway" using the global scale factor.
// With USING_QUANTIFY defined, Quantify data is cleared and recording is
// started before the loop and stopped right after it.
//
// server: reference on which the operation is invoked.
135 twoway_work_test (Test_ptr server
)
137 #if defined (USING_TIMERS)
138 ACE_Throughput_Stats latency
;
// Timestamp taken before the first invocation; latency.sample below is
// given (now - base) as its first argument.
139 ACE_hrtime_t base
= ACE_OS::gethrtime ();
140 #endif /* USING_TIMERS */
142 #if defined (USING_QUANTIFY)
143 // Reset Quantify data recording; whatever happened in the past is
144 // not relevant to this test.
145 QuantifyClearData ();
146 QuantifyStartRecordingData ();
147 #endif /* USING_QUANTIFY */
149 for (CORBA::ULong i
= 0; i
!= iterations
; ++i
)
151 #if defined (USING_TIMERS)
// Timestamp taken immediately before this iteration's invocation.
// NOTE(review): the line that consumes latency_base is not visible in this
// view.
152 ACE_hrtime_t latency_base
= ACE_OS::gethrtime ();
153 #endif /* USING_TIMERS */
155 server
->twoway_work_test (work
);
157 #if defined (USING_TIMERS)
158 ACE_hrtime_t now
= ACE_OS::gethrtime ();
160 latency
.sample (now
- base
,
162 #endif /* USING_TIMERS */
165 #if defined (USING_QUANTIFY)
166 // Stop recording data here; whatever happens after this in the test
167 // is not relevant to this test.
168 QuantifyStopRecordingData ();
169 #endif /* USING_QUANTIFY */
171 #if defined (USING_TIMERS)
172 latency
.dump_results (ACE_TEXT("Twoway"), gsf
);
173 #endif /* USING_TIMERS */
// Calls server->oneway_work_test (work) `iterations` times.
//
// With USING_TIMERS defined, timestamps are taken with ACE_OS::gethrtime
// around each call, fed to an ACE_Throughput_Stats accumulator, and the
// results are dumped under the label "Oneway (work based)" using the global
// scale factor. With USING_QUANTIFY defined, Quantify data is cleared and
// recording is started before the loop and stopped right after it.
//
// server: reference on which the operation is invoked.
177 oneway_work_test (Test_ptr server
)
179 #if defined (USING_TIMERS)
180 ACE_Throughput_Stats latency
;
// Timestamp taken before the first invocation; latency.sample below is
// given (now - base) as its first argument.
181 ACE_hrtime_t base
= ACE_OS::gethrtime ();
182 #endif /* USING_TIMERS */
184 #if defined (USING_QUANTIFY)
185 // Reset Quantify data recording; whatever happened in the past is
186 // not relevant to this test.
187 QuantifyClearData ();
188 QuantifyStartRecordingData ();
189 #endif /* USING_QUANTIFY */
191 for (CORBA::ULong i
= 0; i
!= iterations
; ++i
)
193 #if defined (USING_TIMERS)
// Timestamp taken immediately before this iteration's invocation.
// NOTE(review): the line that consumes latency_base is not visible in this
// view.
194 ACE_hrtime_t latency_base
= ACE_OS::gethrtime ();
195 #endif /* USING_TIMERS */
197 server
->oneway_work_test (work
);
199 #if defined (USING_TIMERS)
200 ACE_hrtime_t now
= ACE_OS::gethrtime ();
202 latency
.sample (now
- base
,
204 #endif /* USING_TIMERS */
207 #if defined (USING_QUANTIFY)
208 // Stop recording data here; whatever happens after this in the test
209 // is not relevant to this test.
210 QuantifyStopRecordingData ();
211 #endif /* USING_QUANTIFY */
213 #if defined (USING_TIMERS)
214 latency
.dump_results (ACE_TEXT("Oneway (work based)"), gsf
);
215 #endif /* USING_TIMERS */
// Calls server->oneway_payload_test (the_data) `iterations` times, where
// the_data is a Test::data value built once before the loop.
//
// With USING_TIMERS defined, timestamps are taken with ACE_OS::gethrtime
// around each call, fed to an ACE_Throughput_Stats accumulator, and the
// results are dumped under the label "Oneway (payload based)" using the
// global scale factor. With USING_QUANTIFY defined, Quantify data is cleared
// and recording is started before the payload is built and stopped right
// after the loop.
//
// server: reference on which the operation is invoked.
219 oneway_payload_test (Test_ptr server
)
221 #if defined (USING_TIMERS)
222 ACE_Throughput_Stats latency
;
// Timestamp taken before the first invocation; latency.sample below is
// given (now - base) as its first argument.
223 ACE_hrtime_t base
= ACE_OS::gethrtime ();
224 #endif /* USING_TIMERS */
226 #if defined (USING_QUANTIFY)
227 // Reset Quantify data recording; whatever happened in the past is
228 // not relevant to this test.
229 QuantifyClearData ();
230 QuantifyStartRecordingData ();
231 #endif /* USING_QUANTIFY */
// The payload is constructed with payload_size and its length is then set
// to payload_size as well; the same object is reused for every invocation.
// NOTE(review): no code filling the payload contents is visible in this view.
233 Test::data
the_data (payload_size
);
234 the_data
.length (payload_size
);
236 for (CORBA::ULong i
= 0; i
!= iterations
; ++i
)
238 #if defined (USING_TIMERS)
// Timestamp taken immediately before this iteration's invocation.
// NOTE(review): the line that consumes latency_base is not visible in this
// view.
239 ACE_hrtime_t latency_base
= ACE_OS::gethrtime ();
240 #endif /* USING_TIMERS */
242 server
->oneway_payload_test (the_data
);
244 #if defined (USING_TIMERS)
245 ACE_hrtime_t now
= ACE_OS::gethrtime ();
247 latency
.sample (now
- base
,
249 #endif /* USING_TIMERS */
252 #if defined (USING_QUANTIFY)
253 // Stop recording data here; whatever happens after this in the test
254 // is not relevant to this test.
255 QuantifyStopRecordingData ();
256 #endif /* USING_QUANTIFY */
258 #if defined (USING_TIMERS)
259 latency
.dump_results (ACE_TEXT("Oneway (payload based)"), gsf
);
260 #endif /* USING_TIMERS */
// Parses the command line with ACE_Get_Opt (option string
// "ps:k:i:t:l:m:w:x") and stores the results in the file-level test
// settings. ACE_TMAIN treats a non-zero return value as failure.
//
// argc, argv: the program's command-line arguments.
//
// NOTE(review): the case labels are on lines not visible in this view, so
// which option letter drives each assignment below has to be confirmed
// against the full file.
264 parse_args (int argc
, ACE_TCHAR
*argv
[])
266 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("ps:k:i:t:l:m:w:x"));
// Loop until ACE_Get_Opt reports that no options remain (-1).
270 while ((c
= get_opts ()) != -1)
// Numeric arguments are converted with ACE_OS::atoi and stored into
// CORBA::ULong settings.
// NOTE(review): no range check is visible, so a negative argument would
// presumably wrap to a large unsigned value -- confirm.
274 payload_size
= ACE_OS::atoi (get_opts
.opt_arg ());
282 iterations
= ACE_OS::atoi (get_opts
.opt_arg ());
// Keeps a pointer to the option argument itself rather than a copy.
286 ior
= get_opts
.opt_arg ();
// Map the textual sync-scope argument onto the Messaging sync-scope
// constants. The action taken for "twoway" is on a line not visible here.
291 ACE_TCHAR
*tmp
= get_opts
.opt_arg ();
293 if (!ACE_OS::strcmp (tmp
, ACE_TEXT("none")))
294 sync_scope
= Messaging::SYNC_NONE
;
295 else if (!ACE_OS::strcmp (tmp
, ACE_TEXT("transport")))
296 sync_scope
= Messaging::SYNC_WITH_TRANSPORT
;
297 else if (!ACE_OS::strcmp (tmp
, ACE_TEXT("server")))
298 sync_scope
= Messaging::SYNC_WITH_SERVER
;
299 else if (!ACE_OS::strcmp (tmp
, ACE_TEXT("target")))
300 sync_scope
= Messaging::SYNC_WITH_TARGET
;
301 else if (!ACE_OS::strcmp (tmp
, ACE_TEXT("twoway")))
// Map the textual level argument onto LEVEL. The assignment for "orb" is on
// a line not visible here.
310 ACE_TCHAR
*tmp
= get_opts
.opt_arg ();
312 if (!ACE_OS::strcmp (tmp
, ACE_TEXT("orb")))
314 else if (!ACE_OS::strcmp (tmp
, ACE_TEXT("thread")))
315 level
= THREAD_LEVEL
;
316 else if (!ACE_OS::strcmp (tmp
, ACE_TEXT("object")))
317 level
= OBJECT_LEVEL
;
324 buffering_queue_size
= ACE_OS::atoi (get_opts
.opt_arg ());
328 work
= ACE_OS::atoi (get_opts
.opt_arg ());
// Usage text logged through ACE_ERROR_RETURN.
// NOTE(review): the option string also accepts "s:" and "k:", but no
// matching entries appear in the usage lines visible here; they may be on
// lines missing from this view -- confirm.
342 ACE_ERROR_RETURN ((LM_ERROR
,
346 "-p <payload based test> "
347 "-i <# of iterations> "
348 "-t <none|transport|server|target|twoway> "
349 "-l <orb|thread|object> "
350 "-m <message count> "
351 "-w <# of server loops> "
352 "-x shutdown server "
357 // Indicates successful parsing of the command line
// Body fragment of the real-time setup routine that ACE_TMAIN invokes as
// set_rt_mode (). It selects the ACE_SCHED_FIFO policy, computes the
// midpoint of that policy's minimum and maximum priority, requests the
// policy through ACE_OS::sched_params, and then applies the priority to the
// calling thread and reads it back.
// NOTE(review): the function header, the declarations of `priority` and
// `self`, and several macro arguments are on lines not visible in this view.
364 int policy
= ACE_SCHED_FIFO
;
// Midpoint between the lowest and highest priority of the chosen policy.
366 (ACE_Sched_Params::priority_min (policy
)
367 + ACE_Sched_Params::priority_max (policy
)) / 2;
369 // Enable FIFO scheduling
371 ACE_OS::sched_params (ACE_Sched_Params (policy
,
// EPERM is reported as a debug message; the other message is logged as an
// error.
376 if (ACE_OS::last_error () == EPERM
)
378 ACE_DEBUG ((LM_DEBUG
,
379 "client (%P|%t): user is not superuser, "
380 "test runs in time-shared class\n"));
383 ACE_ERROR ((LM_ERROR
,
384 "client (%P|%t): sched_params failed %p\n",
388 // Get our thread handle.
390 ACE_OS::thr_self (self
);
392 // Set our thread priority.
// The message is tagged "client" like every other message in this program
// (it previously said "server").
393 if (ACE_OS::thr_setprio (self
, priority
) != 0)
394 ACE_ERROR ((LM_ERROR
,
395 "client (%P|%t): thr_setprio failed %p\n",
398 // Do a sanity check.
// Read the priority back and log it when the query succeeds.
399 if (ACE_OS::thr_getprio (self
, priority
) == 0)
400 ACE_DEBUG ((LM_DEBUG
,
401 "client (%P|%t): thread priority = %d.\n",
// Client entry point. In the order visible below it: calls set_rt_mode (),
// reads the high-resolution timer scale factor, initializes the ORB, parses
// the command line, resolves the "ORBPolicyManager" and "PolicyCurrent"
// initial references, and narrows the server reference obtained from `ior`.
// It then contains a twoway test call and, for the oneway tests, the code
// that installs a SyncScope policy override (plus a buffering-constraint
// policy when sync_scope is SYNC_NONE) before running
// oneway_payload_test / oneway_work_test.
// NOTE(review): the conditions selecting between these paths, the handling
// of `result`, and the function's closing lines are not visible in this view.
408 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
410 int result
= set_rt_mode ();
416 // Calibrate the timer.
417 gsf
= ACE_High_Res_Timer::global_scale_factor ();
419 // Initialize the ORB, the POA, and get the server reference.
421 CORBA::ORB_init (argc
,
424 // Get the command line options.
// Runs after ORB_init, which is also handed argc.
425 if (parse_args (argc
, argv
) != 0)
427 ACE_ERROR_RETURN ((LM_ERROR
,
428 "parse_args failed\n"),
// ORB-wide policy manager: target of the ORB-level override further down.
432 CORBA::Object_var obj
=
433 orb
->resolve_initial_references ("ORBPolicyManager");
435 CORBA::PolicyManager_var policy_manager
=
436 CORBA::PolicyManager::_narrow (obj
.in ());
// Policy current: target of the thread-level override further down.
438 obj
= orb
->resolve_initial_references ("PolicyCurrent");
440 CORBA::PolicyCurrent_var policy_current
=
441 CORBA::PolicyCurrent::_narrow (obj
.in ());
// Turn the configured IOR string into the typed server reference.
// NOTE(review): no nil check on the narrowed reference is visible in this
// view.
444 obj
= orb
->string_to_object (ior
);
446 Test_var server
= Test::_narrow (obj
.in ());
448 // Print testing parameters.
454 twoway_work_test (server
.in ());
458 // Set up the sync scope any.
459 CORBA::Any sync_scope_any
;
460 sync_scope_any
<<= sync_scope
;
462 // Set the length of the policy list.
463 CORBA::PolicyList
sync_scope_policy_list (1);
464 sync_scope_policy_list
.length (1);
466 // Set up the sync scope policy.
467 sync_scope_policy_list
[0] =
468 orb
->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE
,
// The three set_policy_overrides calls below apply the same policy list at
// ORB, thread, or object scope, each with CORBA::ADD_OVERRIDE.
// NOTE(review): the selection among them (presumably a switch on `level`) is
// on lines not visible in this view.
474 // Set the sync scope policy at the ORB level.
475 policy_manager
->set_policy_overrides (sync_scope_policy_list
,
476 CORBA::ADD_OVERRIDE
);
480 // Set the sync scope policy at the thread level.
481 policy_current
->set_policy_overrides (sync_scope_policy_list
,
482 CORBA::ADD_OVERRIDE
);
486 // Set the sync scope policy at the object level.
487 obj
= server
->_set_policy_overrides (sync_scope_policy_list
,
488 CORBA::ADD_OVERRIDE
);
490 // Get the new object reference with the updated policy.
491 server
= Test::_narrow (obj
.in ());
498 // We are done with this policy.
499 sync_scope_policy_list
[0]->destroy ();
501 // Are we buffering the oneway requests?
502 if (sync_scope
== Messaging::SYNC_NONE
)
// Constraint uses mode BUFFER_MESSAGE_COUNT with message_count taken from
// buffering_queue_size; message_bytes and timeout are set to 0.
504 TAO::BufferingConstraint buffering_constraint
;
505 buffering_constraint
.mode
= TAO::BUFFER_MESSAGE_COUNT
;
506 buffering_constraint
.message_count
= buffering_queue_size
;
507 buffering_constraint
.message_bytes
= 0;
508 buffering_constraint
.timeout
= 0;
510 // Set up the buffering constraint any.
511 CORBA::Any buffering_constraint_any
;
512 buffering_constraint_any
<<= buffering_constraint
;
514 // Set up the buffering constraint policy list.
515 CORBA::PolicyList
buffering_constraint_policy_list (1);
516 buffering_constraint_policy_list
.length (1);
518 // Set up the buffering constraint policy.
519 buffering_constraint_policy_list
[0] =
520 orb
->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE
,
521 buffering_constraint_any
);
523 // Set up the constraints (at the object level).
524 obj
= server
->_set_policy_overrides (buffering_constraint_policy_list
,
525 CORBA::ADD_OVERRIDE
);
527 // We are done with this policy.
528 buffering_constraint_policy_list
[0]->destroy ();
530 // Get the new object reference with the updated policy.
531 server
= Test::_narrow (obj
.in ());
534 // Run the oneway test.
// NOTE(review): the condition choosing between the payload-based and the
// work-based variant is not visible here -- presumably payload_test; confirm.
536 oneway_payload_test (server
.in ());
538 oneway_work_test (server
.in ());
543 ACE_DEBUG ((LM_DEBUG
,
544 "\nShutting down server\n"));
549 // Destroy the ORB. On some platforms, e.g., Win32, the socket
550 // library is closed at the end of main(). This means that any
551 // socket calls made after main() fail. Hence if we wait for
552 // static destructors to flush the queues, it will be too late.
553 // Therefore, we use explicit destruction here and flush the
554 // queues before main() ends.
// Any CORBA exception caught here is reported through _tao_print_exception
// with the label "client".
557 catch (const CORBA::Exception
& ex
)
559 ex
._tao_print_exception ("client");