2 //=============================================================================
6 * This demo just tests the basic functionality of the Event Service:
7 * one Consumer, which inherits from the Rtec Consumer, and one Supplier
8 * with an internal Rtec Consumer and one internal Rtec Supplier.
9 * The internal Supplier is just a demo supplier because the
10 * architecture expects a supplier which has inherited from the
13 * @author originally by David Levine (levine@cs.wustl.edu) and Tim Harrison (harrison@cs.wustl.edu), modified by Michael Kircher (mk1@cs.wustl.edu)
15 //=============================================================================
18 #include "Event_Con.h"
20 #include "orbsvcs/Event_Utilities.h"
21 #include "orbsvcs/Event_Service_Constants.h"
22 #include "orbsvcs/Scheduler_Factory.h"
24 #include "tao/ORB_Core.h"
25 #include "tao/AnyTypeCode/TypeCode.h"
27 #include "ace/Get_Opt.h"
28 #include "ace/Sched_Params.h"
29 #include "ace/Profile_Timer.h"
30 #include "ace/streams.h"
32 #include "ace/os_include/os_limits.h"
// Command-line usage text. The -c/-d/-j/-m/-s/-t options described here also
// appear in the option string handed to ACE_Get_Opt in get_options().
// NOTE(review): no use of `usage` is visible in this excerpt, and the "-d"
// entry has no closing ']' unlike the other entries -- confirm.
34 static const char usage
[] =
36 " [-c <consumers> [4]]\n"
37 " [-d directly connect all consumers/suppliers\n"
38 " [-j to collect jitter statistics]\n"
39 " [-m <count> of messages to send [10]]\n"
40 " [-s <suppliers>, [1]]\n"
41 " [-t <timeout interval>, msec [250]]]";
// File-local counter, initialised to 0; Demo_Consumer::push() prints it in
// its "Number of events" debug message.
43 static int received
= 0;
45 // ************************************************************
// Default constructor of Demo_Consumer (its body is not visible in this
// excerpt).
47 Demo_Consumer::Demo_Consumer ()
// Subscribes this consumer to the event channel `ec`.
//
// Steps visible below:
//   1. fetch the scheduler through ACE_Scheduler_Factory::server (), call
//      create (my_name) on it and store the result in rt_info_, then
//      configure that entry with set ();
//   2. build an ACE_ConsumerQOS_Factory with one disjunction group holding
//      ACE_ES_EVENT_NOTIFICATION and ACE_ES_EVENT_SHUTDOWN;
//   3. remember `ec` in channel_admin_, obtain the consumer admin from it
//      and call obtain_push_supplier ();
//   4. call connect_push_consumer () on suppliers_ with `objref` and the
//      QOS built in step 2.
//
// A SUBSCRIPTION_ERROR and any other CORBA::Exception are caught and
// reported through ACE_ERROR_RETURN; the values returned are not visible in
// this excerpt.
//
// NOTE(review): `my_name` is used below but its declaration (presumably a
// second parameter) is on a line missing from this excerpt -- confirm.
52 Demo_Consumer::open_consumer (RtecEventChannelAdmin::EventChannel_ptr ec
,
59 RtecScheduler::Scheduler_ptr server
=
60 ACE_Scheduler_Factory::server ();
62 // Define Real-time information.
63 rt_info_
= server
->create (my_name
);
// Scheduling parameters for rt_info_. The arguments visible here are very
// low criticality, very low importance, ORBSVCS_Time::zero () for each time
// value, and RtecScheduler::OPERATION; other arguments may sit on lines
// missing from this excerpt.
65 server
->set (rt_info_
,
66 RtecScheduler::VERY_LOW_CRITICALITY
,
67 ORBSVCS_Time::zero (),
68 ORBSVCS_Time::zero (),
69 ORBSVCS_Time::zero (),
71 RtecScheduler::VERY_LOW_IMPORTANCE
,
72 ORBSVCS_Time::zero (),
74 RtecScheduler::OPERATION
);
77 // Create the event that we're registering for.
79 ACE_ConsumerQOS_Factory dependencies
;
80 dependencies
.start_disjunction_group ();
81 dependencies
.insert_type (ACE_ES_EVENT_NOTIFICATION
, rt_info_
);
82 dependencies
.insert_type (ACE_ES_EVENT_SHUTDOWN
, rt_info_
);
84 // The channel administrator is the event channel we got from
85 // the invocation of this routine.
87 this->channel_admin_
= ec
;
89 // = Connect as a consumer.
91 this->consumer_admin_
=
92 channel_admin_
->for_consumers ();
94 // Obtain a pointer to a push supplier. "suppliers" is
95 // inherited from a base class.
98 consumer_admin_
->obtain_push_supplier ();
100 // The _this function returns an object pointer. This is needed
101 // because a consumer inherits from a Servant class that is no
104 RtecEventComm::PushConsumer_var objref
=
107 this->suppliers_
->connect_push_consumer (objref
.in (),
108 dependencies
.get_ConsumerQOS ());
// The event channel rejected the subscription.
110 catch (const RtecEventChannelAdmin::EventChannel::SUBSCRIPTION_ERROR
& )
112 ACE_ERROR_RETURN ((LM_ERROR
,
113 "Demo_Consumer::open: subscribe failed.\n"),
// Any other CORBA exception raised by the calls above.
116 catch (const CORBA::Exception
&)
118 ACE_ERROR_RETURN ((LM_ERROR
,
119 "Demo_Consumer::open: unexpected exception.\n"),
// Disconnect notification from the event channel. The visible body logs a
// debug message; no other action is visible in this excerpt.
126 Demo_Consumer::disconnect_push_consumer ()
128 ACE_DEBUG ((LM_DEBUG
,
129 "Consumer received disconnect from channel.\n"));
// Handles a set of events delivered to this consumer.
//
// Visible behaviour:
//   - an empty set is logged as "no events";
//   - for every event, a header type of ACE_ES_EVENT_SHUTDOWN is logged as a
//     shutdown event; the notification path logs the TypeCode of the Any
//     payload (id, name, member count, kind), compares that TypeCode with
//     _tc_Navigation and _tc_Weapons, extracts the corresponding struct
//     from the Any and logs one of its fields;
//   - a CORBA::Exception raised while doing so is caught and logged.
//
// The braces, if/else and return lines are missing from this excerpt, so
// the exact branching between these steps cannot be confirmed from here.
133 Demo_Consumer::push (const RtecEventComm::EventSet
&events
)
135 if (events
.length () == 0)
137 ACE_DEBUG ((LM_DEBUG
, "no events\n"));
// NOTE(review): the value printed as "Number of events" is the file-level
// counter `received`, not events.length () -- confirm this is intended.
141 ACE_DEBUG ((LM_DEBUG
, "Number of events: %d\n", received
));
144 for (CORBA::ULong i
= 0; i
< events
.length (); ++i
)
146 if (events
[i
].header
.type
== ACE_ES_EVENT_SHUTDOWN
)
148 ACE_DEBUG ((LM_DEBUG
, "Demo Consumer: received shutdown event\n"));
153 ACE_DEBUG ((LM_DEBUG
, "Demo Consumer: received ACE_ES_EVENT_NOTIFICATION event.\n"));
157 // Use a temporary int to avoid overload ambiguities with
159 int kind
= events
[i
].data
.any_value
.type()->kind ();
// Dump the TypeCode description of the payload.
161 ACE_DEBUG ((LM_DEBUG
, "ID: %s\n", events
[i
].data
.any_value
.type()->id()));
162 ACE_DEBUG ((LM_DEBUG
, "Name: %s\n", events
[i
].data
.any_value
.type()->name()));
163 ACE_DEBUG ((LM_DEBUG
, "member_count: %u\n", events
[i
].data
.any_value
.type()->member_count()));
164 ACE_DEBUG ((LM_DEBUG
, "TCKind: %d\n", kind
));
// Compare the payload's TypeCode with the Navigation TypeCode.
167 _tc_Navigation
->equal (events
[i
].data
.any_value
.type());
// NOTE(review): the result of `>>=` is not checked in the visible code;
// navigation_ starts as 0 and is dereferenced in the log call below --
// confirm a guard exists on the lines missing from this excerpt.
171 const Navigation
*navigation_
= 0;
172 events
[i
].data
.any_value
>>= navigation_
;
173 ACE_DEBUG ((LM_DEBUG
, "Found a Navigation struct in the any: pos_lat = %d\n", navigation_
->position_latitude
));
// Same comparison against the Weapons TypeCode.
177 ret
= _tc_Weapons
->equal (events
[i
].data
.any_value
.type());
// NOTE(review): as above, the `>>=` result is not checked before weapons_
// is dereferenced -- confirm the guard on the missing lines.
181 const Weapons
*weapons_
= 0;
182 events
[i
].data
.any_value
>>= weapons_
;
183 ACE_DEBUG ((LM_DEBUG
, "Found a Weapons struct in the any: nr_of_weapons = %u\n", weapons_
->number_of_weapons
));
187 catch (const CORBA::Exception
&)
189 ACE_ERROR ((LM_ERROR
, "(%t)Error in extracting the Navigation and Weapons data.\n"));
// Shuts this consumer down: calls disconnect_push_supplier () on suppliers_,
// logs a debug message, then calls shutdown () on the ORB obtained from
// TAO_ORB_Core_instance (). A CORBA::Exception raised on the way is caught
// and logged rather than propagated.
196 Demo_Consumer::shutdown ()
200 // Disconnect from the push supplier.
202 this->suppliers_
->disconnect_push_supplier ();
204 ACE_DEBUG ((LM_DEBUG
, "@@ we should shutdown here!!!\n"));
206 TAO_ORB_Core_instance ()->orb ()->shutdown ();
208 catch (const CORBA::Exception
&)
210 ACE_ERROR ((LM_ERROR
,
211 "(%t) Demo_Consumer::shutdown: unexpected exception.\n"));
// Parses the command line with ACE_Get_Opt using the option string
// "Oc:djm:s:t:?". The visible code reports an unrecognised option
// ("unknown arg") and a mismatch between argc and get_opt.opt_ind ()
// ("too many arguments") through ACE_ERROR_RETURN. The per-option handling
// and the returned values are on lines missing from this excerpt.
215 // function get_options
218 get_options (int argc
, ACE_TCHAR
*argv
[])
220 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("Oc:djm:s:t:?"));
// Iterate over the options until ACE_Get_Opt reports EOF.
223 while ((opt
= get_opt ()) != EOF
)
228 ACE_DEBUG ((LM_DEBUG
,
234 ACE_ERROR_RETURN ((LM_ERROR
,
235 "%s: unknown arg, -%c\n"
// Arguments left over after option parsing are treated as an error.
242 if (argc
!= get_opt
.opt_ind ())
243 ACE_ERROR_RETURN ((LM_ERROR
,
244 "%s: too many arguments\n"
// Program entry point. Steps visible below:
//   1. initialise the ORB with the ORB id "internet";
//   2. resolve "RootPOA", narrow it and fetch its POA manager;
//   3. resolve "NameService", narrow it to a naming context and hand that
//      to ACE_Scheduler_Factory::use_config ();
//   4. parse the command line with get_options ();
//   5. resolve the name "EventService" in the naming context and narrow the
//      result to an RtecEventChannelAdmin::EventChannel;
//   6. allocate a Demo_Consumer and call open_consumer () on it with the
//      channel and the name "demo_consumer";
//   7. activate the POA manager;
//   8. delete the consumer and destroy the root POA.
// A CORBA::Exception is caught and printed with _tao_print_exception ().
// Whatever runs between steps 7 and 8, and the values returned on the error
// paths, are on lines missing from this excerpt.
256 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
263 CORBA::ORB_init (argc
, argv
, "internet");
265 CORBA::Object_var poa_object
=
266 orb
->resolve_initial_references("RootPOA");
268 if (CORBA::is_nil (poa_object
.in ()))
269 ACE_ERROR_RETURN ((LM_ERROR
,
270 " (%P|%t) Unable to initialize the POA.\n"),
273 PortableServer::POA_var root_poa
=
274 PortableServer::POA::_narrow (poa_object
.in ());
276 PortableServer::POAManager_var poa_manager
=
277 root_poa
->the_POAManager ();
279 CORBA::Object_var naming_obj
=
280 orb
->resolve_initial_references ("NameService");
// NOTE(review): this check guards the NameService reference, yet the message
// below reports the POA -- it looks copied from the RootPOA check above;
// confirm and correct the message text.
282 if (CORBA::is_nil (naming_obj
.in ()))
283 ACE_ERROR_RETURN ((LM_ERROR
,
284 " (%P|%t) Unable to initialize the POA.\n"),
287 CosNaming::NamingContext_var naming_context
=
288 CosNaming::NamingContext::_narrow (naming_obj
.in ());
290 ACE_Scheduler_Factory::use_config (naming_context
.in ());
292 if (get_options (argc
, argv
))
295 // Get the Event Channel.
297 CosNaming::Name
channel_name (1);
298 channel_name
.length (1);
299 channel_name
[0].id
= CORBA::string_dup ("EventService");
301 CORBA::Object_var ec_obj
=
302 naming_context
->resolve (channel_name
);
304 RtecEventChannelAdmin::EventChannel_var ec
=
305 RtecEventChannelAdmin::EventChannel::_narrow (ec_obj
.in());
307 if (CORBA::is_nil (ec
.in()))
308 ACE_ERROR_RETURN ((LM_ERROR
,
309 "Not able to get the Event Service reference.\n"),
// Allocate the consumer; ACE_NEW_RETURN's remaining arguments are on lines
// missing from this excerpt.
314 Demo_Consumer
*demo_consumer
= 0;
315 ACE_NEW_RETURN (demo_consumer
,
// open_consumer () returning -1 is treated as failure.
319 if (demo_consumer
->open_consumer (ec
.in (),
320 "demo_consumer") == -1)
321 ACE_ERROR_RETURN ((LM_ERROR
,
322 "Someone was feeling introverted.\n"),
325 poa_manager
->activate ();
// Tear-down: release the consumer, then destroy the root POA.
331 delete demo_consumer
;
333 root_poa
->destroy (true, true);
335 catch (const CORBA::Exception
& ex
)
337 ex
._tao_print_exception ("SYS_EX");