1 #include "Consumer_Client.h"
3 #include "ORB_Run_Task.h"
4 #include "ace/Arg_Shifter.h"
5 #include "orbsvcs/NotifyExtC.h"
6 #include "orbsvcs/CosNamingC.h"
7 #include "tao/ORB_Core.h"
8 #include "ace/Sched_Params.h"
9 #include "ace/OS_NS_errno.h"
13 TAO_Notify_Lanes_Consumer_Client::TAO_Notify_Lanes_Consumer_Client (TAO_Notify_ORB_Objects
& orb_objects
)
14 : orb_objects_ (orb_objects
)
20 TAO_Notify_Lanes_Consumer_Client::~TAO_Notify_Lanes_Consumer_Client ()
25 TAO_Notify_Lanes_Consumer_Client::parse_args (int argc
, ACE_TCHAR
*argv
[])
27 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
29 const ACE_TCHAR
*current_arg
= 0;
31 while (arg_shifter
.is_anything_left ())
33 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-LanePriority")))) // LanePriority
37 this->lane_priority_
= ACE_OS::atoi (current_arg
);
40 ACE_OS::sprintf (type
, "TEST_TYPE_%d", this->lane_priority_
);
41 this->event_type_
= type
;
44 arg_shifter
.consume_arg ();
48 arg_shifter
.ignore_arg ();
56 TAO_Notify_Lanes_Consumer_Client::initialize (void)
58 ACE_DEBUG ((LM_DEBUG
, "(%P, %t)Initializing Consumer Client with lane priority = %d, event type = (%s)\n"
59 , this->lane_priority_
, this->event_type_
.c_str ()));
61 PortableServer::POAManager_var poa_manager
=
62 this->orb_objects_
.root_poa_
->the_POAManager ();
64 poa_manager
->activate ();
66 // Resolve the Notification Factory.
67 CosNotifyChannelAdmin::EventChannelFactory_var ecf
= this->orb_objects_
.notify_factory ();
69 // Find the EventChannel created by the supplier.
70 CosNotifyChannelAdmin::ChannelIDSeq_var channel_seq
= ecf
->get_all_channels ();
72 CosNotifyChannelAdmin::EventChannel_var ec
;
74 if (channel_seq
->length() > 0)
76 ec
= ecf
->get_event_channel (channel_seq
[0]);
80 ACE_DEBUG ((LM_DEBUG
, "No Event Channel active!\n"));
84 // Create a Consumer Admin
85 CosNotifyChannelAdmin::AdminID adminid
= 0;
87 CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin
=
88 ec
->new_for_consumers (CosNotifyChannelAdmin::AND_OP
, adminid
);
90 ACE_ASSERT (!CORBA::is_nil (consumer_admin
.in ()));
92 PortableServer::POA_var rt_poa
= this->create_rt_poa ();
95 this->consumer_
= new TAO_Notify_Lanes_Consumer (this->orb_objects_
);
98 this->consumer_
->init (rt_poa
, consumer_admin
, this->event_type_
);
101 PortableServer::POA_ptr
102 TAO_Notify_Lanes_Consumer_Client::create_rt_poa (void)
104 PortableServer::POA_var rt_poa
;
106 // Create an RT POA with a lane at the given priority.
107 CORBA::Policy_var priority_model_policy
;
108 CORBA::Policy_var lanes_policy
;
110 CORBA::Policy_var activation_policy
=
111 this->orb_objects_
.root_poa_
->create_implicit_activation_policy (PortableServer::IMPLICIT_ACTIVATION
);
113 // Create a priority model policy.
114 priority_model_policy
=
115 this->orb_objects_
.rt_orb_
->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED
118 RTCORBA::ThreadpoolLanes
lanes (1);
121 lanes
[0].lane_priority
= this->lane_priority_
;
122 lanes
[0].static_threads
= 1;
123 lanes
[0].dynamic_threads
= 0;
126 // Create a thread-pool.
127 CORBA::ULong stacksize
= 0;
128 CORBA::Boolean allow_request_buffering
= 0;
129 CORBA::ULong max_buffered_requests
= 0;
130 CORBA::ULong max_request_buffer_size
= 0;
131 CORBA::Boolean allow_borrowing
= 0;
133 // Create the thread-pool.
134 RTCORBA::ThreadpoolId threadpool_id
=
135 this->orb_objects_
.rt_orb_
->create_threadpool_with_lanes (stacksize
,
138 allow_request_buffering
,
139 max_buffered_requests
,
140 max_request_buffer_size
);
142 // Create a thread-pool policy.
144 this->orb_objects_
.rt_orb_
->create_threadpool_policy (threadpool_id
);
146 CORBA::PolicyList poa_policy_list
;
148 poa_policy_list
.length (3);
149 poa_policy_list
[0] = priority_model_policy
;
150 poa_policy_list
[1] = activation_policy
;
151 poa_policy_list
[2] = lanes_policy
;
153 PortableServer::POAManager_var poa_manager
=
154 this->orb_objects_
.root_poa_
->the_POAManager ();
156 rt_poa
= this->orb_objects_
.root_poa_
->create_POA ("RT POA!",
160 return rt_poa
._retn ();
164 TAO_Notify_Lanes_Consumer_Client::run (void)
166 this->consumer_
->run ();
170 TAO_Notify_Lanes_Consumer_Client::svc (void)
174 // Initialize this threads priority.
175 this->orb_objects_
.current_
->the_priority (0);
177 this->initialize (); //Init the Client
181 catch (const CORBA::Exception
& ex
)
183 ex
._tao_print_exception (ACE_TEXT ("Supplier error "));
191 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
196 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
198 TAO_Notify_ORB_Objects orb_objects
;
200 orb_objects
.init (orb
);
202 TAO_Notify_ORB_Run_Task
orb_run_task (orb_objects
);
204 TAO_Notify_Lanes_Consumer_Client
client (orb_objects
);
206 if (client
.parse_args (argc
, argv
) != 0)
208 ACE_DEBUG ((LM_DEBUG
, "Consumer_Client::Error parsing options\n"));
212 long flags
= THR_NEW_LWP
| THR_JOINABLE
;
215 orb
->orb_core ()->orb_params ()->thread_creation_flags ();
218 if (orb_run_task
.activate (flags
) == -1 || client
.activate (flags
) == -1)
220 if (ACE_OS::last_error () == EPERM
)
221 ACE_ERROR_RETURN ((LM_ERROR
,
222 ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
225 ACE_DEBUG ((LM_ERROR
,
226 ACE_TEXT ("(%t) Task activation at priority %d failed.\n")));
229 orb_run_task
.thr_mgr ()->wait ();
230 client
.thr_mgr ()->wait ();
232 catch (const CORBA::Exception
& ex
)
234 ex
._tao_print_exception (ACE_TEXT ("Consumer Client error "));