1 #include "Consumer_Client.h"
3 #include "ORB_Run_Task.h"
4 #include "ace/Arg_Shifter.h"
5 #include "orbsvcs/NotifyExtC.h"
6 #include "orbsvcs/CosNamingC.h"
7 #include "tao/ORB_Core.h"
8 #include "ace/Sched_Params.h"
9 #include "ace/OS_NS_errno.h"
13 TAO_Notify_ThreadPool_Consumer_Client::TAO_Notify_ThreadPool_Consumer_Client (TAO_Notify_ORB_Objects
& orb_objects
)
14 : orb_objects_ (orb_objects
)
16 , proxy_supplier_thread_count_ (0)
22 TAO_Notify_ThreadPool_Consumer_Client::~TAO_Notify_ThreadPool_Consumer_Client ()
27 TAO_Notify_ThreadPool_Consumer_Client::parse_args (int argc
, ACE_TCHAR
*argv
[])
29 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
31 const ACE_TCHAR
*current_arg
= 0;
33 while (arg_shifter
.is_anything_left ())
35 if (0 != (current_arg
=
36 arg_shifter
.get_the_parameter(ACE_TEXT("-ProxySupplier_ThreadPool")))) // Specify a threadpool.
38 this->proxy_supplier_thread_count_
= ACE_OS::atoi (arg_shifter
.get_current ());
40 arg_shifter
.consume_arg ();
42 else if (0 != (current_arg
= arg_shifter
.get_the_parameter(ACE_TEXT("-MaxEvents")))) // Max Events
44 this->max_events_
= ACE_OS::atoi (arg_shifter
.get_current ());
46 arg_shifter
.consume_arg ();
48 else if (0 != (current_arg
= arg_shifter
.get_the_parameter(ACE_TEXT("-Delay")))) // seconds wait in consumer per push.
50 this->delay_
= ACE_OS::atoi (current_arg
);
52 arg_shifter
.consume_arg ();
56 arg_shifter
.ignore_arg ();
64 TAO_Notify_ThreadPool_Consumer_Client::_init (void)
66 PortableServer::POAManager_var poa_manager
=
67 this->orb_objects_
.root_poa_
->the_POAManager ();
69 poa_manager
->activate ();
71 // Resolve the Notification Factory.
72 CosNotifyChannelAdmin::EventChannelFactory_var ecf
= this->orb_objects_
.notify_factory ();
74 // Find the EventChannel created by the supplier.
75 CosNotifyChannelAdmin::ChannelIDSeq_var channel_seq
= ecf
->get_all_channels ();
77 CosNotifyChannelAdmin::EventChannel_var ec
;
79 if (channel_seq
->length() > 0)
81 ec
= ecf
->get_event_channel (channel_seq
[0]);
85 ACE_DEBUG ((LM_DEBUG
, "No Event Channel active!\n"));
89 // Create a Consumer Admin
90 CosNotifyChannelAdmin::AdminID adminid
= 0;
92 CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin
=
93 ec
->new_for_consumers (CosNotifyChannelAdmin::AND_OP
, adminid
);
95 ACE_ASSERT (!CORBA::is_nil (consumer_admin
.in ()));
97 PortableServer::POA_var rt_poa
= this->create_rt_poa ();
100 this->consumer_
= new TAO_Notify_ThreadPool_Consumer (this->orb_objects_
);
103 this->consumer_
->init (rt_poa
, consumer_admin
, this->proxy_supplier_thread_count_
, this->max_events_
, this->delay_
);
106 PortableServer::POA_ptr
107 TAO_Notify_ThreadPool_Consumer_Client::create_rt_poa (void)
109 PortableServer::POA_var rt_poa
;
111 // Create an RT POA with a lane at the given priority.
112 CORBA::Policy_var priority_model_policy
;
113 CORBA::Policy_var thread_pool_policy
;
115 CORBA::Policy_var activation_policy
=
116 this->orb_objects_
.root_poa_
->create_implicit_activation_policy (PortableServer::IMPLICIT_ACTIVATION
);
118 // Create a priority model policy.
119 priority_model_policy
=
120 this->orb_objects_
.rt_orb_
->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED
123 CORBA::ULong stacksize
= 0;
124 CORBA::ULong static_threads
= 1;
125 CORBA::ULong dynamic_threads
= 0;
126 RTCORBA::Priority default_priority
= 0;
127 CORBA::Boolean allow_request_buffering
= 0;
128 CORBA::ULong max_buffered_requests
= 0;
129 CORBA::ULong max_request_buffer_size
= 0;
131 // Create the thread-pool.
132 RTCORBA::ThreadpoolId threadpool_id
=
133 this->orb_objects_
.rt_orb_
->create_threadpool (stacksize
,
137 allow_request_buffering
,
138 max_buffered_requests
,
139 max_request_buffer_size
);
142 this->orb_objects_
.rt_orb_
->create_threadpool_policy (threadpool_id
);
144 CORBA::PolicyList poa_policy_list
;
146 poa_policy_list
.length (3);
147 poa_policy_list
[0] = priority_model_policy
;
148 poa_policy_list
[1] = activation_policy
;
149 poa_policy_list
[2] = thread_pool_policy
;
151 PortableServer::POAManager_var poa_manager
=
152 this->orb_objects_
.root_poa_
->the_POAManager ();
154 rt_poa
= this->orb_objects_
.root_poa_
->create_POA ("RT POA!",
158 return rt_poa
._retn ();
162 TAO_Notify_ThreadPool_Consumer_Client::run (void)
164 this->consumer_
->run ();
168 TAO_Notify_ThreadPool_Consumer_Client::dump_stats (void)
170 this->consumer_
->dump_throughput ();
174 TAO_Notify_ThreadPool_Consumer_Client::svc (void)
178 // Initialize this threads priority.
179 this->orb_objects_
.current_
->the_priority (0);
181 this->_init (); //Init the Client
185 catch (const CORBA::Exception
& ex
)
187 ex
._tao_print_exception (ACE_TEXT ("Supplier error "));
195 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
200 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
202 TAO_Notify_ORB_Objects orb_objects
;
204 orb_objects
.init (orb
);
206 TAO_Notify_ORB_Run_Task
orb_run_task (orb_objects
);
208 TAO_Notify_ThreadPool_Consumer_Client
client (orb_objects
);
210 if (client
.parse_args (argc
, argv
) != 0)
212 ACE_DEBUG ((LM_DEBUG
, "Consumer_Client::Error parsing options\n"));
216 long flags
= THR_NEW_LWP
| THR_JOINABLE
;
219 orb
->orb_core ()->orb_params ()->thread_creation_flags ();
222 if (orb_run_task
.activate (flags
) == -1 || client
.activate (flags
) == -1)
224 if (ACE_OS::last_error () == EPERM
)
225 ACE_ERROR_RETURN ((LM_ERROR
,
226 ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
229 ACE_DEBUG ((LM_ERROR
,
230 ACE_TEXT ("(%t) Task activation at priority %d failed.\n")));
233 orb_run_task
.thr_mgr ()->wait ();
234 client
.thr_mgr ()->wait ();
236 client
.dump_stats ();
238 catch (const CORBA::Exception
& ex
)
240 ex
._tao_print_exception (ACE_TEXT ("Consumer Client error "));