1 // EchoEventSupplierMain.cpp
2 // Main program for a PushSupplier of Echo events.
4 #include "EchoEventSupplier_i.h"
5 #include "SimpleAddressServer.h"
7 #include "orbsvcs/RtecEventCommC.h"
8 #include "orbsvcs/RtecEventChannelAdminC.h"
9 #include "orbsvcs/Time_Utilities.h"
10 #include "orbsvcs/Event_Utilities.h"
11 #include "orbsvcs/CosNamingC.h"
12 #include "orbsvcs/Event/EC_Event_Channel.h"
13 #include "orbsvcs/Event/EC_Default_Factory.h"
14 #include "orbsvcs/Event/ECG_Mcast_EH.h"
15 #include "orbsvcs/Event/ECG_UDP_Sender.h"
16 #include "orbsvcs/Event/ECG_UDP_Receiver.h"
17 #include "orbsvcs/Event/ECG_UDP_Out_Endpoint.h"
18 #include "orbsvcs/Event/ECG_UDP_EH.h"
20 #include "tao/ORB_Core.h"
21 #include "ace/OS_NS_strings.h"
26 const RtecEventComm::EventSourceID MY_SOURCE_ID
= ACE_ES_EVENT_SOURCE_ANY
+ 1;
27 const RtecEventComm::EventType MY_EVENT_TYPE
= ACE_ES_EVENT_UNDEFINED
+ 1;
29 // Initialize the ORB.
31 int ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
35 // Initialize the EC Factory so we can customize the EC
36 TAO_EC_Default_Factory::init_svcs ();
38 // Initialize the ORB.
39 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
41 const ACE_TCHAR
*ecname
= ACE_TEXT ("EventService");
42 const ACE_TCHAR
*address
= ACE_TEXT ("localhost");
43 const ACE_TCHAR
*iorfile
= 0;
45 u_short listenport
= 12345;
49 for (int i
= 0; argv
[i
] != 0; i
++)
51 if (ACE_OS::strcasecmp(argv
[i
], ACE_TEXT ("-ecname")) == 0)
56 ACE_ERROR_RETURN ((LM_ERROR
, "Missing Event channel name\n"),0);
58 else if (ACE_OS::strcasecmp(argv
[i
], ACE_TEXT ("-address")) == 0)
63 ACE_ERROR_RETURN ((LM_ERROR
, "Missing address\n"),0);
65 else if (ACE_OS::strcasecmp(argv
[i
], ACE_TEXT ("-port")) == 0)
68 port
= ACE_OS::atoi(argv
[++i
]);
70 ACE_ERROR_RETURN ((LM_ERROR
, "Missing port\n"),0);
72 else if (ACE_OS::strcasecmp(argv
[i
], ACE_TEXT ("-listenport")) == 0)
75 listenport
= ACE_OS::atoi(argv
[++i
]);
77 ACE_ERROR_RETURN ((LM_ERROR
, "Missing port\n"), 0);
79 else if (ACE_OS::strcasecmp(argv
[i
], ACE_TEXT ("-iorfile")) == 0)
84 ACE_ERROR_RETURN ((LM_ERROR
, "Missing ior file\n"), 0);
86 else if (ACE_OS::strcasecmp(argv
[i
], ACE_TEXT ("-udp")) == 0)
91 CORBA::Object_var tmpobj
= orb
->resolve_initial_references ("RootPOA");
92 PortableServer::POA_var poa
= PortableServer::POA::_narrow (tmpobj
.in ());
93 PortableServer::POAManager_var poa_manager
= poa
->the_POAManager ();
94 poa_manager
->activate ();
96 // Create a local event channel and register it
97 TAO_EC_Event_Channel_Attributes
attributes (poa
.in (), poa
.in ());
98 TAO_EC_Event_Channel
ec_impl (attributes
);
100 PortableServer::ObjectId_var oid
= poa
->activate_object(&ec_impl
);
101 tmpobj
= poa
->id_to_reference(oid
.in());
102 RtecEventChannelAdmin::EventChannel_var ec
=
103 RtecEventChannelAdmin::EventChannel::_narrow(tmpobj
.in());
105 // Find the Naming Service.
106 tmpobj
= orb
->resolve_initial_references("NameService");
107 CosNaming::NamingContextExt_var root_context
=
108 CosNaming::NamingContextExt::_narrow(tmpobj
.in());
110 // Bind the Event Channel using Naming Services
111 CosNaming::Name_var name
=
112 root_context
->to_name (ACE_TEXT_ALWAYS_CHAR (ecname
));
113 root_context
->rebind(name
.in(), ec
.in());
115 // Get a proxy push consumer from the EventChannel.
116 RtecEventChannelAdmin::SupplierAdmin_var admin
= ec
->for_suppliers();
117 RtecEventChannelAdmin::ProxyPushConsumer_var consumer
=
118 admin
->obtain_push_consumer();
120 // Instantiate an EchoEventSupplier_i servant.
121 EchoEventSupplier_i
servant(orb
.in());
123 // Register it with the RootPOA.
124 oid
= poa
->activate_object(&servant
);
125 tmpobj
= poa
->id_to_reference(oid
.in());
126 RtecEventComm::PushSupplier_var supplier
=
127 RtecEventComm::PushSupplier::_narrow(tmpobj
.in());
129 // Connect to the EC.
130 ACE_SupplierQOS_Factory qos
;
131 qos
.insert (MY_SOURCE_ID
, MY_EVENT_TYPE
, 0, 1);
132 consumer
->connect_push_supplier (supplier
.in (), qos
.get_SupplierQOS ());
134 // Initialize the address server with the desired address. This will
135 // be used by the sender object and the multicast receiver only if
136 // one is not otherwise available via the naming service.
137 ACE_INET_Addr
send_addr (port
, address
);
138 SimpleAddressServer
addr_srv_impl (send_addr
);
140 // Create an instance of the addr server for local use
142 PortableServer::ObjectId_var addr_srv_oid
=
143 poa
->activate_object(&addr_srv_impl
);
145 poa
->id_to_reference(addr_srv_oid
.in());
147 RtecUDPAdmin::AddrServer_var addr_srv
=
148 RtecUDPAdmin::AddrServer::_narrow(tmpobj
.in());
150 // Create and initialize the sender object
151 PortableServer::Servant_var
<TAO_ECG_UDP_Sender
> sender
=
152 TAO_ECG_UDP_Sender::create();
153 TAO_ECG_UDP_Out_Endpoint endpoint
;
154 // need to be explicit about the address type when built with
155 // IPv6 support, otherwise SOCK_DGram::open defaults to ipv6 when
156 // given a sap_any address. This causes trouble on at least windows,
157 // or at most on not-linux.
158 if (endpoint
.dgram ().open (ACE_Addr::sap_any
,
159 send_addr
.get_type()) == -1)
161 ACE_ERROR_RETURN ((LM_ERROR
,
162 "Cannot open send endpoint\n"),
166 // TAO_ECG_UDP_Sender::init() takes a TAO_ECG_Refcounted_Endpoint.
167 // If we don't clone our endpoint and pass &endpoint, the sender will
168 // attempt to delete endpoint during shutdown.
169 TAO_ECG_Refcounted_Endpoint
clone (new TAO_ECG_UDP_Out_Endpoint (endpoint
));
170 sender
->init (ec
.in (), addr_srv
.in (), clone
);
172 // Setup the subscription and connect to the EC
173 ACE_ConsumerQOS_Factory cons_qos_fact
;
174 cons_qos_fact
.start_disjunction_group ();
175 cons_qos_fact
.insert (ACE_ES_EVENT_SOURCE_ANY
, ACE_ES_EVENT_ANY
, 0);
176 RtecEventChannelAdmin::ConsumerQOS sub
= cons_qos_fact
.get_ConsumerQOS ();
177 sender
->connect (sub
);
179 // Create and initialize the receiver
180 PortableServer::Servant_var
<TAO_ECG_UDP_Receiver
> receiver
=
181 TAO_ECG_UDP_Receiver::create();
183 // TAO_ECG_UDP_Receiver::init() takes a TAO_ECG_Refcounted_Endpoint.
184 // If we don't clone our endpoint and pass &endpoint, the receiver will
185 // attempt to delete endpoint during shutdown.
186 TAO_ECG_Refcounted_Endpoint
clone2 (new TAO_ECG_UDP_Out_Endpoint (endpoint
));
187 receiver
->init (ec
.in (), clone2
, addr_srv
.in ());
189 // Setup the registration and connect to the event channel
190 ACE_SupplierQOS_Factory supp_qos_fact
;
191 supp_qos_fact
.insert (MY_SOURCE_ID
, MY_EVENT_TYPE
, 0, 1);
192 RtecEventChannelAdmin::SupplierQOS pub
= supp_qos_fact
.get_SupplierQOS ();
193 receiver
->connect (pub
);
195 // Create the appropriate event handler and register it with the reactor
196 std::unique_ptr
<ACE_Event_Handler
> eh
;
198 std::unique_ptr
<TAO_ECG_Mcast_EH
> mcast_eh(new TAO_ECG_Mcast_EH (receiver
.in()));
199 mcast_eh
->reactor (orb
->orb_core ()->reactor ());
200 mcast_eh
->open (ec
.in());
201 eh
.reset (mcast_eh
.release());
203 std::unique_ptr
<TAO_ECG_UDP_EH
> udp_eh (new TAO_ECG_UDP_EH (receiver
.in()));
204 udp_eh
->reactor (orb
->orb_core ()->reactor ());
205 ACE_INET_Addr
local_addr (listenport
);
206 if (udp_eh
->open (local_addr
) == -1)
207 ACE_ERROR ((LM_ERROR
,"Cannot open EH\n"));
209 eh
.reset(udp_eh
.release());
212 // Create an event (just a string in this case).
214 // Create an event set for one event
215 RtecEventComm::EventSet
event (1);
218 // Initialize event header.
219 event
[0].header
.source
= MY_SOURCE_ID
;
220 event
[0].header
.ttl
= 1;
221 event
[0].header
.type
= MY_EVENT_TYPE
;
223 #if !defined (TAO_LACKS_EVENT_CHANNEL_ANY)
224 // Initialize data fields in event.
225 const CORBA::String_var eventData
=
226 CORBA::string_dup (ACE_TEXT_ALWAYS_CHAR (ecname
));
228 event
[0].data
.any_value
<<= eventData
;
230 // Use the octet sequence payload instead
231 char *tmpstr
= const_cast<char *>(ACE_TEXT_ALWAYS_CHAR (ecname
));
232 size_t len
= ACE_OS::strlen(tmpstr
) +1;
233 event
[0].data
.payload
.replace (
236 reinterpret_cast<CORBA::Octet
*> (tmpstr
));
237 #endif /* !TAO_LACKS_EVENT_CHANNEL_ANY */
240 CORBA::String_var str
= orb
->object_to_string( ec
.in());
241 std::ofstream
iorFile( ACE_TEXT_ALWAYS_CHAR(iorfile
));
242 iorFile
<< str
.in() << std::endl
;
245 ACE_DEBUG ((LM_DEBUG
,
246 "Starting main loop\n"));
248 const int EVENT_DELAY_MS
= 1000;
251 consumer
->push (event
);
253 ACE_Time_Value
tv(0, 1000 * EVENT_DELAY_MS
);
260 catch (const CORBA::Exception
& exc
)
262 ACE_ERROR ((LM_ERROR
,
263 "Caught CORBA::Exception\n%C (%C)\n",