1 // ******************************************************************
3 // ******************************************************************
5 #include "ace/Get_Opt.h"
7 #include "tao/ORB_Core.h"
9 #include "orbsvcs/CosNotifyChannelAdminS.h"
10 #include "orbsvcs/CosNotifyCommC.h"
11 #include "orbsvcs/CosNamingC.h"
12 #include "orbsvcs/TimeBaseC.h"
13 #include "Notify_SequencePushSupplier.h"
16 #include "Notify_Test_Client.h"
18 #include "ace/OS_NS_unistd.h"
21 // ******************************************************************
23 // ******************************************************************
// The single sequence-push supplier used by this test.  It is allocated in
// create_suppliers(), used by SendBatch() to push each batch, and
// disconnected at the end of the main routine.
25 static TAO_Notify_Tests_SequencePushSupplier
* supplier_1
= 0;
// Set to 1 while parsing the command line.  When set, the main routine
// fills a QoS property list that names CosNotification::DeadlineOrder as
// the order policy.
27 static CORBA::Boolean use_deadline_ordering
= 0;
29 // Must be a multiple of the consumer's batch size.
// A value supplied on the command line is rounded down to a multiple of
// BATCH_SIZE by parse_args().
30 static int num_events
= 40;
32 // Must match the consumer's batch size so that we send
33 // the correct number of each type of event per batch, which
34 // allows us to validate the discard policy.
35 static const CORBA::Long BATCH_SIZE
= 4;
// Name of the file that receives the stringified object reference of the
// sig servant.  Defaults to "supplier.ior"; parse_args() can replace it
// with an option argument, and the main routine unlinks it after the run.
37 static const ACE_TCHAR
*ior_output_file
= ACE_TEXT ("supplier.ior");
39 // ******************************************************************
41 // ******************************************************************
43 // The supplier will not start sending events until the
44 // go() operation is invoked.
// Servant for the "sig" interface (derives from the POA_sig skeleton).
// Only the constructor signature and the two wait_* helpers are visible in
// this excerpt; the remaining members and all bodies are not shown.
45 class sig_i
: public POA_sig
// Constructed with the ORB of the test client.
// NOTE(review): presumably the ORB is kept so that requests can be serviced
// while waiting -- confirm against the full constructor body.
48 sig_i(CORBA::ORB_ptr orb
)
// Called by the main routine before any events are sent.  Declares a
// 100 ms time value; presumably the polling interval used until go() has
// been invoked -- TODO confirm, the loop itself is not visible here.
64 void wait_for_startup()
67 ACE_Time_Value
tv(0, 100 * 1000); // 100ms
// Called by the main routine after all batches have been sent.  Declares
// the same 100 ms time value as wait_for_startup(); the condition it waits
// for is not visible in this excerpt.
72 void wait_for_completion()
75 ACE_Time_Value
tv(0, 100 * 1000); // 100ms
// Test client for this supplier: a Notify_Test_Client that declares its own
// parse_args() for the supplier's command-line options.
85 class Supplier_Client
: public Notify_Test_Client
// Parses the supplier's command-line options; defined out of line below.
88 virtual int parse_args (int argc
, ACE_TCHAR
*argv
[]);
// Parses the command line using the option string "o:e:d" and updates the
// file-level settings use_deadline_ordering, num_events and
// ior_output_file.  The case labels are not visible in this excerpt; going
// by the usage string, -o presumably takes the IOR file name, -e the
// number of events and -d selects deadline ordering -- TODO confirm.
// Error paths leave through ACE_ERROR_RETURN.
93 Supplier_Client::parse_args (int argc
, ACE_TCHAR
*argv
[])
95 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("o:e:d"));
// Iterate until the option parser reports -1 (no more options).
98 while ((c
= get_opts ()) != -1)
// Request deadline ordering.  When ACE_HAS_TIMED_MESSAGE_BLOCKS is not
// defined this option is rejected with an error right after being set.
102 use_deadline_ordering
= 1;
103 #if !defined (ACE_HAS_TIMED_MESSAGE_BLOCKS)
104 ACE_ERROR_RETURN ((LM_ERROR
,
105 "This order policy requires timed message "
106 "blocks.\nPlease #define "
107 "ACE_HAS_TIMED_MESSAGE_BLOCKS in your "
109 #endif /* ACE_HAS_TIMED_MESSAGE_BLOCKS */
// Take the event count from the option argument, then drop the remainder
// so that only whole batches of BATCH_SIZE events are sent.
112 num_events
= ACE_OS::atoi (get_opts
.optarg
);
113 num_events
-= num_events
% BATCH_SIZE
; // round down
// Replace the default IOR output file name with the option argument.
117 ior_output_file
= get_opts
.optarg
;
// Error exit that reports the supported options.
121 ACE_ERROR_RETURN ((LM_ERROR
,
123 "-o <iorfile> -e <# of events> -d"
129 // Indicates successful parsing of the command line
// Asks the event channel `ec` for a new supplier admin, using AND_OP as the
// inter-filter group operator, and returns a duplicated reference to it.
134 static CosNotifyChannelAdmin::SupplierAdmin_ptr
135 create_supplieradmin (CosNotifyChannelAdmin::EventChannel_ptr ec
)
// Id of the new admin.  NOTE(review): presumably filled in by
// new_for_suppliers(); the remaining arguments of that call are not
// visible in this excerpt.
137 CosNotifyChannelAdmin::AdminID adminid
= 0;
138 CosNotifyChannelAdmin::SupplierAdmin_var admin
=
139 ec
->new_for_suppliers (CosNotifyChannelAdmin::AND_OP
,
// Return a duplicate so that the reference handed to the caller is
// independent of the local _var.
143 return CosNotifyChannelAdmin::SupplierAdmin::_duplicate (admin
.in ());
// Builds one batch of BATCH_SIZE structured events for batch number
// `batch_id` and pushes it through supplier_1.  Every event gets the fixed
// type "a"/"b", the name "c" and three variable-header fields: "Id",
// Priority and Timeout, all derived from the event's id.
// NOTE(review): the statement that stores each event into `events` is not
// visible in this excerpt.
148 SendBatch (int batch_id
)
150 CosNotification::EventBatch events
;
151 events
.length(BATCH_SIZE
);
152 for (CORBA::Long i
= 0; i
< BATCH_SIZE
; ++i
)
// 1-based event id: batch 0 yields 1..BATCH_SIZE, batch 1 continues from
// BATCH_SIZE + 1, and so on.
154 int id
= batch_id
* BATCH_SIZE
+ i
+ 1;
156 CosNotification::StructuredEvent event
;
// Fixed header: the same domain, type and event name for every event.
158 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup ("a");
159 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup ("b");
160 event
.header
.fixed_header
.event_name
= CORBA::string_dup ("c");
// Variable header: three name/value pairs.
162 event
.header
.variable_header
.length (3);
// [0] "Id": the event id as a CORBA::Long.
163 event
.header
.variable_header
[0].name
=
164 CORBA::string_dup ("Id");
165 event
.header
.variable_header
[0].value
<<= (CORBA::Long
)id
;
// [1] Priority: the event id as a CORBA::Short, so the priority value
// grows with the id.
167 event
.header
.variable_header
[1].name
=
168 CORBA::string_dup (CosNotification::Priority
);
169 event
.header
.variable_header
[1].value
<<= (CORBA::Short
)(id
);
// [2] Timeout: id * 10000 as a TimeBase::TimeT, so the timeout value also
// grows with the id.  NOTE(review): TimeT is presumably in 100 ns
// units, which would make this `id` milliseconds -- confirm.
171 event
.header
.variable_header
[2].name
=
172 CORBA::string_dup (CosNotification::Timeout
);
173 event
.header
.variable_header
[2].value
<<= (TimeBase::TimeT
) (id
* 10000);
// Push the whole batch in a single call.
177 supplier_1
->send_events (events
);
// Creates the test's single supplier (stored in the file-level supplier_1),
// initialises it with the POA `poa` and connects it to the supplier admin
// `admin`.
181 create_suppliers (CosNotifyChannelAdmin::SupplierAdmin_ptr admin
,
182 PortableServer::POA_ptr poa
)
184 // start up the supplier
// Allocation goes through ACE_NEW_THROW_EX with CORBA::NO_MEMORY as the
// exception to raise on failure.
185 ACE_NEW_THROW_EX (supplier_1
,
186 TAO_Notify_Tests_SequencePushSupplier (),
187 CORBA::NO_MEMORY ());
189 supplier_1
->init (poa
);
191 supplier_1
->connect (admin
);
195 // ******************************************************************
197 // ******************************************************************
// Entry point of the supplier test.  Visible steps: initialise the test
// client, create the event channel, publish the IOR of the sig servant to
// ior_output_file, create the supplier admin and the supplier, wait for the
// start signal, report the batches sent, wait for completion, then remove
// the IOR file and disconnect the supplier.  CORBA exceptions are caught
// and printed.
// NOTE(review): the try statement, the declaration of `status`, the call
// made inside the send loop and the end of the function are not visible in
// this excerpt.
199 int ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
// Owns the sig servant for the lifetime of the main routine.
202 std::unique_ptr
<sig_i
> sig_impl
;
205 Supplier_Client client
;
206 status
= client
.init (argc
, argv
);
210 CosNotifyChannelAdmin::EventChannel_var ec
=
211 client
.create_event_channel ("MyEventChannel", 0);
// With deadline ordering requested, prepare a one-element QoS list that
// sets OrderPolicy to DeadlineOrder.  Where the list is applied is not
// visible in this excerpt.
213 if (use_deadline_ordering
)
215 CosNotification::QoSProperties
qos (1);
217 qos
[0].name
= CORBA::string_dup (CosNotification::OrderPolicy
);
218 qos
[0].value
<<= (CORBA::Short
)CosNotification::DeadlineOrder
;
// Create the sig servant and obtain its object reference and IOR string.
222 sig_impl
.reset( new sig_i(client
.orb()));
223 sig_var sig
= sig_impl
->_this ();
225 CORBA::String_var ior
=
226 client
.orb()->object_to_string (sig
.in ());
228 // If an output file name is set, write the IOR to it
229 if (ior_output_file
!= 0)
// NOTE(review): the fopen result is checked only by ACE_ASSERT; if
// assertions are compiled out, a failed open would reach fprintf with a
// null FILE* -- consider an explicit check.
231 FILE *output_file
= ACE_OS::fopen (ior_output_file
, "w");
232 ACE_ASSERT(output_file
!= 0);
233 ACE_OS::fprintf (output_file
, "%s", ior
.in ());
234 ACE_OS::fclose (output_file
);
// Create the supplier admin on the channel, then the supplier itself.
237 CosNotifyChannelAdmin::SupplierAdmin_var admin
=
238 create_supplieradmin (ec
.in ());
239 ACE_ASSERT(!CORBA::is_nil (admin
.in ()));
240 create_suppliers (admin
.in (), client
.root_poa ());
// Hold off sending until the start signal has been received.
242 sig_impl
->wait_for_startup();
244 ACE_DEBUG((LM_DEBUG
, "1 supplier sending %d events...\n", num_events
));
// One iteration per batch; a "+" is logged for each.  The statement that
// sends the batch is not visible in this excerpt (presumably a call to
// SendBatch -- TODO confirm).
245 for (int i
= 0; i
< num_events
/ BATCH_SIZE
; ++i
)
247 ACE_DEBUG((LM_DEBUG
, "+"));
250 ACE_DEBUG((LM_DEBUG
, "\nSupplier sent %d events.\n", num_events
));
// Wait for the completion signal before tearing anything down.
252 sig_impl
->wait_for_completion();
// Remove the published IOR file and disconnect the supplier.
254 ACE_OS::unlink (ior_output_file
);
256 supplier_1
->disconnect();
// Report any CORBA exception raised above.
261 catch (const CORBA::Exception
& e
)
263 e
._tao_print_exception ("Error: ");