Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / TAO / orbsvcs / DevGuideExamples / NotifyService / RTNotify / MessengerConsumer.cpp
blobca7a74bb15aaca081702d1be55c76adcb687b1fa
1 #include "orbsvcs/CosNotifyChannelAdminC.h"
2 #include "orbsvcs/CosNotifyCommC.h"
3 #include "orbsvcs/CosNamingC.h"
4 #include "orbsvcs/NotifyExtC.h"
5 #include "tao/RTCORBA/RTCORBA.h"
7 #include "StructuredEventConsumer_i.h"
8 #include "Priorities.h"
9 #include <iostream>
10 #include <fstream>
11 #include "ace/Get_Opt.h"
// Path of the file that main writes "Ready" into once the consumer is
// connected and subscribed; parse_args replaces it with the argument of -o.
13 const ACE_TCHAR *output_file = ACE_TEXT ("MessengerConsumer.ready");
15 int
16 parse_args (int argc, ACE_TCHAR *argv[])
18 ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("o:"));
19 int c;
21 while ((c = get_opts ()) != -1)
22 switch (c)
24 case 'o':
25 output_file = get_opts.opt_arg ();
26 break;
28 case '?':
29 default:
32 // Indicates successful parsing of the command line
33 return 0;
36 int
37 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
39 try
41 CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);
43 if (parse_args (argc, argv) != 0)
44 return 1;
46 CORBA::Object_var naming_obj =
47 orb->resolve_initial_references ("NameService");
49 CosNaming::NamingContext_var naming_context =
50 CosNaming::NamingContext::_narrow(naming_obj.in());
52 CosNaming::Name name;
53 name.length (1);
54 name[0].id = CORBA::string_dup("MyEventChannel");
55 CORBA::Object_var ecObj = naming_context->resolve(name);
57 CosNotifyChannelAdmin::EventChannel_var ec =
58 CosNotifyChannelAdmin::EventChannel::_narrow(ecObj.in());
60 CosNotifyChannelAdmin::AdminID adminid;
61 CosNotifyChannelAdmin::InterFilterGroupOperator ifgop =
62 CosNotifyChannelAdmin::AND_OP;
64 CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin =
65 ec->new_for_consumers(ifgop,
66 adminid);
68 CORBA::Object_var poa_object =
69 orb->resolve_initial_references("RootPOA");
71 PortableServer::POA_var poa =
72 PortableServer::POA::_narrow (poa_object.in());
74 CORBA::Object_var rtorb_obj = orb->resolve_initial_references ("RTORB");
75 RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (rtorb_obj.in ());
77 // Create an RT POA with a lane at the given priority.
78 CORBA::Policy_var priority_model_policy =
79 rt_orb->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED,
80 DEFAULT_PRIORITY);
82 RTCORBA::ThreadpoolLanes lanes (2);
83 lanes.length (2);
85 lanes[0].lane_priority = LOW_PRIORITY;
86 lanes[0].static_threads = 2;
87 lanes[0].dynamic_threads = 0;
88 lanes[1].lane_priority = HIGH_PRIORITY;
89 lanes[1].static_threads = 2;
90 lanes[1].dynamic_threads = 0;
93 // Create a thread-pool.
94 CORBA::ULong stacksize = 0;
95 CORBA::Boolean allow_request_buffering = 0;
96 CORBA::ULong max_buffered_requests = 0;
97 CORBA::ULong max_request_buffer_size = 0;
98 CORBA::Boolean allow_borrowing = 0;
100 // Create the thread-pool.
101 RTCORBA::ThreadpoolId threadpool_id =
102 rt_orb->create_threadpool_with_lanes (stacksize,
103 lanes,
104 allow_borrowing,
105 allow_request_buffering,
106 max_buffered_requests,
107 max_request_buffer_size);
109 // Create a thread-pool policy.
110 CORBA::Policy_var lanes_policy =
111 rt_orb->create_threadpool_policy (threadpool_id);
113 CORBA::PolicyList poa_policy_list(2);
114 poa_policy_list.length (2);
115 poa_policy_list[0] = priority_model_policy;
116 poa_policy_list[1] = lanes_policy;
118 PortableServer::POAManager_var poa_manager = poa->the_POAManager ();
120 PortableServer::POA_var rt_poa = poa->create_POA ("RT POA",
121 poa_manager.in (),
122 poa_policy_list);
124 PortableServer::Servant_var<StructuredEventConsumer_i> servant =
125 new StructuredEventConsumer_i(orb.in());
127 PortableServer::ObjectId_var objectId =
128 rt_poa->activate_object (servant.in());
130 CORBA::Object_var consumer_obj =
131 rt_poa->id_to_reference (objectId.in ());
133 CosNotifyComm::StructuredPushConsumer_var consumer =
134 CosNotifyComm::StructuredPushConsumer::_narrow (consumer_obj.in ());
136 NotifyExt::ThreadPoolLanesParams tpl_params;
138 tpl_params.priority_model = NotifyExt::CLIENT_PROPAGATED;
139 tpl_params.server_priority = DEFAULT_PRIORITY;
140 tpl_params.stacksize = 0;
141 tpl_params.allow_borrowing = 0;
142 tpl_params.allow_request_buffering = 0;
143 tpl_params.max_buffered_requests = 0;
144 tpl_params.max_request_buffer_size = 0;
145 tpl_params.lanes.length (2);
146 tpl_params.lanes[0].lane_priority = LOW_PRIORITY;
147 tpl_params.lanes[0].static_threads = 2;
148 tpl_params.lanes[0].dynamic_threads = 0;
149 tpl_params.lanes[1].lane_priority = HIGH_PRIORITY;
150 tpl_params.lanes[1].static_threads = 2;
151 tpl_params.lanes[1].dynamic_threads = 0;
152 CosNotification::QoSProperties qos;
153 qos.length(1);
154 qos[0].name = CORBA::string_dup (NotifyExt::ThreadPoolLanes);
155 qos[0].value <<= tpl_params;
157 consumer_admin->set_qos(qos);
158 CORBA::Object_var current_obj =
159 orb->resolve_initial_references ("RTCurrent");
161 RTCORBA::Current_var current =
162 RTCORBA::Current::_narrow (current_obj.in ());
163 current->the_priority(HIGH_PRIORITY);
165 CosNotifyChannelAdmin::ProxyID consumeradmin_proxy_id;
167 CosNotifyChannelAdmin::ProxySupplier_var proxy_supplier =
168 consumer_admin->obtain_notification_push_supplier(
169 CosNotifyChannelAdmin::STRUCTURED_EVENT,
170 consumeradmin_proxy_id);
172 CosNotifyChannelAdmin::StructuredProxyPushSupplier_var supplier_proxy;
173 supplier_proxy = CosNotifyChannelAdmin::StructuredProxyPushSupplier::
174 _narrow(proxy_supplier.in());
176 supplier_proxy->connect_structured_push_consumer(consumer.in());
178 CosNotification::EventTypeSeq added (1);
179 CosNotification::EventTypeSeq removed (1);
180 added.length (1);
181 removed.length (1);
183 added[0].domain_name = CORBA::string_dup ("OCI_TAO");
184 added[0].type_name = CORBA::string_dup ("examples");
186 removed[0].domain_name = CORBA::string_dup ("*");
187 removed[0].type_name = CORBA::string_dup ("*");
189 supplier_proxy->subscription_change(added, removed);
191 poa_manager->activate();
193 // Write a file to let the run_test.pl script know we are ready.
194 std::ofstream iorFile( ACE_TEXT_ALWAYS_CHAR(output_file) );
195 iorFile << "Ready" << std::endl;
196 iorFile.close();
198 orb->run();
200 catch(const CORBA::Exception& ex)
202 std::cerr << "Caught exception: " << ex << std::endl;
203 return 1;
206 return 0;