1 #include "orbsvcs/CosNotifyChannelAdminC.h"
2 #include "orbsvcs/CosNotifyCommC.h"
3 #include "orbsvcs/CosNamingC.h"
4 #include "orbsvcs/NotifyExtC.h"
5 #include "tao/RTCORBA/RTCORBA.h"
7 #include "StructuredEventConsumer_i.h"
8 #include "Priorities.h"
11 #include "ace/Get_Opt.h"
// Path of the file that ACE_TMAIN writes "Ready" into once setup is done.
// Defaults to "MessengerConsumer.ready"; parse_args replaces it with the
// argument of the -o option.
13 const ACE_TCHAR
*output_file
= ACE_TEXT ("MessengerConsumer.ready");
// Parses the command line with ACE_Get_Opt.
//
// The option string "o:" declares a single option, -o, that takes an
// argument; that argument is stored in the file-scope output_file.
//
// argc / argv: the command line, as received by ACE_TMAIN.
// ACE_TMAIN compares the result against 0.
// NOTE(review): the dispatch on the option character and the return
// statements are not visible here -- confirm the exact return values.
16 parse_args (int argc
, ACE_TCHAR
*argv
[])
18 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("o:"));
// Fetch options one at a time until get_opts () yields -1.
21 while ((c
= get_opts ()) != -1)
// Remember the argument of the option as the ready-file path.
25 output_file
= get_opts
.opt_arg ();
32 // Indicates successful parsing of the command line
// Entry point of the messenger consumer. From what is visible here it:
//   - initializes the ORB and parses the command line,
//   - resolves "MyEventChannel" through the NameService and obtains a
//     consumer admin from that event channel,
//   - builds a policy list holding a priority-model policy and a
//     thread-pool (lanes) policy and creates the "RT POA",
//   - activates a StructuredEventConsumer_i servant on the RT POA,
//   - sets a NotifyExt::ThreadPoolLanes QoS property on the consumer admin,
//   - connects the consumer to a structured proxy push supplier and
//     changes the subscription,
//   - activates the POA manager and writes "Ready" to output_file.
// A CORBA::Exception is caught and printed on std::cerr.
37 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
// Initialize the ORB from the command-line arguments.
41 CORBA::ORB_var orb
= CORBA::ORB_init(argc
, argv
);
// Parse the consumer's own option (-o); the result is tested against 0.
43 if (parse_args (argc
, argv
) != 0)
// Look up the Naming Service and narrow it to a NamingContext.
46 CORBA::Object_var naming_obj
=
47 orb
->resolve_initial_references ("NameService");
49 CosNaming::NamingContext_var naming_context
=
50 CosNaming::NamingContext::_narrow(naming_obj
.in());
// Resolve the object whose name id is "MyEventChannel" and narrow it
// to an EventChannel.
54 name
[0].id
= CORBA::string_dup("MyEventChannel");
55 CORBA::Object_var ecObj
= naming_context
->resolve(name
);
57 CosNotifyChannelAdmin::EventChannel_var ec
=
58 CosNotifyChannelAdmin::EventChannel::_narrow(ecObj
.in());
// Ask the channel for a new consumer admin, using AND_OP as the
// inter-filter-group operator.
60 CosNotifyChannelAdmin::AdminID adminid
;
61 CosNotifyChannelAdmin::InterFilterGroupOperator ifgop
=
62 CosNotifyChannelAdmin::AND_OP
;
64 CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin
=
65 ec
->new_for_consumers(ifgop
,
// Obtain the RootPOA.
68 CORBA::Object_var poa_object
=
69 orb
->resolve_initial_references("RootPOA");
71 PortableServer::POA_var poa
=
72 PortableServer::POA::_narrow (poa_object
.in());
// Obtain the RTORB; it creates the RT policies and the thread pool below.
74 CORBA::Object_var rtorb_obj
= orb
->resolve_initial_references ("RTORB");
75 RTCORBA::RTORB_var rt_orb
= RTCORBA::RTORB::_narrow (rtorb_obj
.in ());
77 // Create an RT POA with a lane at the given priority.
78 CORBA::Policy_var priority_model_policy
=
79 rt_orb
->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED
,
// Two lanes: lane 0 at LOW_PRIORITY and lane 1 at HIGH_PRIORITY, each
// with 2 static threads and 0 dynamic threads.
82 RTCORBA::ThreadpoolLanes
lanes (2);
85 lanes
[0].lane_priority
= LOW_PRIORITY
;
86 lanes
[0].static_threads
= 2;
87 lanes
[0].dynamic_threads
= 0;
88 lanes
[1].lane_priority
= HIGH_PRIORITY
;
89 lanes
[1].static_threads
= 2;
90 lanes
[1].dynamic_threads
= 0;
93 // Thread-pool settings (all zero / false).
94 CORBA::ULong stacksize
= 0;
95 CORBA::Boolean allow_request_buffering
= 0;
96 CORBA::ULong max_buffered_requests
= 0;
97 CORBA::ULong max_request_buffer_size
= 0;
98 CORBA::Boolean allow_borrowing
= 0;
100 // Create the thread-pool.
101 RTCORBA::ThreadpoolId threadpool_id
=
102 rt_orb
->create_threadpool_with_lanes (stacksize
,
105 allow_request_buffering
,
106 max_buffered_requests
,
107 max_request_buffer_size
);
109 // Create a thread-pool policy.
110 CORBA::Policy_var lanes_policy
=
111 rt_orb
->create_threadpool_policy (threadpool_id
);
// Policy list with two entries: the priority-model policy and the
// thread-pool policy.
113 CORBA::PolicyList
poa_policy_list(2);
114 poa_policy_list
.length (2);
115 poa_policy_list
[0] = priority_model_policy
;
116 poa_policy_list
[1] = lanes_policy
;
// Keep the root POA's manager; it is activated near the end of setup.
118 PortableServer::POAManager_var poa_manager
= poa
->the_POAManager ();
// Create the child POA named "RT POA" from the root POA.
120 PortableServer::POA_var rt_poa
= poa
->create_POA ("RT POA",
// Create the consumer servant, activate it on the RT POA, and narrow
// the resulting reference to a StructuredPushConsumer.
124 PortableServer::Servant_var
<StructuredEventConsumer_i
> servant
=
125 new StructuredEventConsumer_i(orb
.in());
127 PortableServer::ObjectId_var objectId
=
128 rt_poa
->activate_object (servant
.in());
130 CORBA::Object_var consumer_obj
=
131 rt_poa
->id_to_reference (objectId
.in ());
133 CosNotifyComm::StructuredPushConsumer_var consumer
=
134 CosNotifyComm::StructuredPushConsumer::_narrow (consumer_obj
.in ());
// Fill in NotifyExt::ThreadPoolLanesParams with the same lane layout as
// the local thread pool above: CLIENT_PROPAGATED priority model and two
// lanes (LOW_PRIORITY and HIGH_PRIORITY) with 2 static threads each.
136 NotifyExt::ThreadPoolLanesParams tpl_params
;
138 tpl_params
.priority_model
= NotifyExt::CLIENT_PROPAGATED
;
139 tpl_params
.server_priority
= DEFAULT_PRIORITY
;
140 tpl_params
.stacksize
= 0;
141 tpl_params
.allow_borrowing
= 0;
142 tpl_params
.allow_request_buffering
= 0;
143 tpl_params
.max_buffered_requests
= 0;
144 tpl_params
.max_request_buffer_size
= 0;
145 tpl_params
.lanes
.length (2);
146 tpl_params
.lanes
[0].lane_priority
= LOW_PRIORITY
;
147 tpl_params
.lanes
[0].static_threads
= 2;
148 tpl_params
.lanes
[0].dynamic_threads
= 0;
149 tpl_params
.lanes
[1].lane_priority
= HIGH_PRIORITY
;
150 tpl_params
.lanes
[1].static_threads
= 2;
151 tpl_params
.lanes
[1].dynamic_threads
= 0;
// Hand the lane parameters to the consumer admin as the
// NotifyExt::ThreadPoolLanes QoS property.
152 CosNotification::QoSProperties qos
;
154 qos
[0].name
= CORBA::string_dup (NotifyExt::ThreadPoolLanes
);
155 qos
[0].value
<<= tpl_params
;
157 consumer_admin
->set_qos(qos
);
// Set the_priority on RTCORBA::Current to HIGH_PRIORITY.
158 CORBA::Object_var current_obj
=
159 orb
->resolve_initial_references ("RTCurrent");
161 RTCORBA::Current_var current
=
162 RTCORBA::Current::_narrow (current_obj
.in ());
163 current
->the_priority(HIGH_PRIORITY
);
// Obtain a STRUCTURED_EVENT push supplier proxy from the consumer admin
// and narrow it to a StructuredProxyPushSupplier.
165 CosNotifyChannelAdmin::ProxyID consumeradmin_proxy_id
;
167 CosNotifyChannelAdmin::ProxySupplier_var proxy_supplier
=
168 consumer_admin
->obtain_notification_push_supplier(
169 CosNotifyChannelAdmin::STRUCTURED_EVENT
,
170 consumeradmin_proxy_id
);
172 CosNotifyChannelAdmin::StructuredProxyPushSupplier_var supplier_proxy
;
173 supplier_proxy
= CosNotifyChannelAdmin::StructuredProxyPushSupplier::
174 _narrow(proxy_supplier
.in());
// Connect our consumer to the proxy.
176 supplier_proxy
->connect_structured_push_consumer(consumer
.in());
// Subscription change: add event type OCI_TAO / examples and remove
// the wildcard type * / *.
178 CosNotification::EventTypeSeq
added (1);
179 CosNotification::EventTypeSeq
removed (1);
183 added
[0].domain_name
= CORBA::string_dup ("OCI_TAO");
184 added
[0].type_name
= CORBA::string_dup ("examples");
186 removed
[0].domain_name
= CORBA::string_dup ("*");
187 removed
[0].type_name
= CORBA::string_dup ("*");
189 supplier_proxy
->subscription_change(added
, removed
);
// Activate the POA manager obtained from the root POA.
191 poa_manager
->activate();
193 // Write a file to let the run_test.pl script know we are ready.
194 std::ofstream
iorFile( ACE_TEXT_ALWAYS_CHAR(output_file
) );
195 iorFile
<< "Ready" << std::endl
;
// Report any CORBA exception on standard error.
200 catch(const CORBA::Exception
& ex
)
202 std::cerr
<< "Caught exception: " << ex
<< std::endl
;