1 #include "AdminProperties.h"
2 #include "ace/Arg_Shifter.h"
3 #include "ace/Get_Opt.h"
4 #include "ace/OS_NS_unistd.h"
7 /***************************************************************************/
// Default constructor of the supplier task.
// NOTE(review): the initializer list and body are not visible in this
// excerpt; presumably supplier_ and client_ start out null -- confirm.
9 AdminProperties_Task::AdminProperties_Task ()
// Overload of init() that receives the supplier used for pushing events and
// the owning AdminProperties test object.
// NOTE(review): the body is not visible in this excerpt; presumably it stores
// both pointers in supplier_ and client_, which svc() reads -- confirm.
16 AdminProperties_Task::init (TAO_Notify_Tests_StructuredPushSupplier
*supplier
, AdminProperties
* client
)
// Overload of init() taking command-line style arguments. It only forwards
// argc/argv to ACE_Task_Base::init() and returns that call's result.
23 AdminProperties_Task::init (int argc
, ACE_TCHAR
*argv
[])
25 return ACE_Task_Base::init (argc
, argv
);
// Body of the supplier task. Builds one StructuredEvent (domain "*",
// type "*", name "myevent", three filterable fields) and pushes it
// client_->event_count_ times through supplier_->send_event().
// A CORBA::IMP_LIMIT raised while sending is recorded in
// client_->rejections_; any other CORBA exception is printed.
// NOTE(review): the try block, braces and return statement are not visible
// in this excerpt.
29 AdminProperties_Task::svc ()
32 CosNotification::StructuredEvent event
;
// Fixed header: wildcard domain and type names, constant event name.
39 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("*");
41 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup("*");
43 event
.header
.fixed_header
.event_name
= CORBA::string_dup("myevent");
45 // OptionalHeaderFields
47 // sequence<Property>: string name, any value
48 event
.header
.variable_header
.length (0); // put nothing here
50 // FilterableEventBody
52 // sequence<Property>: string name, any value
// Slot 0 ("threshold") only gets its name here; its value is filled in per
// event inside the send loop. Slots 1 and 2 carry constant values.
53 event
.filterable_data
.length (3);
54 event
.filterable_data
[0].name
= CORBA::string_dup("threshold");
56 event
.filterable_data
[1].name
= CORBA::string_dup("temperature");
57 event
.filterable_data
[1].value
<<= (CORBA::Long
)70;
59 event
.filterable_data
[2].name
= CORBA::string_dup("pressure");
60 event
.filterable_data
[2].value
<<= (CORBA::Long
)80;
62 // @@ CORBA::Short prio = CosNotification::LowestPriority;
// The number of events to send is taken from the owning test object.
64 int event_count
= this->client_
->event_count_
;
66 ACE_DEBUG ((LM_DEBUG
, "\n1 supplier sending %d events...\n", event_count
));
69 for (int i
= 0 ; i
< event_count
; ++i
)
// Stamp the loop index into the "threshold" field and into the event body.
71 event
.filterable_data
[0].value
<<= (CORBA::Long
)i
;
73 event
.remainder_of_body
<<= (CORBA::Long
)i
;
77 ACE_DEBUG((LM_DEBUG
, "+"));
78 this->supplier_
->send_event (event
);
// IMP_LIMIT handler: optionally logs the event index and records the
// rejection in the client's counter.
80 catch (const CORBA::IMP_LIMIT
&)
82 if (TAO_debug_level
> 0)
83 ACE_DEBUG ((LM_DEBUG
, "\nEvent %d was not send due to Impl Limit reached\n", i
));
85 ++ this->client_
->rejections_
;
// Any other CORBA exception is only printed.
87 catch (const CORBA::Exception
& ex
)
89 ex
._tao_print_exception (
90 "Error: Exception sending event\n");
97 /***************************************************************************/
// Constructs a consumer bound to the given test object. The received-event
// counter starts at zero and the consumer announces itself to the client via
// consumer_start().
// NOTE(review): the first member initializer is not visible in this excerpt;
// presumably it stores `client` in client_ -- confirm.
99 AdminProperties_StructuredPushConsumer::AdminProperties_StructuredPushConsumer (AdminProperties
* client
)
101 , events_received_ (0)
103 client_
->consumer_start(this);
// Callback for a delivered structured event; the event content is ignored.
// Once events_received_ has reached the client's max_queue_length_, the
// client is told this consumer is done via consumer_done().
// NOTE(review): the statement that increments events_received_ and any
// debug-level guards around the log calls are not visible in this excerpt.
107 AdminProperties_StructuredPushConsumer::push_structured_event (const CosNotification::StructuredEvent
& /*notification*/
112 if (events_received_
>= client_
->max_queue_length_
)
113 client_
->consumer_done(this);
115 ACE_DEBUG((LM_DEBUG
, "-"));
// NOTE(review): `this` (a pointer) is passed for the %x conversion; ACE's log
// format has a pointer conversion (%@) that looks more appropriate -- confirm.
118 ACE_DEBUG ((LM_DEBUG
, "\nConsumer %x received event %d\n", this, events_received_
.value ()));
121 /***************************************************************************/
// Default constructor. Visible defaults: a maximum queue length of 10,
// reject_new_events_ off, and both connected-client counters at zero.
// NOTE(review): the remaining member initializers and the body are not
// visible in this excerpt.
123 AdminProperties::AdminProperties ()
124 : max_queue_length_ (10),
127 reject_new_events_ (0),
131 suppliers_connected_count_ (0),
132 consumers_connected_count_ (0),
// Destructor. NOTE(review): the body is not visible in this excerpt.
137 AdminProperties::~AdminProperties ()
// Parses the test's command-line options with an ACE_Arg_Shifter.
// Options handled by the visible branches:
//   -max_queue_length <n>   -> max_queue_length_
//   -max_consumers <n>      -> max_consumers_
//   -max_suppliers <n>      -> max_suppliers_
//   -reject_new_events      -> reject_new_events_ = 1 (flag, no value read)
//   -consumers <n>          -> consumers_
//   -suppliers <n>          -> suppliers_
//   -?                      -> usage text
// Any other argument is skipped with ignore_arg().
// NOTE(review): the usage text advertises "-event_count" and a value for
// "-reject_new_events", but no visible branch parses either -- confirm.
// NOTE(review): the usage logging call, the return statements and the final
// else are not visible in this excerpt.
142 AdminProperties::parse_args(int argc
, ACE_TCHAR
*argv
[])
144 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
146 const ACE_TCHAR
*current_arg
= 0;
148 while (arg_shifter
.is_anything_left ())
150 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-max_queue_length"))))
152 this->max_queue_length_
= ACE_OS::atoi (current_arg
);
153 // Max. queue length.
155 arg_shifter
.consume_arg ();
157 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-max_consumers"))))
159 this->max_consumers_
= ACE_OS::atoi (current_arg
);
160 // Max consumers allowed to connect.
161 arg_shifter
.consume_arg ();
163 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-max_suppliers"))))
165 this->max_suppliers_
= ACE_OS::atoi (current_arg
);
166 // Max. number of suppliers allowed to connect.
167 arg_shifter
.consume_arg ();
// Flag-style option: only its presence is checked.
169 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-reject_new_events")) == 0)
171 this->reject_new_events_
= 1;
172 arg_shifter
.consume_arg ();
174 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-consumers"))))
176 this->consumers_
= ACE_OS::atoi (current_arg
);
177 // Number of consumers to create.
178 arg_shifter
.consume_arg ();
180 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-suppliers"))))
182 this->suppliers_
= ACE_OS::atoi (current_arg
);
183 // Number of suppliers to create.
184 arg_shifter
.consume_arg ();
// Help request: the adjacent string literals below form the usage text.
186 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-?")) == 0)
190 "-max_queue_length [max_queue_length] "
191 "-max_consumers [max_consumers] "
192 "-max_suppliers [max_suppliers] "
193 "-reject_new_events [reject_new_events] "
194 "-consumers [consumers] "
195 "-suppliers [suppliers] "
196 "-event_count [event_count] ",
200 arg_shifter
.consume_arg ();
// Unrecognized argument: leave it in place and move on.
206 arg_shifter
.ignore_arg ();
// Creates the event channel under test together with one supplier admin and
// one consumer admin.
// The channel's admin properties are filled from the parsed settings
// (MaxQueueLength, MaxSuppliers, MaxConsumers); RejectNewEvents comes from
// the `reject` parameter rather than from reject_new_events_.
// The results are stored in ec_, supplier_admin_ and consumer_admin_, and
// each is asserted to be non-nil.
// NOTE(review): the trailing out-arguments of create_channel(),
// new_for_suppliers() and new_for_consumers() are not visible in this
// excerpt; presumably they are `id` and `adminid` -- confirm.
213 AdminProperties::create_channel(bool reject
)
215 CosNotifyChannelAdmin::ChannelID id
;
217 initial_admin_
.length (4);
219 this->initial_admin_
[0].name
=
220 CORBA::string_dup (CosNotification::MaxQueueLength
);
221 this->initial_admin_
[0].value
<<= this->max_queue_length_
;
224 this->initial_admin_
[1].name
=
225 CORBA::string_dup (CosNotification::MaxSuppliers
);
226 this->initial_admin_
[1].value
<<= this->max_suppliers_
;
228 this->initial_admin_
[2].name
=
229 CORBA::string_dup (CosNotification::MaxConsumers
);
230 this->initial_admin_
[2].value
<<= this->max_consumers_
;
233 this->initial_admin_
[3].name
=
234 CORBA::string_dup (CosNotification::RejectNewEvents
);
235 this->initial_admin_
[3].value
<<= CORBA::Any::from_boolean (reject
);
// Create the channel with the QoS and admin property sets assembled above.
237 this->ec_
= notify_factory_
->create_channel (this->initial_qos_
,
238 this->initial_admin_
,
241 ACE_ASSERT (!CORBA::is_nil (ec_
.in ()));
244 CosNotifyChannelAdmin::AdminID adminid
;
// Both admins are created with the same inter-filter-group operator, ifgop_.
246 this->supplier_admin_
= ec_
->new_for_suppliers (this->ifgop_
,
249 ACE_ASSERT (!CORBA::is_nil (supplier_admin_
.in ()));
251 this->consumer_admin_
= ec_
->new_for_consumers (this->ifgop_
,
254 ACE_ASSERT (!CORBA::is_nil (consumer_admin_
.in ()));
// Runs the test in two passes. Each pass creates a fresh channel, runs the
// queue-length test and destroys the channel again; the second pass also
// runs the max-clients test before destroying the channel.
// NOTE(review): the declaration of `reject` and any assignment to it between
// the passes are not visible in this excerpt; presumably the passes use
// different values -- confirm.
258 AdminProperties::run_test ()
261 this->create_channel(reject
);
262 this->test_max_queue_length (reject
);
264 this->ec_
->destroy();
267 this->create_channel(reject
);
268 this->test_max_queue_length (reject
);
270 this->test_max_clients ();
272 this->ec_
->destroy ();
// Exercises the MaxQueueLength admin property.
// Connects one consumer and one supplier, lets a supplier task push the
// events on its own thread, waits for that task to finish, and then compares
// the number of events the consumer received (and, when `reject` is set, the
// number of rejections) against the expected range. Mismatches are reported
// with ACE_ERROR.
// NOTE(review): several guarding conditions and braces are not visible in
// this excerpt.
276 AdminProperties::test_max_queue_length (bool reject
)
278 // Create the consumer
279 AdminProperties_StructuredPushConsumer
*consumer
;
280 ACE_NEW (consumer
, AdminProperties_StructuredPushConsumer (this));
281 consumer
->init (root_poa_
.in ());
282 consumer
->connect (this->consumer_admin_
.in ());
284 // Create the supplier
285 TAO_Notify_Tests_StructuredPushSupplier
*supplier
= 0;
286 ACE_NEW (supplier
, TAO_Notify_Tests_StructuredPushSupplier ());
287 supplier
->init (root_poa_
.in ());
288 supplier
->connect (this->supplier_admin_
.in ());
290 AdminProperties_Task supplier_task
;
292 // Init the Task to send events;
293 supplier_task
.init (supplier
, this);
// Run the sender on a single joinable thread so that wait() below can block
// until it has finished.
295 if (supplier_task
.activate (THR_NEW_LWP
| THR_JOINABLE
, 1) != 0)
297 ACE_ERROR ((LM_ERROR
, "\nCannot activate supplier task\n"));
300 // All supplier events should be sent before the first consumer event is
301 // received. This relies on our use of -ORBClientConnectionHandler RW.
302 supplier_task
.wait ();
// Read the counter before disconnecting, since the consumer does not
// survive disconnect().
306 // consumer is destroyed by consumer->disconnect()
307 CORBA::Long received_count
= consumer
->events_received_
.value ();
309 // disconnect the participants.
310 consumer
->disconnect ();
311 supplier
->disconnect ();
313 // If the reject_new_events setting == true, then the supplier should
314 // have received an imp_limit exception for each event it tried to push
315 // after the maximum was reached.
316 // If the reject_new_events setting == false, then the events should
317 // have been discarded according to the DiscardPolicy, which for this
318 // test we leave as AnyOrder.
320 ACE_DEBUG ((LM_DEBUG
, "\nSupplier sent %d events, consumer received %d events, max_queue_length = %d\n",
321 event_count_
, received_count
, max_queue_length_
));
// Default expectation: between max_queue_length_ and
// max_queue_length_ + max_consumers_ events are received.
323 int expected_min
= max_queue_length_
;
324 int expected_max
= max_queue_length_
+ max_consumers_
;
// Override: exactly (sent - rejected) events are expected.
// NOTE(review): the condition guarding this override is not visible here;
// presumably it applies only when `reject` is true -- confirm.
327 expected_max
= event_count_
- rejections_
;
328 expected_min
= expected_max
;
// With rejection enabled, every event that was not received must have been
// rejected.
331 if (reject
&& rejections_
!= event_count_
- received_count
)
333 ACE_ERROR ((LM_ERROR
, "\nError: Expected %d rejections, but got %d\n",
334 event_count_
- received_count
, rejections_
));
338 if (received_count
< expected_min
|| received_count
> expected_max
)
340 ACE_ERROR ((LM_ERROR
, "\nError: Expected %d to %d events, but received %d\n",
341 expected_min
, expected_max
, received_count
));
346 AdminProperties::test_max_clients ()
348 this->create_suppliers ();
350 this->create_consumers ();
352 // check the results and print the assessment.
353 if (this->consumers_connected_count_
> this->max_consumers_
)
354 ACE_DEBUG ((LM_ERROR
, "\nConnected consumers %d, exceed MaxConsumers %d\n",
355 this->consumers_connected_count_
> this->max_consumers_
));
357 if (this->suppliers_connected_count_
> this->max_suppliers_
)
358 ACE_DEBUG ((LM_ERROR
, "\nConnected suppliers %d, exceed MaxSuppliers %d\n",
359 this->suppliers_connected_count_
> this->max_suppliers_
));
// Tries to connect suppliers_ suppliers to supplier_admin_, counting each
// successful connection in suppliers_connected_count_.
// A CORBA::IMP_LIMIT while connecting is logged at debug level; any other
// CORBA exception is printed as an error.
// NOTE(review): the declaration of `index`, the try block and the start of
// the ACE_NEW statement are not visible in this excerpt.
363 AdminProperties::create_suppliers ()
365 // Create the requested number of suppliers.
366 // @@ CosNotifyChannelAdmin::AdminID adminid;
367 // @@ CosNotifyChannelAdmin::InterFilterGroupOperator ifgop =
368 // @@ CosNotifyChannelAdmin::OR_OP;
374 TAO_Notify_Tests_StructuredPushSupplier
*supplier
;
376 for (index
= 0; index
< this->suppliers_
; ++index
)
379 TAO_Notify_Tests_StructuredPushSupplier ());
380 supplier
->init (root_poa_
.in ());
382 supplier
->connect (this->supplier_admin_
.in ());
// Reached only when connect() did not throw.
384 this->suppliers_connected_count_
++;
387 catch (const CORBA::IMP_LIMIT
&)
390 ACE_DEBUG ((LM_DEBUG
, "\nImpl Limit excpetion when connecting supplier\n"));
392 catch (const CORBA::Exception
& ex
)
394 ex
._tao_print_exception ("\nError: Exception in connecting supplier\n");
// Tries to connect consumers_ consumers to consumer_admin_, counting each
// successful connection in consumers_connected_count_.
// A CORBA::IMP_LIMIT while connecting is logged at debug level; any other
// CORBA exception is printed as an error.
// NOTE(review): the declaration of `index` and the try block are not visible
// in this excerpt.
399 AdminProperties::create_consumers ()
401 // Create the requested number of consumers.
402 // @@ CosNotifyChannelAdmin::AdminID adminid;
403 // @@ CosNotifyChannelAdmin::InterFilterGroupOperator ifgop =
404 // @@ CosNotifyChannelAdmin::OR_OP;
410 TAO_Notify_Tests_StructuredPushConsumer
*consumer
;
412 for (index
= 0; index
< this->consumers_
; ++index
)
414 ACE_NEW (consumer
, TAO_Notify_Tests_StructuredPushConsumer ());
415 consumer
->init (root_poa_
.in ());
417 consumer
->connect (this->consumer_admin_
.in ());
// Reached only when connect() did not throw.
419 this->consumers_connected_count_
++;
422 catch (const CORBA::IMP_LIMIT
&)
425 ACE_DEBUG ((LM_DEBUG
, "\nImpl Limit exception when connecting consumer\n"));
427 catch (const CORBA::Exception
& ex
)
429 ex
._tao_print_exception (
430 "\nError: Exception in connecting consumer\n");
434 /***************************************************************************/
// Program entry point. Creates the AdminProperties test object and
// initializes it with the command-line arguments; a CORBA exception is
// caught and printed with an "Error: " prefix.
// NOTE(review): the try block, the calls that follow init() (presumably
// running the test) and the return statement are not visible in this
// excerpt -- confirm.
437 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
439 AdminProperties test
;
443 test
.init (argc
, argv
);
447 catch (const CORBA::Exception
& se
)
449 se
._tao_print_exception ("Error: ");