Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / DevGuideExamples / EventServices / RTEC_Federated / EchoEventSupplierMain.cpp
blob33d2946cabe220eb239089e655bf167a260d3c70
1 // EchoEventSupplierMain.cpp
2 // Main program for a PushSupplier of Echo events.
4 #include "EchoEventSupplier_i.h"
6 #include "orbsvcs/RtecEventCommC.h"
7 #include "orbsvcs/RtecEventChannelAdminC.h"
8 #include "orbsvcs/Time_Utilities.h"
9 #include "orbsvcs/Event_Utilities.h"
10 #include "orbsvcs/CosNamingC.h"
11 #include "orbsvcs/Event/EC_Event_Channel.h"
12 #include "orbsvcs/Event/EC_Gateway_IIOP.h"
13 #include "orbsvcs/Event/EC_Default_Factory.h"
15 #include "ace/Thread_Manager.h"
16 #include <iostream>
17 #include <fstream>
19 const RtecEventComm::EventSourceID MY_SOURCE_ID = ACE_ES_EVENT_SOURCE_ANY + 1;
20 const RtecEventComm::EventType MY_EVENT_TYPE = ACE_ES_EVENT_UNDEFINED + 1;
22 const int EVENT_DELAY_MS = 10;
24 ACE_THR_FUNC_RETURN orb_thread(void *orb_ptr)
26 CORBA::ORB_var orb = CORBA::ORB::_duplicate((CORBA::ORB_ptr) orb_ptr);
27 orb->run();
28 return 0;
31 int ACE_TMAIN (int argc, ACE_TCHAR *argv[])
33 try
35 // Initialize the EC Factory so we can customize the EC
36 TAO_EC_Default_Factory::init_svcs ();
38 // Initialize the ORB.
39 CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
41 const ACE_TCHAR* ecname = ACE_TEXT ("EventService");
42 const ACE_TCHAR* remote_ecname = 0;
43 const ACE_TCHAR* iorfile = 0;
44 for (int i = 0; argv[i] != 0; i++) {
45 if (ACE_OS::strcmp(argv[i], ACE_TEXT("-ecname")) == 0) {
46 if (argv[i+1] != 0) {
47 i++;
48 ecname = argv[i];
49 } else {
50 std::cerr << "Missing Event channel name" << std::endl;
53 if (ACE_OS::strcmp(argv[i], ACE_TEXT("-gateway")) == 0) {
54 if (argv[i+1] != 0) {
55 i++;
56 remote_ecname = argv[i];
57 } else {
58 std::cerr << "Missing Event channel name" << std::endl;
61 if (ACE_OS::strcmp(argv[i], ACE_TEXT("-iorfile")) == 0) {
62 if (argv[i+1] != 0) {
63 i++;
64 iorfile = argv[i];
69 // Get the POA
70 CORBA::Object_var object = orb->resolve_initial_references ("RootPOA");
71 PortableServer::POA_var poa = PortableServer::POA::_narrow (object.in ());
72 PortableServer::POAManager_var poa_manager = poa->the_POAManager ();
73 poa_manager->activate ();
75 // Spawn a thread for the orb
76 ACE_Thread_Manager *thread_mgr = ACE_Thread_Manager::instance();
77 thread_mgr->spawn(orb_thread, orb.in());
79 // Create a local event channel and register it with the RootPOA.
80 TAO_EC_Event_Channel_Attributes attributes (poa.in (), poa.in ());
81 PortableServer::Servant_var<TAO_EC_Event_Channel> ec_impl =
82 new TAO_EC_Event_Channel(attributes);
83 ec_impl->activate ();
84 PortableServer::ObjectId_var oid = poa->activate_object(ec_impl.in());
85 CORBA::Object_var ec_obj = poa->id_to_reference(oid.in());
86 RtecEventChannelAdmin::EventChannel_var ec =
87 RtecEventChannelAdmin::EventChannel::_narrow(ec_obj.in());
89 // Find the Naming Service.
90 object = orb->resolve_initial_references("NameService");
91 CosNaming::NamingContextExt_var root_context = CosNaming::NamingContextExt::_narrow(object.in());
92 CosNaming::Name_var name = root_context->to_name (ACE_TEXT_ALWAYS_CHAR (ecname));
93 root_context->rebind(name.in(), ec.in());
95 // Get a SupplierAdmin object from the EventChannel.
96 RtecEventChannelAdmin::SupplierAdmin_var admin = ec->for_suppliers();
98 // Get a ProxyPushConsumer from the SupplierAdmin.
99 RtecEventChannelAdmin::ProxyPushConsumer_var consumer =
100 admin->obtain_push_consumer();
102 // Instantiate an EchoEventSupplier_i servant.
103 PortableServer::Servant_var<EchoEventSupplier_i> servant =
104 new EchoEventSupplier_i(orb.in());
106 // Register it with the RootPOA.
107 oid = poa->activate_object(servant.in());
108 CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
109 RtecEventComm::PushSupplier_var supplier =
110 RtecEventComm::PushSupplier::_narrow(supplier_obj.in());
112 // Publish the events the supplier provides.
113 ACE_SupplierQOS_Factory qos;
114 qos.insert (MY_SOURCE_ID, // Supplier's unique id
115 MY_EVENT_TYPE, // Event type
116 0, // handle to the rt_info structure
117 1); // number of calls
119 // Connect as a supplier of the published events.
120 consumer->connect_push_supplier (supplier.in (),
121 qos.get_SupplierQOS ());
123 // Create an event (just a string in this case).
124 const CORBA::String_var eventData = CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (ecname));
126 // Create an event set for one event
127 RtecEventComm::EventSet event (1);
128 event.length (1);
129 // Initialize event header.
130 event[0].header.source = MY_SOURCE_ID;
131 event[0].header.ttl = 1;
132 event[0].header.type = MY_EVENT_TYPE;
133 // Initialize data fields in event.
134 event[0].data.any_value <<= eventData;
136 PortableServer::Servant_var<TAO_EC_Gateway_IIOP> gateway =
137 new TAO_EC_Gateway_IIOP;
138 int gateway_initialized = 0;
140 std::cout << "Supplier starting sending of events.\n";
142 while (1) {
144 consumer->push (event);
145 ACE_Time_Value tv(0, 1000 * EVENT_DELAY_MS);
146 orb->run(tv);
148 if ((remote_ecname != 0) && (!gateway_initialized)) {
150 try {
151 // Get the remote event channel object
152 CORBA::Object_var obj = root_context->resolve_str (ACE_TEXT_ALWAYS_CHAR (remote_ecname));
153 RtecEventChannelAdmin::EventChannel_var remote_ec =
154 RtecEventChannelAdmin::EventChannel::_narrow(obj.in());
156 int ok = 0;
157 if (!CORBA::is_nil(remote_ec.in())) {
158 // Now check if we can talk to it...
159 try {
160 RtecEventChannelAdmin::SupplierAdmin_var adm =
161 remote_ec->for_suppliers();
162 ok = 1;
163 } catch(const CORBA::UserException&) {
164 // What is the correct exception(s) to catch here?
168 // There is a good remote event channel so initialize the
169 // gateway.
170 if (ok) {
171 gateway->init(remote_ec.in(), ec.in());
173 PortableServer::ObjectId_var gateway_oid =
174 poa->activate_object(gateway.in());
175 CORBA::Object_var gateway_obj =
176 poa->id_to_reference(gateway_oid.in());
177 RtecEventChannelAdmin::Observer_var obs =
178 RtecEventChannelAdmin::Observer::_narrow(gateway_obj.in());
179 RtecEventChannelAdmin::Observer_Handle local_ec_obs_handle =
180 ec->append_observer (obs.in ());
181 ACE_UNUSED_ARG (local_ec_obs_handle);
182 gateway_initialized = 1;
183 std::cout << "Gateway initialized\n";
184 if (iorfile != 0) {
185 CORBA::String_var str = orb->object_to_string( ec.in() );
186 std::ofstream iorFile( ACE_TEXT_ALWAYS_CHAR(iorfile) );
187 iorFile << str.in() << std::endl;
188 iorFile.close();
191 } catch(const CosNaming::NamingContext::NotFound&) {
192 // Try again later...
197 orb->destroy();
199 return 0;
201 catch(const CORBA::Exception& exc)
203 std::cerr << "Caught CORBA::Exception" << std::endl << exc << std::endl;
206 return 1;