Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / TAO / orbsvcs / DevGuideExamples / EventServices / RTEC_MCast_Federated / EchoEventSupplierMain.cpp
blob18dd4175aff4fa3c2b5457d12307a3e4a9e01671
// EchoEventSupplierMain.cpp
// Main program for a PushSupplier of Echo events.
4 #include "EchoEventSupplier_i.h"
5 #include "SimpleAddressServer.h"
7 #include "orbsvcs/RtecEventCommC.h"
8 #include "orbsvcs/RtecEventChannelAdminC.h"
9 #include "orbsvcs/Time_Utilities.h"
10 #include "orbsvcs/Event_Utilities.h"
11 #include "orbsvcs/CosNamingC.h"
12 #include "orbsvcs/Event/EC_Event_Channel.h"
13 #include "orbsvcs/Event/EC_Default_Factory.h"
14 #include "orbsvcs/Event/ECG_Mcast_EH.h"
15 #include "orbsvcs/Event/ECG_UDP_Sender.h"
16 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
17 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
18 #include "orbsvcs/Event/ECG_UDP_EH.h"
20 #include "tao/ORB_Core.h"
22 #include "ace/Auto_Ptr.h"
23 #include <iostream>
24 #include <fstream>
26 const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1;
27 const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1;
29 const int EVENT_DELAY_MS = 10;
31 int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
33 try
35 // Initialize the EC Factory so we can customize the EC
36 TAO_EC_Default_Factory::init_svcs ();
38 // Initialize the ORB.
39 CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
41 const ACE_TCHAR* ecname = ACE_TEXT ("EventService");
42 const ACE_TCHAR* address = ACE_TEXT ("localhost");
43 const ACE_TCHAR* iorfile = 0;
44 u_short port = 12345;
45 u_short listenport = 12345;
46 int mcast = 1;
48 for (int i = 0; argv[i] != 0; i++) {
49 if (ACE_OS::strcmp(argv[i], ACE_TEXT("-ecname")) == 0) {
50 if (argv[i+1] != 0) {
51 i++;
52 ecname = argv[i];
53 } else {
54 std::cerr << "Missing Event channel name" << std::endl;
56 } else if (ACE_OS::strcmp(argv[i], ACE_TEXT("-address")) == 0) {
57 if (argv[i+1] != 0) {
58 i++;
59 address = argv[i];
60 } else {
61 std::cerr << "Missing address" << std::endl;
63 } else if (ACE_OS::strcmp(argv[i], ACE_TEXT("-port")) == 0) {
64 if (argv[i+1] != 0) {
65 i++;
66 port = ACE_OS::atoi(argv[i]);
67 } else {
68 std::cerr << "Missing port" << std::endl;
70 } else if (ACE_OS::strcmp(argv[i], ACE_TEXT("-listenport")) == 0) {
71 if (argv[i+1] != 0) {
72 i++;
73 listenport = ACE_OS::atoi(argv[i]);
74 } else {
75 std::cerr << "Missing port" << std::endl;
77 } else if (ACE_OS::strcmp(argv[i], ACE_TEXT("-iorfile")) == 0) {
78 if (argv[i+1] != 0) {
79 i++;
80 iorfile = argv[i];
82 } else if (ACE_OS::strcmp(argv[i], ACE_TEXT("-udp")) == 0) {
83 mcast = 0;
87 // Get the POA
88 CORBA::Object_var object = orb->resolve_initial_references ("RootPOA");
89 PortableServer::POA_var poa = PortableServer::POA::_narrow (object.in ());
90 PortableServer::POAManager_var poa_manager = poa->the_POAManager ();
91 poa_manager->activate ();
93 // Create a local event channel and register it
94 TAO_EC_Event_Channel_Attributes attributes (poa.in (), poa.in ());
95 PortableServer::Servant_var<TAO_EC_Event_Channel> ec_impl =
96 new TAO_EC_Event_Channel(attributes);
97 ec_impl->activate ();
98 PortableServer::ObjectId_var oid = poa->activate_object(ec_impl.in());
99 CORBA::Object_var ec_obj = poa->id_to_reference(oid.in());
100 RtecEventChannelAdmin::EventChannel_var ec =
101 RtecEventChannelAdmin::EventChannel::_narrow(ec_obj.in());
103 // Find the Naming Service.
104 CORBA::Object_var obj = orb->resolve_initial_references("NameService");
105 CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(obj.in());
107 // Bind the Event Channel using Naming Services
108 CosNaming::Name_var name = root_context->to_name (ACE_TEXT_ALWAYS_CHAR (ecname));
109 root_context->rebind(name.in(), ec.in());
111 // Get a proxy push consumer from the EventChannel.
112 RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers();
113 RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
114 admin->obtain_push_consumer();
116 // Instantiate an EchoEventSupplier_i servant.
117 PortableServer::Servant_var<EchoEventSupplier_i> servant =
118 new EchoEventSupplier_i(orb.in());
120 // Register it with the RootPOA.
121 oid = poa->activate_object(servant.in());
122 CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
123 RtecEventComm::PushSupplier_var supplier =
124 RtecEventComm::PushSupplier::_narrow(supplier_obj.in());
126 // Connect to the EC.
127 ACE_SupplierQOS_Factory qos;
128 qos.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1);
129 consumer->connect_push_supplier (supplier.in (), qos.get_SupplierQOS ());
131 // Initialize the address server with the desired address.
132 // This will be used by the sender object and the multicast
133 // receiver.
134 ACE_INET_Addr send_addr (port, address);
135 PortableServer::Servant_var<SimpleAddressServer> addr_srv_impl =
136 new SimpleAddressServer(send_addr);
138 PortableServer::ObjectId_var addr_srv_oid =
139 poa->activate_object(addr_srv_impl.in());
140 CORBA::Object_var addr_srv_obj = poa->id_to_reference(addr_srv_oid.in());
141 RtecUDPAdmin::AddrServer_var addr_srv =
142 RtecUDPAdmin::AddrServer::_narrow(addr_srv_obj.in());
144 // Create and initialize the sender object
145 PortableServer::Servant_var<TAO_ECG_UDP_Sender> sender =
146 TAO_ECG_UDP_Sender::create();
147 TAO_ECG_UDP_Out_Endpoint endpoint;
148 if (endpoint.dgram ().open (ACE_Addr::sap_any) == -1) {
149 std::cerr << "Cannot open send endpoint" << std::endl;
150 return 1;
153 // TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint.
154 // If we don't clone our endpoint and pass &endpoint, the sender will
155 // attempt to delete endpoint during shutdown.
156 TAO_ECG_Refcounted_Endpoint clone (new TAO_ECG_UDP_Out_Endpoint (endpoint));
157 sender->init (ec.in (), addr_srv.in (), clone);
159 // Setup the subscription and connect to the EC
160 ACE_ConsumerQOS_Factory cons_qos_fact;
161 cons_qos_fact.start_disjunction_group ();
162 cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0);
163 RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS ();
164 sender->connect (sub);
166 // Create and initialize the receiver
167 PortableServer::Servant_var<TAO_ECG_UDP_Receiver> receiver =
168 TAO_ECG_UDP_Receiver::create();
170 // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint.
171 // If we don't clone our endpoint and pass &endpoint, the receiver will
172 // attempt to delete endpoint during shutdown.
173 TAO_ECG_Refcounted_Endpoint clone2 (new TAO_ECG_UDP_Out_Endpoint (endpoint));
174 receiver->init (ec.in (), clone2, addr_srv.in ());
176 // Setup the registration and connect to the event channel
177 ACE_SupplierQOS_Factory supp_qos_fact;
178 supp_qos_fact.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1);
179 RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS ();
180 receiver->connect (pub);
182 // Create the appropriate event handler and register it with the reactor
183 auto_ptr<ACE_Event_Handler> eh;
184 if (mcast) {
185 auto_ptr<TAO_ECG_Mcast_EH> mcast_eh(new TAO_ECG_Mcast_EH (receiver.in()));
186 mcast_eh->reactor (orb->orb_core ()->reactor ());
187 mcast_eh->open (ec.in());
188 ACE_auto_ptr_reset(eh,mcast_eh.release());
189 //eh.reset(mcast_eh.release());
190 } else {
191 auto_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in()));
192 udp_eh->reactor (orb->orb_core ()->reactor ());
193 ACE_INET_Addr local_addr (listenport);
194 if (udp_eh->open (local_addr) == -1) {
195 std::cerr << "Cannot open EH" << std::endl;
197 ACE_auto_ptr_reset(eh,udp_eh.release());
198 //eh.reset(udp_eh.release());
201 // Create an event (just a string in this case).
202 const CORBA::String_var eventData = CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (ecname));
204 // Create an event set for one event
205 RtecEventComm::EventSet event (1);
206 event.length (1);
208 // Initialize event header.
209 event[0].header.source = MY_SOURCE_ID;
210 event[0].header.ttl = 1;
211 event[0].header.type = MY_EVENT_TYPE;
213 // Initialize data fields in event.
214 event[0].data.any_value <<= eventData;
216 if (iorfile != 0) {
217 CORBA::String_var str = orb->object_to_string( ec.in() );
218 std::ofstream iorFile( ACE_TEXT_ALWAYS_CHAR(iorfile) );
219 iorFile << str.in() << std::endl;
220 iorFile.close();
222 std::cout << "Starting main loop" << std::endl;
224 const int EVENT_DELAY_MS = 10;
226 while (1) {
227 consumer->push (event);
229 ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS);
230 orb->run(tv);
233 orb->destroy();
234 return 0;
236 catch(const CORBA::Exception& exc)
238 std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl;
240 return 1;