2 // ============================================================================
8 // Michael Kircher (mk1@cs.wustl.edu)
11 // This is a Push Consumer that takes the data field of each
12 // event it receives and uses it to update a Data Handler.
14 // ============================================================================
18 // The Consumer has to implement the Skeleton Consumer
// Push consumer servant; it extends the RtecEventComm.PushConsumerPOA skeleton.
// NOTE(review): this listing appears to have lost lines (braces and parts of
// some statements are not visible); confirm against the complete file.
20 public class PushConsumer
extends RtecEventComm
.PushConsumerPOA
// Integer codes named after ACE event-service event types and designators.
// Only ACE_ES_EVENT_NOTIFICATION is referenced in the visible code
// (in open_consumer, when building the event header).
// NOTE(review): presumably these values must match the ones used by the
// event channel implementation -- TODO confirm.
23 public static final int ACE_ES_EVENT_ANY
= 0;
24 public static final int ACE_ES_EVENT_SHUTDOWN
= 1;
25 public static final int ACE_ES_EVENT_ACT
= 2;
26 public static final int ACE_ES_EVENT_NOTIFICATION
= 3;
27 public static final int ACE_ES_EVENT_TIMEOUT
= 4;
28 public static final int ACE_ES_EVENT_INTERVAL_TIMEOUT
= 5;
29 public static final int ACE_ES_EVENT_DEADLINE_TIMEOUT
= 6;
30 public static final int ACE_ES_GLOBAL_DESIGNATOR
= 7;
31 public static final int ACE_ES_CONJUNCTION_DESIGNATOR
= 8;
32 public static final int ACE_ES_DISJUNCTION_DESIGNATOR
= 9;
33 public static final int ACE_ES_EVENT_UNDEFINED
= 16;
// NOTE(review): TOTAL_MESSAGES is not referenced anywhere in the visible
// code; its consumer is presumably elsewhere -- verify before changing it.
34 public static final int TOTAL_MESSAGES
= 30;
36 // Store the number of received events
// push() compares this counter against 5 to decide which message to print.
37 private int total_received_
= 0;
// ORB reference; open_consumer uses it to create the Any placed in the
// event data.
38 private org
.omg
.CORBA
.ORB orb_
;
// POA reference; open_consumer uses it to turn this servant into an
// object reference.
39 private org
.omg
.PortableServer
.POA poa_
;
// Adapter that is started in the constructor and receives every event set
// handed to push().
40 private MTDataHandlerAdapter dataHandlerAdapter_
;
// The three fields below are assigned in open_consumer: the event channel
// passed in, the consumer admin obtained from it, and the proxy push
// supplier obtained from that admin.
42 private RtecEventChannelAdmin
.EventChannel channel_admin_
;
43 private RtecEventChannelAdmin
.ConsumerAdmin consumer_admin_
;
44 private RtecEventChannelAdmin
.ProxyPushSupplier suppliers_
;
// Constructor. The visible parameters are the ORB, the POA and the
// DataHandler that should receive the event data.
// NOTE(review): use_queueing is passed on below, but its declaration and the
// end of the parameter list are not visible in this listing.
46 public PushConsumer (org
.omg
.CORBA
.ORB orb
,
47 org
.omg
.PortableServer
.POA poa
,
48 DataHandler dataHandler
,
// Wrap the handler in an MTDataHandlerAdapter.
// NOTE(review): the assignment target is not visible here; presumably it is
// dataHandlerAdapter_ -- confirm against the complete file.
54 new MTDataHandlerAdapter (dataHandler
, use_queueing
);
// Start the adapter before any events are pushed to it.
57 dataHandlerAdapter_
.start ();
// Handles a pushed set of events: prints progress messages depending on
// total_received_, reports an empty set on System.err, and hands the events
// to dataHandlerAdapter_.
// NOTE(review): the println arguments are truncated in this listing, and the
// place where total_received_ is incremented is not visible.
62 public void push (RtecEventComm
.Event
[] events
)
// Be verbose only for the first five event sets ...
64 if (total_received_
< 5)
65 System
.out
.println ("Demo Consumer: Received an event set! ->Number: "
// ... then print one last message when the counter is exactly 5.
67 else if (total_received_
== 5)
68 System
.out
.println ("Demo Consumer: Everything is fine. " +
// An empty event array is reported on stderr.
// NOTE(review): whether the method returns here or still forwards the empty
// array below cannot be told from the visible lines.
71 if (events
.length
== 0)
73 System
.err
.println ("No events");
// Forward the event set to the data handler adapter.
78 dataHandlerAdapter_
.push (events
);
// Disconnect callback. In the visible code it only prints a notice on
// System.out; no cleanup of the channel-related fields is shown.
82 public void disconnect_push_consumer()
84 System
.out
.println ("Demo Consumer: Have to disconnect!");
// Registers this consumer with the given event channel: creates scheduling
// information through the scheduler, builds a one-element dependency list
// for notification events, obtains a proxy push supplier from the channel
// and connects this servant to it. Every exception caught below is reported
// on System.err; none of the visible handlers rethrows.
// NOTE(review): the remaining parameters (at least `name`, used below), the
// declaration of rt_info_, and the `try` that the catch clauses belong to
// are not visible in this listing.
87 public void open_consumer (RtecEventChannelAdmin
.EventChannel event_channel_
,
88 RtecScheduler
.Scheduler scheduler_
,
93 // Define Real-time information
95 rt_info_
= scheduler_
.create (name
);
// Configure the scheduling entry created above. Only the criticality,
// importance and info-type arguments are visible; the arguments between
// them are missing from this listing.
97 scheduler_
.set (rt_info_
,
98 RtecScheduler
.Criticality_t
.VERY_LOW_CRITICALITY
,
103 RtecScheduler
.Importance_t
.VERY_LOW_IMPORTANCE
,
106 RtecScheduler
.Info_Type_t
.OPERATION
);
109 // Register for Notification and Shutdown events
// NOTE(review): only a notification event is built in the visible code; no
// shutdown dependency is shown.
// One-byte payload used as the event data's byte array.
111 byte payload
[] = new byte[1];
113 RtecEventComm
.Event notification_event_
= new RtecEventComm
.Event ();
// Header carries the ACE_ES_EVENT_NOTIFICATION type code; the remaining
// header arguments are not visible in this listing.
114 notification_event_
.header
=
115 new RtecEventComm
.EventHeader (ACE_ES_EVENT_NOTIFICATION
,
// Event data: a zero, the payload bytes and a freshly created Any.
118 notification_event_
.data
=
119 new RtecEventData (0, payload
, orb_
.create_any());
// Single dependency tying the notification event to the scheduling info.
121 RtecEventChannelAdmin
.Dependency dependencies_
[] = new RtecEventChannelAdmin
.Dependency
[1];
122 dependencies_
[0] = new RtecEventChannelAdmin
.Dependency (notification_event_
, rt_info_
);
125 // @@ Carlos please help me to set the right boolean value
126 RtecEventChannelAdmin
.ConsumerQOS qos
= new RtecEventChannelAdmin
.ConsumerQOS (dependencies_
, false);
129 // The channel administrator is the event channel we got from the invocation
132 channel_admin_
= event_channel_
;
134 // Connect as a consumer
136 consumer_admin_
= channel_admin_
.for_consumers ();
138 // Obtain a reference to the proxy push supplier
140 suppliers_
= consumer_admin_
.obtain_push_supplier ();
// Turn this servant into an object reference, narrow it to the
// PushConsumer interface and connect it to the proxy supplier with the QoS
// built above.
142 org
.omg
.CORBA
.Object objref
= poa_
.servant_to_reference (this);
143 RtecEventComm
.PushConsumer consumer_ref
=
144 RtecEventComm
.PushConsumerHelper
.narrow (objref
);
145 suppliers_
.connect_push_consumer (consumer_ref
, qos
);
147 System
.out
.println ("Registered the consumer successfully.");
// Error reporting: each handler prints a label naming the failure followed
// by the exception itself on System.err.
151 catch (RtecEventChannelAdmin
.TypeError e
)
153 System
.err
.println ("Demo_Consumer.open_consumer: RtecEventChannelAdmin.TypeError");
154 System
.err
.println (e
);
156 catch (RtecEventChannelAdmin
.AlreadyConnected e
)
158 System
.err
.println ("Demo_Consumer.open_consumer: RtecEventChannelAdmin.AlreadyConnected");
159 System
.err
.println (e
);
// Scheduler-related failures.
161 catch (RtecScheduler
.UNKNOWN_TASK e
)
163 System
.err
.println ("Demo_Consumer.open_consumer: Unknown task");
164 System
.err
.println (e
);
166 catch (RtecScheduler
.DUPLICATE_NAME e
)
168 System
.err
.println ("Demo_Consumer.open_consumer: Duplicate names");
169 System
.err
.println (e
);
171 catch (RtecScheduler
.INTERNAL e
)
173 System
.err
.println ("Demo_Consumer.open_consumer: internal scheduler error");
174 System
.err
.println (e
);
176 catch (RtecScheduler
.SYNCHRONIZATION_FAILURE e
)
178 System
.err
.println ("Demo_Consumer.open_consumer: scheduler synchronization failure");
179 System
.err
.println (e
);
// POA failures.
181 catch (org
.omg
.PortableServer
.POAPackage
.ServantNotActive e
)
183 System
.err
.println ("Demo_Consumer.open_consumer: org.omg.PortableServer.POAPackage.ServantNotActive");
184 System
.err
.println (e
);
186 catch (org
.omg
.PortableServer
.POAPackage
.WrongPolicy e
)
188 System
.err
.println ("Demo_Consumer.open_consumer: org.omg.PortableServer.POAPackage.WrongPolicy");
189 System
.err
.println (e
);
// Any other CORBA system exception is printed without a label.
191 catch(org
.omg
.CORBA
.SystemException e
)
193 System
.err
.println(e
);