Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / examples / Notify / ThreadPool / Consumer_Client.cpp
blobd34251bb9b78f78ee75a2af2a6f4213982cd470e
1 #include "Consumer_Client.h"
2 #include "Consumer.h"
3 #include "ORB_Run_Task.h"
4 #include "ace/Arg_Shifter.h"
5 #include "orbsvcs/NotifyExtC.h"
6 #include "orbsvcs/CosNamingC.h"
7 #include "tao/ORB_Core.h"
8 #include "ace/Sched_Params.h"
9 #include "ace/OS_NS_errno.h"
13 TAO_Notify_ThreadPool_Consumer_Client::TAO_Notify_ThreadPool_Consumer_Client (TAO_Notify_ORB_Objects& orb_objects)
14 : orb_objects_ (orb_objects)
15 , consumer_ (0)
16 , proxy_supplier_thread_count_ (0)
17 , max_events_ (10)
18 , delay_ (0)
22 TAO_Notify_ThreadPool_Consumer_Client::~TAO_Notify_ThreadPool_Consumer_Client ()
26 int
27 TAO_Notify_ThreadPool_Consumer_Client::parse_args (int argc, ACE_TCHAR *argv[])
29 ACE_Arg_Shifter arg_shifter (argc, argv);
31 const ACE_TCHAR *current_arg = 0;
33 while (arg_shifter.is_anything_left ())
35 if (0 != (current_arg =
36 arg_shifter.get_the_parameter(ACE_TEXT("-ProxySupplier_ThreadPool")))) // Specify a threadpool.
38 this->proxy_supplier_thread_count_ = ACE_OS::atoi (arg_shifter.get_current ());
40 arg_shifter.consume_arg ();
42 else if (0 != (current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-MaxEvents")))) // Max Events
44 this->max_events_ = ACE_OS::atoi (arg_shifter.get_current ());
46 arg_shifter.consume_arg ();
48 else if (0 != (current_arg = arg_shifter.get_the_parameter(ACE_TEXT("-Delay")))) // seconds wait in consumer per push.
50 this->delay_ = ACE_OS::atoi (current_arg);
52 arg_shifter.consume_arg ();
54 else
56 arg_shifter.ignore_arg ();
60 return 0;
63 void
64 TAO_Notify_ThreadPool_Consumer_Client::_init (void)
66 PortableServer::POAManager_var poa_manager =
67 this->orb_objects_.root_poa_->the_POAManager ();
69 poa_manager->activate ();
71 // Resolve the Notification Factory.
72 CosNotifyChannelAdmin::EventChannelFactory_var ecf = this->orb_objects_.notify_factory ();
74 // Find the EventChannel created by the supplier.
75 CosNotifyChannelAdmin::ChannelIDSeq_var channel_seq = ecf->get_all_channels ();
77 CosNotifyChannelAdmin::EventChannel_var ec;
79 if (channel_seq->length() > 0)
81 ec = ecf->get_event_channel (channel_seq[0]);
83 else
85 ACE_DEBUG ((LM_DEBUG, "No Event Channel active!\n"));
86 return;
89 // Create a Consumer Admin
90 CosNotifyChannelAdmin::AdminID adminid = 0;
92 CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin =
93 ec->new_for_consumers (CosNotifyChannelAdmin::AND_OP, adminid);
95 ACE_ASSERT (!CORBA::is_nil (consumer_admin.in ()));
97 PortableServer::POA_var rt_poa = this->create_rt_poa ();
99 // Create a Consumer
100 this->consumer_ = new TAO_Notify_ThreadPool_Consumer (this->orb_objects_);
102 // Initialize it.
103 this->consumer_->init (rt_poa, consumer_admin, this->proxy_supplier_thread_count_, this->max_events_, this->delay_);
106 PortableServer::POA_ptr
107 TAO_Notify_ThreadPool_Consumer_Client::create_rt_poa (void)
109 PortableServer::POA_var rt_poa;
111 // Create an RT POA with a lane at the given priority.
112 CORBA::Policy_var priority_model_policy;
113 CORBA::Policy_var thread_pool_policy;
115 CORBA::Policy_var activation_policy =
116 this->orb_objects_.root_poa_->create_implicit_activation_policy (PortableServer::IMPLICIT_ACTIVATION);
118 // Create a priority model policy.
119 priority_model_policy =
120 this->orb_objects_.rt_orb_->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED
121 , 0);
123 CORBA::ULong stacksize = 0;
124 CORBA::ULong static_threads = 1;
125 CORBA::ULong dynamic_threads = 0;
126 RTCORBA::Priority default_priority = 0;
127 CORBA::Boolean allow_request_buffering = 0;
128 CORBA::ULong max_buffered_requests = 0;
129 CORBA::ULong max_request_buffer_size = 0;
131 // Create the thread-pool.
132 RTCORBA::ThreadpoolId threadpool_id =
133 this->orb_objects_.rt_orb_->create_threadpool (stacksize,
134 static_threads,
135 dynamic_threads,
136 default_priority,
137 allow_request_buffering,
138 max_buffered_requests,
139 max_request_buffer_size);
141 thread_pool_policy =
142 this->orb_objects_.rt_orb_->create_threadpool_policy (threadpool_id);
144 CORBA::PolicyList poa_policy_list;
146 poa_policy_list.length (3);
147 poa_policy_list[0] = priority_model_policy;
148 poa_policy_list[1] = activation_policy;
149 poa_policy_list[2] = thread_pool_policy;
151 PortableServer::POAManager_var poa_manager =
152 this->orb_objects_.root_poa_->the_POAManager ();
154 rt_poa = this->orb_objects_.root_poa_->create_POA ("RT POA!",
155 poa_manager.in (),
156 poa_policy_list);
158 return rt_poa._retn ();
161 void
162 TAO_Notify_ThreadPool_Consumer_Client::run (void)
164 this->consumer_->run ();
167 void
168 TAO_Notify_ThreadPool_Consumer_Client::dump_stats (void)
170 this->consumer_->dump_throughput ();
174 TAO_Notify_ThreadPool_Consumer_Client::svc (void)
178 // Initialize this threads priority.
179 this->orb_objects_.current_->the_priority (0);
181 this->_init (); //Init the Client
183 this->run ();
185 catch (const CORBA::Exception& ex)
187 ex._tao_print_exception (ACE_TEXT ("Supplier error "));
191 return 0;
195 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
199 // Initialize an ORB
200 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
202 TAO_Notify_ORB_Objects orb_objects;
204 orb_objects.init (orb);
206 TAO_Notify_ORB_Run_Task orb_run_task (orb_objects);
208 TAO_Notify_ThreadPool_Consumer_Client client (orb_objects);
210 if (client.parse_args (argc, argv) != 0)
212 ACE_DEBUG ((LM_DEBUG, "Consumer_Client::Error parsing options\n"));
213 return -1;
216 long flags = THR_NEW_LWP | THR_JOINABLE;
218 flags |=
219 orb->orb_core ()->orb_params ()->thread_creation_flags ();
222 if (orb_run_task.activate (flags) == -1 || client.activate (flags) == -1)
224 if (ACE_OS::last_error () == EPERM)
225 ACE_ERROR_RETURN ((LM_ERROR,
226 ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
227 -1);
228 else
229 ACE_DEBUG ((LM_ERROR,
230 ACE_TEXT ("(%t) Task activation at priority %d failed.\n")));
233 orb_run_task.thr_mgr ()->wait ();
234 client.thr_mgr ()->wait ();
236 client.dump_stats ();
238 catch (const CORBA::Exception& ex)
240 ex._tao_print_exception (ACE_TEXT ("Consumer Client error "));
243 return 0;