1 #include "Consumer_Client.h"
3 #include "ORB_Run_Task.h"
4 #include "ace/Arg_Shifter.h"
5 #include "orbsvcs/NotifyExtC.h"
6 #include "orbsvcs/CosNamingC.h"
7 #include "tao/ORB_Core.h"
8 #include "ace/Sched_Params.h"
9 #include "ace/OS_NS_errno.h"
// Constructor: initializes the orb_objects_ member from the caller-supplied
// TAO_Notify_ORB_Objects reference.
// NOTE(review): only the signature and the first member initializer are
// visible in this extract; any further initializers and the constructor body
// are not shown, so the initial values of the other members (e.g.
// lane_priority_, consumer_) cannot be stated from here.
12 TAO_Notify_Lanes_Consumer_Client::TAO_Notify_Lanes_Consumer_Client (TAO_Notify_ORB_Objects
& orb_objects
)
13 : orb_objects_ (orb_objects
)
// Destructor.
// NOTE(review): the body is not visible in this extract, so it cannot be
// determined from here whether the consumer_ object allocated with `new` in
// initialize () is released. Confirm ownership against the full file.
19 TAO_Notify_Lanes_Consumer_Client::~TAO_Notify_Lanes_Consumer_Client ()
// Parses the command line for the consumer client.
//
// Recognised option:
//   -LanePriority <n>   stored (via ACE_OS::atoi) in lane_priority_; the
//                       event type is then set to "TEST_TYPE_<n>".
//
// @param argc  Number of entries in argv.
// @param argv  Argument vector handed to the ACE_Arg_Shifter.
//
// NOTE(review): the return type and return statement are not visible in this
// extract. The caller in ACE_TMAIN treats a non-zero result as a parse error.
24 TAO_Notify_Lanes_Consumer_Client::parse_args (int argc
, ACE_TCHAR
*argv
[])
26 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
// Receives the value that follows a recognised option; stays 0 otherwise.
28 const ACE_TCHAR
*current_arg
= 0;
// Walk the arguments until the shifter reports nothing left.
30 while (arg_shifter
.is_anything_left ())
// A non-zero result means the current argument is -LanePriority and
// current_arg now points at its value.
32 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-LanePriority")))) // LanePriority
36 this->lane_priority_
= ACE_OS::atoi (current_arg
);
// Derive the event type name from the lane priority.
// NOTE(review): the declaration of `type` is not visible in this extract.
// ACE_OS::sprintf is unbounded, so confirm the buffer is large enough for
// "TEST_TYPE_" plus any int (a bounded ACE_OS::snprintf would be safer).
39 ACE_OS::sprintf (type
, "TEST_TYPE_%d", this->lane_priority_
);
40 this->event_type_
= type
;
// Remove the handled option from the shifter.
43 arg_shifter
.consume_arg ();
// NOTE(review): the braces/else around this call are not visible here;
// presumably it runs only for arguments that are not -LanePriority, skipping
// them without consuming. Confirm against the full file.
47 arg_shifter
.ignore_arg ();
// Sets the client up for receiving events:
//   1. logs the configured lane priority and event type,
//   2. activates the root POA's POAManager,
//   3. obtains the notification channel factory and picks the first
//      existing event channel (if any),
//   4. creates a consumer admin (AND_OP) on that channel,
//   5. creates the RT POA via create_rt_poa (),
//   6. allocates a TAO_Notify_Lanes_Consumer and initializes it with the
//      RT POA, the consumer admin and the event type.
//
// NOTE(review): the return type is not visible in this extract.
55 TAO_Notify_Lanes_Consumer_Client::initialize ()
57 ACE_DEBUG ((LM_DEBUG
, "(%P, %t)Initializing Consumer Client with lane priority = %d, event type = (%s)\n"
58 , this->lane_priority_
, this->event_type_
.c_str ()));
// Activate the POAManager of the root POA.
60 PortableServer::POAManager_var poa_manager
=
61 this->orb_objects_
.root_poa_
->the_POAManager ();
63 poa_manager
->activate ();
65 // Resolve the Notification Factory.
66 CosNotifyChannelAdmin::EventChannelFactory_var ecf
= this->orb_objects_
.notify_factory ();
68 // Find the EventChannel created by the supplier.
69 CosNotifyChannelAdmin::ChannelIDSeq_var channel_seq
= ecf
->get_all_channels ();
71 CosNotifyChannelAdmin::EventChannel_var ec
;
// Use the first channel the factory reports, when there is one.
73 if (channel_seq
->length() > 0)
75 ec
= ecf
->get_event_channel (channel_seq
[0]);
// NOTE(review): what follows this message (e.g. an early return) is not
// visible in this extract. If control continued, `ec` would still be
// unset when new_for_consumers () is called below. Confirm against the
// full file.
79 ACE_DEBUG ((LM_DEBUG
, "No Event Channel active!\n"));
83 // Create a Consumer Admin
// adminid is passed to new_for_consumers (); its value is not read
// afterwards in this function.
84 CosNotifyChannelAdmin::AdminID adminid
= 0;
86 CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin
=
87 ec
->new_for_consumers (CosNotifyChannelAdmin::AND_OP
, adminid
);
// Debug-time check only: a nil admin is not otherwise handled here.
89 ACE_ASSERT (!CORBA::is_nil (consumer_admin
.in ()));
// POA with a single thread-pool lane at lane_priority_ (see create_rt_poa).
91 PortableServer::POA_var rt_poa
= this->create_rt_poa ();
// NOTE(review): raw `new`; where this object is released is not visible in
// this extract. Confirm who owns consumer_.
94 this->consumer_
= new TAO_Notify_Lanes_Consumer (this->orb_objects_
);
// Hand the consumer its POA, admin and event type.
97 this->consumer_
->init (rt_poa
, consumer_admin
, this->event_type_
);
// Builds the POA named "RT POA!" as a child of the root POA, using three
// policies: a priority-model policy (CLIENT_PROPAGATED), an implicit
// activation policy, and a thread-pool policy for a pool created with
// create_threadpool_with_lanes ().
//
// @return The new POA, handed back through rt_poa._retn ().
//
// NOTE(review): several argument lists in this extract are visibly truncated
// (see the notes below); the comments describe only what is shown.
100 PortableServer::POA_ptr
101 TAO_Notify_Lanes_Consumer_Client::create_rt_poa ()
103 PortableServer::POA_var rt_poa
;
105 // Create an RT POA with a lane at the given priority.
106 CORBA::Policy_var priority_model_policy
;
107 CORBA::Policy_var lanes_policy
;
// Implicit activation policy, created through the root POA.
109 CORBA::Policy_var activation_policy
=
110 this->orb_objects_
.root_poa_
->create_implicit_activation_policy (PortableServer::IMPLICIT_ACTIVATION
);
112 // Create a priority model policy.
// NOTE(review): the remaining argument(s) and closing parenthesis of this
// call are not visible in this extract.
113 priority_model_policy
=
114 this->orb_objects_
.rt_orb_
->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED
// Lane description: entry 0 gets lane_priority_, one static thread and no
// dynamic threads.
// NOTE(review): no explicit length () call on `lanes` is visible here;
// confirm the sequence has length 1 before index 0 is written.
117 RTCORBA::ThreadpoolLanes
lanes (1);
120 lanes
[0].lane_priority
= this->lane_priority_
;
121 lanes
[0].static_threads
= 1;
122 lanes
[0].dynamic_threads
= 0;
125 // Thread-pool parameters (all zero / false).
126 CORBA::ULong stacksize
= 0;
127 CORBA::Boolean allow_request_buffering
= 0;
128 CORBA::ULong max_buffered_requests
= 0;
129 CORBA::ULong max_request_buffer_size
= 0;
130 CORBA::Boolean allow_borrowing
= 0;
132 // Create the thread-pool.
// NOTE(review): `lanes` and `allow_borrowing` do not appear among the
// visible arguments; presumably they are passed on lines missing from this
// extract. Confirm the argument order against the RTCORBA::RTORB interface.
133 RTCORBA::ThreadpoolId threadpool_id
=
134 this->orb_objects_
.rt_orb_
->create_threadpool_with_lanes (stacksize
,
137 allow_request_buffering
,
138 max_buffered_requests
,
139 max_request_buffer_size
);
141 // Create a thread-pool policy.
// NOTE(review): the assignment target of this call is not visible here;
// presumably the result is stored in lanes_policy, which is placed in the
// policy list below. Confirm.
143 this->orb_objects_
.rt_orb_
->create_threadpool_policy (threadpool_id
);
// Collect the three policies for the new POA.
145 CORBA::PolicyList poa_policy_list
;
147 poa_policy_list
.length (3);
148 poa_policy_list
[0] = priority_model_policy
;
149 poa_policy_list
[1] = activation_policy
;
150 poa_policy_list
[2] = lanes_policy
;
// The root POA's manager; presumably passed to create_POA below.
152 PortableServer::POAManager_var poa_manager
=
153 this->orb_objects_
.root_poa_
->the_POAManager ();
// NOTE(review): the arguments after the POA name are not visible in this
// extract.
155 rt_poa
= this->orb_objects_
.root_poa_
->create_POA ("RT POA!",
// Return the POA reference held by the _var to the caller.
159 return rt_poa
._retn ();
// Forwards to consumer_->run ().
// consumer_ is assigned in initialize (), so this must not be called before
// initialize () has succeeded.
// NOTE(review): the return type is not visible in this extract.
163 TAO_Notify_Lanes_Consumer_Client::run ()
165 this->consumer_
->run ();
// Presumably the task thread entry point (ACE_TMAIN calls client.activate ()).
// Sets the current priority to 0 and runs initialize (). A CORBA::Exception
// is caught and printed rather than propagated.
// NOTE(review): the `try` keyword, any call to run (), and the return
// statement are not visible in this extract.
169 TAO_Notify_Lanes_Consumer_Client::svc ()
173 // Initialize this thread's priority.
174 this->orb_objects_
.current_
->the_priority (0);
176 this->initialize (); //Init the Client
180 catch (const CORBA::Exception
& ex
)
// NOTE(review): the label reads "Supplier error " although this is the
// consumer client (ACE_TMAIN uses "Consumer Client error "). This looks like
// a copy/paste leftover; confirm before changing the message.
182 ex
._tao_print_exception (ACE_TEXT ("Supplier error "));
// Program entry point for the consumer client.
//
// Initializes the ORB and the shared TAO_Notify_ORB_Objects, parses the
// command line, activates the ORB run task and the client task, then waits
// on both tasks' thread managers. A CORBA::Exception is caught and printed.
//
// @param argc  Argument count; passed to CORBA::ORB_init and parse_args.
// @param argv  Argument vector; passed to CORBA::ORB_init and parse_args.
//
// NOTE(review): the return type, the `try` keyword and the return statements
// are not visible in this extract.
189 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
194 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
// Shared ORB-related objects used by both tasks below.
196 TAO_Notify_ORB_Objects orb_objects
;
198 orb_objects
.init (orb
);
200 TAO_Notify_ORB_Run_Task
orb_run_task (orb_objects
);
202 TAO_Notify_Lanes_Consumer_Client
client (orb_objects
);
// A non-zero result from parse_args is reported as a parse error.
204 if (client
.parse_args (argc
, argv
) != 0)
206 ACE_DEBUG ((LM_DEBUG
, "Consumer_Client::Error parsing options\n"));
// Thread flags used for both activate () calls.
210 long flags
= THR_NEW_LWP
| THR_JOINABLE
;
// NOTE(review): as shown, the result of thread_creation_flags () is not
// assigned to anything; presumably it is combined into `flags` on a line
// missing from this extract. Confirm.
213 orb
->orb_core ()->orb_params ()->thread_creation_flags ();
// `||` short-circuits: client.activate () is not attempted when
// orb_run_task.activate () has already failed.
216 if (orb_run_task
.activate (flags
) == -1 || client
.activate (flags
) == -1)
// EPERM is reported separately as a privilege problem.
218 if (ACE_OS::last_error () == EPERM
)
219 ACE_ERROR_RETURN ((LM_ERROR
,
220 ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
// NOTE(review): the format string contains "%d" but no matching argument is
// visible in this call. Either supply the priority or drop the conversion.
223 ACE_DEBUG ((LM_ERROR
,
224 ACE_TEXT ("(%t) Task activation at priority %d failed.\n")));
// Block until the threads of both tasks have finished.
227 orb_run_task
.thr_mgr ()->wait ();
228 client
.thr_mgr ()->wait ();
230 catch (const CORBA::Exception
& ex
)
232 ex
._tao_print_exception (ACE_TEXT ("Consumer Client error "));