Cleanup Solaris support
[ACE_TAO.git] / TAO / orbsvcs / tests / Event / Mcast / RTEC_MCast_Federated / EchoEventSupplierMain.cpp
blob2837ba34dbb6f57fdbf007c8abfbfa35cc6bc2f7
1 // EchoEventSupplierMain.cpp
2 // Main program for a PushSupplier of Echo events.
4 #include "EchoEventSupplier_i.h"
5 #include "SimpleAddressServer.h"
7 #include "orbsvcs/RtecEventCommC.h"
8 #include "orbsvcs/RtecEventChannelAdminC.h"
9 #include "orbsvcs/Time_Utilities.h"
10 #include "orbsvcs/Event_Utilities.h"
11 #include "orbsvcs/CosNamingC.h"
12 #include "orbsvcs/Event/EC_Event_Channel.h"
13 #include "orbsvcs/Event/EC_Default_Factory.h"
14 #include "orbsvcs/Event/ECG_Mcast_EH.h"
15 #include "orbsvcs/Event/ECG_UDP_Sender.h"
16 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
17 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
18 #include "orbsvcs/Event/ECG_UDP_EH.h"
20 #include "tao/ORB_Core.h"
21 #include "ace/OS_NS_strings.h"
23 #include <memory>
24 #include <fstream>
26 const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1;
27 const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1;
29 // Initialize the ORB.
31 int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
33 try
35 // Initialize the EC Factory so we can customize the EC
36 TAO_EC_Default_Factory::init_svcs ();
38 // Initialize the ORB.
39 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
41 const ACE_TCHAR *ecname = ACE_TEXT ("EventService");
42 const ACE_TCHAR *address = ACE_TEXT ("localhost");
43 const ACE_TCHAR *iorfile = 0;
44 u_short port = 12345;
45 u_short listenport = 12345;
47 int mcast = 1;
49 for (int i = 0; argv[i] != 0; i++)
51 if (ACE_OS::strcasecmp(argv[i], ACE_TEXT ("-ecname")) == 0)
53 if (argv[i+1] != 0)
54 ecname = argv[++i];
55 else
56 ACE_ERROR_RETURN ((LM_ERROR, "Missing Event channel name\n"),0);
58 else if (ACE_OS::strcasecmp(argv[i], ACE_TEXT ("-address")) == 0)
60 if (argv[i+1] != 0)
61 address = argv[++i];
62 else
63 ACE_ERROR_RETURN ((LM_ERROR, "Missing address\n"),0);
65 else if (ACE_OS::strcasecmp(argv[i], ACE_TEXT ("-port")) == 0)
67 if (argv[i+1] != 0)
68 port = ACE_OS::atoi(argv[++i]);
69 else
70 ACE_ERROR_RETURN ((LM_ERROR, "Missing port\n"),0);
72 else if (ACE_OS::strcasecmp(argv[i], ACE_TEXT ("-listenport")) == 0)
74 if (argv[i+1] != 0)
75 listenport = ACE_OS::atoi(argv[++i]);
76 else
77 ACE_ERROR_RETURN ((LM_ERROR, "Missing port\n"), 0);
79 else if (ACE_OS::strcasecmp(argv[i], ACE_TEXT ("-iorfile")) == 0)
81 if (argv[i+1] != 0)
82 iorfile = argv[++i];
83 else
84 ACE_ERROR_RETURN ((LM_ERROR, "Missing ior file\n"), 0);
86 else if (ACE_OS::strcasecmp(argv[i], ACE_TEXT ("-udp")) == 0)
87 mcast = 0;
90 // Get the POA
91 CORBA::Object_var tmpobj = orb->resolve_initial_references ("RootPOA");
92 PortableServer::POA_var poa = PortableServer::POA::_narrow (tmpobj.in ());
93 PortableServer::POAManager_var poa_manager = poa->the_POAManager ();
94 poa_manager->activate ();
96 // Create a local event channel and register it
97 TAO_EC_Event_Channel_Attributes attributes (poa.in (), poa.in ());
98 TAO_EC_Event_Channel ec_impl (attributes);
99 ec_impl.activate ();
100 PortableServer::ObjectId_var oid = poa->activate_object(&ec_impl);
101 tmpobj = poa->id_to_reference(oid.in());
102 RtecEventChannelAdmin::EventChannel_var ec =
103 RtecEventChannelAdmin::EventChannel::_narrow(tmpobj.in());
105 // Find the Naming Service.
106 tmpobj = orb->resolve_initial_references("NameService");
107 CosNaming::NamingContextExt_var root_context =
108 CosNaming::NamingContextExt::_narrow(tmpobj.in());
110 // Bind the Event Channel using Naming Services
111 CosNaming::Name_var name =
112 root_context->to_name (ACE_TEXT_ALWAYS_CHAR (ecname));
113 root_context->rebind(name.in(), ec.in());
115 // Get a proxy push consumer from the EventChannel.
116 RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers();
117 RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
118 admin->obtain_push_consumer();
120 // Instantiate an EchoEventSupplier_i servant.
121 EchoEventSupplier_i servant(orb.in());
123 // Register it with the RootPOA.
124 oid = poa->activate_object(&servant);
125 tmpobj = poa->id_to_reference(oid.in());
126 RtecEventComm::PushSupplier_var supplier =
127 RtecEventComm::PushSupplier::_narrow(tmpobj.in());
129 // Connect to the EC.
130 ACE_SupplierQOS_Factory qos;
131 qos.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1);
132 consumer->connect_push_supplier (supplier.in (), qos.get_SupplierQOS ());
134 // Initialize the address server with the desired address. This will
135 // be used by the sender object and the multicast receiver only if
136 // one is not otherwise available via the naming service.
137 ACE_INET_Addr send_addr (port, address);
138 SimpleAddressServer addr_srv_impl (send_addr);
140 // Create an instance of the addr server for local use
142 PortableServer::ObjectId_var addr_srv_oid =
143 poa->activate_object(&addr_srv_impl);
144 tmpobj =
145 poa->id_to_reference(addr_srv_oid.in());
147 RtecUDPAdmin::AddrServer_var addr_srv =
148 RtecUDPAdmin::AddrServer::_narrow(tmpobj.in());
150 // Create and initialize the sender object
151 PortableServer::Servant_var<TAO_ECG_UDP_Sender> sender =
152 TAO_ECG_UDP_Sender::create();
153 TAO_ECG_UDP_Out_Endpoint endpoint;
154 // need to be explicit about the address type when built with
155 // IPv6 support, otherwise SOCK_DGram::open defaults to ipv6 when
156 // given a sap_any address. This causes trouble on at least windows,
157 // or at most on not-linux.
158 if (endpoint.dgram ().open (ACE_Addr::sap_any,
159 send_addr.get_type()) == -1)
161 ACE_ERROR_RETURN ((LM_ERROR,
162 "Cannot open send endpoint\n"),
166 // TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint.
167 // If we don't clone our endpoint and pass &endpoint, the sender will
168 // attempt to delete endpoint during shutdown.
169 TAO_ECG_Refcounted_Endpoint clone (new TAO_ECG_UDP_Out_Endpoint (endpoint));
170 sender->init (ec.in (), addr_srv.in (), clone);
172 // Setup the subscription and connect to the EC
173 ACE_ConsumerQOS_Factory cons_qos_fact;
174 cons_qos_fact.start_disjunction_group ();
175 cons_qos_fact.insert (ACE_ES_EVENT_SOURCE_ANY, ACE_ES_EVENT_ANY, 0);
176 RtecEventChannelAdmin::ConsumerQOS sub = cons_qos_fact.get_ConsumerQOS ();
177 sender->connect (sub);
179 // Create and initialize the receiver
180 PortableServer::Servant_var<TAO_ECG_UDP_Receiver> receiver =
181 TAO_ECG_UDP_Receiver::create();
183 // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint.
184 // If we don't clone our endpoint and pass &endpoint, the receiver will
185 // attempt to delete endpoint during shutdown.
186 TAO_ECG_Refcounted_Endpoint clone2 (new TAO_ECG_UDP_Out_Endpoint (endpoint));
187 receiver->init (ec.in (), clone2, addr_srv.in ());
189 // Setup the registration and connect to the event channel
190 ACE_SupplierQOS_Factory supp_qos_fact;
191 supp_qos_fact.insert (MY_SOURCE_ID, MY_EVENT_TYPE, 0, 1);
192 RtecEventChannelAdmin::SupplierQOS pub = supp_qos_fact.get_SupplierQOS ();
193 receiver->connect (pub);
195 // Create the appropriate event handler and register it with the reactor
196 std::unique_ptr<ACE_Event_Handler> eh;
197 if (mcast) {
198 std::unique_ptr<TAO_ECG_Mcast_EH> mcast_eh(new TAO_ECG_Mcast_EH (receiver.in()));
199 mcast_eh->reactor (orb->orb_core ()->reactor ());
200 mcast_eh->open (ec.in());
201 eh.reset (mcast_eh.release());
202 } else {
203 std::unique_ptr<TAO_ECG_UDP_EH> udp_eh (new TAO_ECG_UDP_EH (receiver.in()));
204 udp_eh->reactor (orb->orb_core ()->reactor ());
205 ACE_INET_Addr local_addr (listenport);
206 if (udp_eh->open (local_addr) == -1)
207 ACE_ERROR ((LM_ERROR,"Cannot open EH\n"));
209 eh.reset(udp_eh.release());
212 // Create an event (just a string in this case).
214 // Create an event set for one event
215 RtecEventComm::EventSet event (1);
216 event.length (1);
218 // Initialize event header.
219 event[0].header.source = MY_SOURCE_ID;
220 event[0].header.ttl = 1;
221 event[0].header.type = MY_EVENT_TYPE;
223 #if !defined (TAO_LACKS_EVENT_CHANNEL_ANY)
224 // Initialize data fields in event.
225 const CORBA::String_var eventData =
226 CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (ecname));
228 event[0].data.any_value <<= eventData;
229 #else
230 // Use the octet sequence payload instead
231 char *tmpstr = const_cast<char *>(ACE_TEXT_ALWAYS_CHAR (ecname));
232 size_t len = ACE_OS::strlen(tmpstr) +1;
233 event[0].data.payload.replace (
234 len,
235 len,
236 reinterpret_cast<CORBA::Octet *> (tmpstr));
237 #endif /* !TAO_LACKS_EVENT_CHANNEL_ANY */
239 if (iorfile != 0) {
240 CORBA::String_var str = orb->object_to_string( ec.in());
241 std::ofstream iorFile( ACE_TEXT_ALWAYS_CHAR(iorfile));
242 iorFile << str.in() << std::endl;
243 iorFile.close();
245 ACE_DEBUG ((LM_DEBUG,
246 "Starting main loop\n"));
248 const int EVENT_DELAY_MS = 1000;
250 while (1) {
251 consumer->push (event);
253 ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS);
254 orb->run(tv);
257 orb->destroy();
258 return 0;
260 catch (const CORBA::Exception& exc)
262 ACE_ERROR ((LM_ERROR,
263 "Caught CORBA::Exception\n%C (%C)\n",
264 exc._name (),
265 exc._rep_id () ));
267 return 1;