1 // EchoEventSupplierMain.cpp
2 // Main program for a PushSupplier of Echo events.
4 #include "EchoEventSupplier_i.h"
5 #include "SimpleAddressServer.h"
7 #include "orbsvcs/RtecEventCommC.h"
8 #include "orbsvcs/RtecEventChannelAdminC.h"
9 #include "orbsvcs/Time_Utilities.h"
10 #include "orbsvcs/Event_Utilities.h"
11 #include "orbsvcs/CosNamingC.h"
12 #include "orbsvcs/Event/EC_Event_Channel.h"
13 #include "orbsvcs/Event/EC_Default_Factory.h"
14 #include "orbsvcs/Event/ECG_Mcast_EH.h"
15 #include "orbsvcs/Event/ECG_UDP_Sender.h"
16 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
17 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
18 #include "orbsvcs/Event/ECG_UDP_EH.h"
20 #include "tao/ORB_Core.h"
22 #include "ace/Auto_Ptr.h"
26 const RtecEventComm::EventSourceID MY_SOURCE_ID
= ACE_ES_EVENT_SOURCE_ANY
+ 1;
27 const RtecEventComm::EventType MY_EVENT_TYPE
= ACE_ES_EVENT_UNDEFINED
+ 1;
29 const int EVENT_DELAY_MS
= 10;
31 int ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
35 // Initialize the EC Factory so we can customize the EC
36 TAO_EC_Default_Factory::init_svcs ();
38 // Initialize the ORB.
39 CORBA::ORB_var orb
= CORBA::ORB_init(argc
, argv
);
41 const ACE_TCHAR
* ecname
= ACE_TEXT ("EventService");
42 const ACE_TCHAR
* address
= ACE_TEXT ("localhost");
43 const ACE_TCHAR
* iorfile
= 0;
45 u_short listenport
= 12345;
48 for (int i
= 0; argv
[i
] != 0; i
++) {
49 if (ACE_OS::strcmp(argv
[i
], ACE_TEXT("-ecname")) == 0) {
54 std::cerr
<< "Missing Event channel name" << std::endl
;
56 } else if (ACE_OS::strcmp(argv
[i
], ACE_TEXT("-address")) == 0) {
61 std::cerr
<< "Missing address" << std::endl
;
63 } else if (ACE_OS::strcmp(argv
[i
], ACE_TEXT("-port")) == 0) {
66 port
= ACE_OS::atoi(argv
[i
]);
68 std::cerr
<< "Missing port" << std::endl
;
70 } else if (ACE_OS::strcmp(argv
[i
], ACE_TEXT("-listenport")) == 0) {
73 listenport
= ACE_OS::atoi(argv
[i
]);
75 std::cerr
<< "Missing port" << std::endl
;
77 } else if (ACE_OS::strcmp(argv
[i
], ACE_TEXT("-iorfile")) == 0) {
82 } else if (ACE_OS::strcmp(argv
[i
], ACE_TEXT("-udp")) == 0) {
88 CORBA::Object_var object
= orb
->resolve_initial_references ("RootPOA");
89 PortableServer::POA_var poa
= PortableServer::POA::_narrow (object
.in ());
90 PortableServer::POAManager_var poa_manager
= poa
->the_POAManager ();
91 poa_manager
->activate ();
93 // Create a local event channel and register it
94 TAO_EC_Event_Channel_Attributes
attributes (poa
.in (), poa
.in ());
95 PortableServer::Servant_var
<TAO_EC_Event_Channel
> ec_impl
=
96 new TAO_EC_Event_Channel(attributes
);
98 PortableServer::ObjectId_var oid
= poa
->activate_object(ec_impl
.in());
99 CORBA::Object_var ec_obj
= poa
->id_to_reference(oid
.in());
100 RtecEventChannelAdmin::EventChannel_var ec
=
101 RtecEventChannelAdmin::EventChannel::_narrow(ec_obj
.in());
103 // Find the Naming Service.
104 CORBA::Object_var obj
= orb
->resolve_initial_references("NameService");
105 CosNaming::NamingContextExt_var root_context
= CosNaming::NamingContextExt::_narrow(obj
.in());
107 // Bind the Event Channel using Naming Services
108 CosNaming::Name_var name
= root_context
->to_name (ACE_TEXT_ALWAYS_CHAR (ecname
));
109 root_context
->rebind(name
.in(), ec
.in());
111 // Get a proxy push consumer from the EventChannel.
112 RtecEventChannelAdmin::SupplierAdmin_var admin
= ec
->for_suppliers();
113 RtecEventChannelAdmin::ProxyPushConsumer_var consumer
=
114 admin
->obtain_push_consumer();
116 // Instantiate an EchoEventSupplier_i servant.
117 PortableServer::Servant_var
<EchoEventSupplier_i
> servant
=
118 new EchoEventSupplier_i(orb
.in());
120 // Register it with the RootPOA.
121 oid
= poa
->activate_object(servant
.in());
122 CORBA::Object_var supplier_obj
= poa
->id_to_reference(oid
.in());
123 RtecEventComm::PushSupplier_var supplier
=
124 RtecEventComm::PushSupplier::_narrow(supplier_obj
.in());
126 // Connect to the EC.
127 ACE_SupplierQOS_Factory qos
;
128 qos
.insert (MY_SOURCE_ID
, MY_EVENT_TYPE
, 0, 1);
129 consumer
->connect_push_supplier (supplier
.in (), qos
.get_SupplierQOS ());
131 // Initialize the address server with the desired address.
132 // This will be used by the sender object and the multicast receiver.
134 ACE_INET_Addr
send_addr (port
, address
);
135 PortableServer::Servant_var
<SimpleAddressServer
> addr_srv_impl
=
136 new SimpleAddressServer(send_addr
);
138 PortableServer::ObjectId_var addr_srv_oid
=
139 poa
->activate_object(addr_srv_impl
.in());
140 CORBA::Object_var addr_srv_obj
= poa
->id_to_reference(addr_srv_oid
.in());
141 RtecUDPAdmin::AddrServer_var addr_srv
=
142 RtecUDPAdmin::AddrServer::_narrow(addr_srv_obj
.in());
144 // Create and initialize the sender object
145 PortableServer::Servant_var
<TAO_ECG_UDP_Sender
> sender
=
146 TAO_ECG_UDP_Sender::create();
147 TAO_ECG_UDP_Out_Endpoint endpoint
;
148 if (endpoint
.dgram ().open (ACE_Addr::sap_any
) == -1) {
149 std::cerr
<< "Cannot open send endpoint" << std::endl
;
153 // TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint.
154 // If we don't clone our endpoint and pass &endpoint, the sender will
155 // attempt to delete endpoint during shutdown.
156 TAO_ECG_Refcounted_Endpoint
clone (new TAO_ECG_UDP_Out_Endpoint (endpoint
));
157 sender
->init (ec
.in (), addr_srv
.in (), clone
);
159 // Setup the subscription and connect to the EC
160 ACE_ConsumerQOS_Factory cons_qos_fact
;
161 cons_qos_fact
.start_disjunction_group ();
162 cons_qos_fact
.insert (ACE_ES_EVENT_SOURCE_ANY
, ACE_ES_EVENT_ANY
, 0);
163 RtecEventChannelAdmin::ConsumerQOS sub
= cons_qos_fact
.get_ConsumerQOS ();
164 sender
->connect (sub
);
166 // Create and initialize the receiver
167 PortableServer::Servant_var
<TAO_ECG_UDP_Receiver
> receiver
=
168 TAO_ECG_UDP_Receiver::create();
170 // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint.
171 // If we don't clone our endpoint and pass &endpoint, the receiver will
172 // attempt to delete endpoint during shutdown.
173 TAO_ECG_Refcounted_Endpoint
clone2 (new TAO_ECG_UDP_Out_Endpoint (endpoint
));
174 receiver
->init (ec
.in (), clone2
, addr_srv
.in ());
176 // Setup the registration and connect to the event channel
177 ACE_SupplierQOS_Factory supp_qos_fact
;
178 supp_qos_fact
.insert (MY_SOURCE_ID
, MY_EVENT_TYPE
, 0, 1);
179 RtecEventChannelAdmin::SupplierQOS pub
= supp_qos_fact
.get_SupplierQOS ();
180 receiver
->connect (pub
);
182 // Create the appropriate event handler and register it with the reactor
183 auto_ptr
<ACE_Event_Handler
> eh
;
185 auto_ptr
<TAO_ECG_Mcast_EH
> mcast_eh(new TAO_ECG_Mcast_EH (receiver
.in()));
186 mcast_eh
->reactor (orb
->orb_core ()->reactor ());
187 mcast_eh
->open (ec
.in());
188 ACE_auto_ptr_reset(eh
,mcast_eh
.release());
189 //eh.reset(mcast_eh.release());
191 auto_ptr
<TAO_ECG_UDP_EH
> udp_eh (new TAO_ECG_UDP_EH (receiver
.in()));
192 udp_eh
->reactor (orb
->orb_core ()->reactor ());
193 ACE_INET_Addr
local_addr (listenport
);
194 if (udp_eh
->open (local_addr
) == -1) {
195 std::cerr
<< "Cannot open EH" << std::endl
;
197 ACE_auto_ptr_reset(eh
,udp_eh
.release());
198 //eh.reset(udp_eh.release());
201 // Create an event (just a string in this case).
202 const CORBA::String_var eventData
= CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (ecname
));
204 // Create an event set for one event
205 RtecEventComm::EventSet
event (1);
208 // Initialize event header.
209 event
[0].header
.source
= MY_SOURCE_ID
;
210 event
[0].header
.ttl
= 1;
211 event
[0].header
.type
= MY_EVENT_TYPE
;
213 // Initialize data fields in event.
214 event
[0].data
.any_value
<<= eventData
;
217 CORBA::String_var str
= orb
->object_to_string( ec
.in() );
218 std::ofstream
iorFile( ACE_TEXT_ALWAYS_CHAR(iorfile
) );
219 iorFile
<< str
.in() << std::endl
;
222 std::cout
<< "Starting main loop" << std::endl
;
224 const int EVENT_DELAY_MS
= 10;
227 consumer
->push (event
);
229 ACE_Time_Value
tv(0, 1000 * EVENT_DELAY_MS
);
236 catch(const CORBA::Exception
& exc
)
238 std::cerr
<< "Caught CORBA::Exception" << std::endl
<< exc
<< std::endl
;