1 #include "ace/Arg_Shifter.h"
2 #include "ace/Get_Opt.h"
3 #include "ace/OS_NS_unistd.h"
8 /***************************************************************************/
10 SequencePushConsumer::SequencePushConsumer (Sequence
*test_client
)
11 : test_client_ (test_client
)
16 SequencePushConsumer::push_structured_events (
17 const CosNotification::EventBatch
&batch
)
19 this->test_client_
->events_received_
+= batch
.length ();
21 if (batch
.length () > this->test_client_
->consumer_batch_size_
)
23 "Error: Received more than max event batch %d\n",
26 this->test_client_
->on_event_received ();
28 ACE_OS::sleep (this->test_client_
->consumer_delay_
);
31 /***************************************************************************/
33 SequencePushSupplier::SequencePushSupplier (
34 Sequence
* test_client
)
35 : test_client_ (test_client
)
39 SequencePushSupplier::~SequencePushSupplier ()
43 /***************************************************************************/
45 : event_count_ (15), supplier_batch_size_ (5), consumer_batch_size_ (3),
46 pacing_ (2), order_policy_ (CosNotification::PriorityOrder
), events_received_ (0),
51 Sequence::~Sequence ()
56 Sequence::init (int argc
,
62 "Options: event count = %d\n"
63 "supplier batch size = %d\n"
64 "consumer batch size = %d\n"
67 , supplier_batch_size_
68 , consumer_batch_size_
71 ACE_DEBUG ((LM_DEBUG
, "consumer delay = %d\n", consumer_delay_
.sec ()));
74 // Initialize the base class.
75 Notify_Test_Client::init (argc
,
78 // Create all participents.
81 CosNotifyChannelAdmin::AdminID adminid
;
83 this->supplier_admin_
=
84 this->ec_
->new_for_suppliers (this->ifgop_
,
87 ACE_ASSERT (!CORBA::is_nil (supplier_admin_
.in ()));
89 this->consumer_admin_
=
90 this->ec_
->new_for_consumers (this->ifgop_
,
93 ACE_ASSERT (!CORBA::is_nil (consumer_admin_
.in ()));
95 ACE_NEW_RETURN (this->consumer_
,
96 SequencePushConsumer (this),
98 this->consumer_
->init (root_poa_
.in ());
99 this->consumer_
->connect (this->consumer_admin_
.in ());
101 CosNotification::QoSProperties
properties (3);
102 properties
.length (3);
104 properties
[0].name
= CORBA::string_dup (CosNotification::MaximumBatchSize
);
105 properties
[0].value
<<= (CORBA::Long
) this->consumer_batch_size_
;
106 properties
[1].name
= CORBA::string_dup (CosNotification::PacingInterval
);
107 properties
[1].value
<<= (TimeBase::TimeT
) (this->pacing_
* 1000 * 10000);
108 properties
[2].name
= CORBA::string_dup (CosNotification::OrderPolicy
);
109 properties
[2].value
<<= this->order_policy_
;
111 this->consumer_
->get_proxy_supplier ()->set_qos (properties
);
113 ACE_NEW_RETURN (this->supplier_
,
114 SequencePushSupplier (this),
116 this->supplier_
->init (root_poa_
.in ());
118 this->supplier_
->connect (this->supplier_admin_
.in ());
126 Sequence::parse_args (int argc
,
129 ACE_Arg_Shifter
arg_shifter (argc
,
131 const ACE_TCHAR
*current_arg
= 0;
133 while (arg_shifter
.is_anything_left ())
135 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-events"))))
137 this->event_count_
= ACE_OS::atoi (current_arg
); // The number of events to send/receive.
139 arg_shifter
.consume_arg ();
141 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-SupplierBatchSize"))))
143 this->supplier_batch_size_
= ACE_OS::atoi (current_arg
); // Supplier batch size
145 arg_shifter
.consume_arg ();
147 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-ConsumerBatchSize"))))
149 this->consumer_batch_size_
= ACE_OS::atoi (current_arg
); // Consumer batch size
151 arg_shifter
.consume_arg ();
153 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-ConsumerDelay"))))
155 this->consumer_delay_
= ACE_Time_Value (ACE_OS::atoi (current_arg
), 0); // Consumer delay in secs.
157 arg_shifter
.consume_arg ();
159 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Pacing")))) // in seconds
161 this->pacing_
= (TimeBase::TimeT
) ACE_OS::atoi (current_arg
);
163 arg_shifter
.consume_arg ();
166 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-?")) == 0)
170 "-events event_count "
171 "-SupplierBatchSize size "
172 "-ConsumerBatchSize size "
173 "-ConsumerDelay delay "
177 arg_shifter
.consume_arg ();
183 arg_shifter
.ignore_arg ();
191 Sequence::create_EC ()
193 CosNotifyChannelAdmin::ChannelID id
;
195 this->ec_
= notify_factory_
->create_channel (this->initial_qos_
,
196 this->initial_admin_
,
198 ACE_ASSERT (!CORBA::is_nil (this->ec_
.in ()));
202 Sequence::on_event_received ()
205 ACE_DEBUG ((LM_DEBUG
,
206 "Events received = %d\n",
207 this->events_received_
.value ()));
209 if (this->events_received_
.value () == this->event_count_
)
216 Sequence::run_test ()
219 CosNotification::StructuredEvent event
;
226 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("*");
228 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup("*");
230 event
.header
.fixed_header
.event_name
= CORBA::string_dup("myevent");
232 // OptionalHeaderFields.
234 // sequence<Property>: string name, any value
235 CosNotification::PropertySeq
& qos
= event
.header
.variable_header
;
236 qos
.length (1); // put nothing here
238 // FilterableEventBody
240 // sequence<Property>: string name, any value
241 event
.filterable_data
.length (3);
242 event
.filterable_data
[0].name
= CORBA::string_dup("threshold");
244 event
.filterable_data
[1].name
= CORBA::string_dup("temperature");
245 event
.filterable_data
[1].value
<<= (CORBA::Long
)70;
247 event
.filterable_data
[2].name
= CORBA::string_dup("pressure");
248 event
.filterable_data
[2].value
<<= (CORBA::Long
)80;
250 CORBA::Short prio
= CosNotification::LowestPriority
;
252 CosNotification::EventBatch batch
;
253 batch
.length (this->supplier_batch_size_
);
254 CORBA::ULong batch_index
= 0;
256 for (int i
= 0; i
< this->event_count_
; ++i
)
258 event
.filterable_data
[0].value
<<= (CORBA::Long
)i
;
261 event
.remainder_of_body
<<= (CORBA::Long
)i
;
263 qos
[0].name
= CORBA::string_dup (CosNotification::Priority
);
264 qos
[0].value
<<= (CORBA::Short
)prio
++;
266 batch
[batch_index
] = event
;
269 if (batch_index
== this->supplier_batch_size_
)
271 batch
.length (batch_index
); // set the correct length
274 ACE_DEBUG ((LM_DEBUG
, "Sending batch with %d events\n", batch
.length ()));
276 this->supplier_
->send_events (batch
);
279 batch
.length (this->supplier_batch_size_
);
284 // send the last batch.
287 batch
.length (batch_index
); // set the correct length
289 this->supplier_
->send_events (batch
);
295 Sequence::end_test ()
301 Sequence::check_results ()
303 // Destroy the channel.
304 this->ec_
->destroy ();
306 if (this->events_received_
.value () == this->event_count_
)
308 ACE_DEBUG ((LM_DEBUG
,
309 "Sequence test success\n"));
314 ACE_DEBUG ((LM_DEBUG
,
315 "Sequence test failed!\n"));
320 /***************************************************************************/
323 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
327 if (events
.parse_args (argc
, argv
) == -1)
334 events
.init (argc
, argv
);
340 catch (const CORBA::Exception
& se
)
342 se
._tao_print_exception ("Error: ");
350 status
= events
.check_results ();
352 catch (const CORBA::Exception
& se
)
354 se
._tao_print_exception ("Error: ");