Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / examples / Notify / Lanes / Consumer_Client.cpp
blob285db7c8a01f23d25033cb6dc161258b4dde9bcc
1 #include "Consumer_Client.h"
2 #include "Consumer.h"
3 #include "ORB_Run_Task.h"
4 #include "ace/Arg_Shifter.h"
5 #include "orbsvcs/NotifyExtC.h"
6 #include "orbsvcs/CosNamingC.h"
7 #include "tao/ORB_Core.h"
8 #include "ace/Sched_Params.h"
9 #include "ace/OS_NS_errno.h"
13 TAO_Notify_Lanes_Consumer_Client::TAO_Notify_Lanes_Consumer_Client (TAO_Notify_ORB_Objects& orb_objects)
14 : orb_objects_ (orb_objects)
15 , lane_priority_ (0)
16 , consumer_ (0)
20 TAO_Notify_Lanes_Consumer_Client::~TAO_Notify_Lanes_Consumer_Client ()
24 int
25 TAO_Notify_Lanes_Consumer_Client::parse_args (int argc, ACE_TCHAR *argv[])
27 ACE_Arg_Shifter arg_shifter (argc, argv);
29 const ACE_TCHAR *current_arg = 0;
31 while (arg_shifter.is_anything_left ())
33 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-LanePriority")))) // LanePriority
35 if (current_arg != 0)
37 this->lane_priority_ = ACE_OS::atoi (current_arg);
39 char type[BUFSIZ];
40 ACE_OS::sprintf (type, "TEST_TYPE_%d", this->lane_priority_);
41 this->event_type_ = type;
44 arg_shifter.consume_arg ();
46 else
48 arg_shifter.ignore_arg ();
52 return 0;
55 void
56 TAO_Notify_Lanes_Consumer_Client::initialize (void)
58 ACE_DEBUG ((LM_DEBUG, "(%P, %t)Initializing Consumer Client with lane priority = %d, event type = (%s)\n"
59 , this->lane_priority_, this->event_type_.c_str ()));
61 PortableServer::POAManager_var poa_manager =
62 this->orb_objects_.root_poa_->the_POAManager ();
64 poa_manager->activate ();
66 // Resolve the Notification Factory.
67 CosNotifyChannelAdmin::EventChannelFactory_var ecf = this->orb_objects_.notify_factory ();
69 // Find the EventChannel created by the supplier.
70 CosNotifyChannelAdmin::ChannelIDSeq_var channel_seq = ecf->get_all_channels ();
72 CosNotifyChannelAdmin::EventChannel_var ec;
74 if (channel_seq->length() > 0)
76 ec = ecf->get_event_channel (channel_seq[0]);
78 else
80 ACE_DEBUG ((LM_DEBUG, "No Event Channel active!\n"));
81 return;
84 // Create a Consumer Admin
85 CosNotifyChannelAdmin::AdminID adminid = 0;
87 CosNotifyChannelAdmin::ConsumerAdmin_var consumer_admin =
88 ec->new_for_consumers (CosNotifyChannelAdmin::AND_OP, adminid);
90 ACE_ASSERT (!CORBA::is_nil (consumer_admin.in ()));
92 PortableServer::POA_var rt_poa = this->create_rt_poa ();
94 // Create a Consumer
95 this->consumer_ = new TAO_Notify_Lanes_Consumer (this->orb_objects_);
97 // Initialize it.
98 this->consumer_->init (rt_poa, consumer_admin, this->event_type_);
101 PortableServer::POA_ptr
102 TAO_Notify_Lanes_Consumer_Client::create_rt_poa (void)
104 PortableServer::POA_var rt_poa;
106 // Create an RT POA with a lane at the given priority.
107 CORBA::Policy_var priority_model_policy;
108 CORBA::Policy_var lanes_policy;
110 CORBA::Policy_var activation_policy =
111 this->orb_objects_.root_poa_->create_implicit_activation_policy (PortableServer::IMPLICIT_ACTIVATION);
113 // Create a priority model policy.
114 priority_model_policy =
115 this->orb_objects_.rt_orb_->create_priority_model_policy (RTCORBA::CLIENT_PROPAGATED
116 , 0);
118 RTCORBA::ThreadpoolLanes lanes (1);
119 lanes.length (1);
121 lanes[0].lane_priority = this->lane_priority_;
122 lanes[0].static_threads = 1;
123 lanes[0].dynamic_threads = 0;
126 // Create a thread-pool.
127 CORBA::ULong stacksize = 0;
128 CORBA::Boolean allow_request_buffering = 0;
129 CORBA::ULong max_buffered_requests = 0;
130 CORBA::ULong max_request_buffer_size = 0;
131 CORBA::Boolean allow_borrowing = 0;
133 // Create the thread-pool.
134 RTCORBA::ThreadpoolId threadpool_id =
135 this->orb_objects_.rt_orb_->create_threadpool_with_lanes (stacksize,
136 lanes,
137 allow_borrowing,
138 allow_request_buffering,
139 max_buffered_requests,
140 max_request_buffer_size);
142 // Create a thread-pool policy.
143 lanes_policy =
144 this->orb_objects_.rt_orb_->create_threadpool_policy (threadpool_id);
146 CORBA::PolicyList poa_policy_list;
148 poa_policy_list.length (3);
149 poa_policy_list[0] = priority_model_policy;
150 poa_policy_list[1] = activation_policy;
151 poa_policy_list[2] = lanes_policy;
153 PortableServer::POAManager_var poa_manager =
154 this->orb_objects_.root_poa_->the_POAManager ();
156 rt_poa = this->orb_objects_.root_poa_->create_POA ("RT POA!",
157 poa_manager.in (),
158 poa_policy_list);
160 return rt_poa._retn ();
163 void
164 TAO_Notify_Lanes_Consumer_Client::run (void)
166 this->consumer_->run ();
170 TAO_Notify_Lanes_Consumer_Client::svc (void)
174 // Initialize this threads priority.
175 this->orb_objects_.current_->the_priority (0);
177 this->initialize (); //Init the Client
179 this->run ();
181 catch (const CORBA::Exception& ex)
183 ex._tao_print_exception (ACE_TEXT ("Supplier error "));
187 return 0;
191 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
195 // Initialize an ORB
196 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
198 TAO_Notify_ORB_Objects orb_objects;
200 orb_objects.init (orb);
202 TAO_Notify_ORB_Run_Task orb_run_task (orb_objects);
204 TAO_Notify_Lanes_Consumer_Client client (orb_objects);
206 if (client.parse_args (argc, argv) != 0)
208 ACE_DEBUG ((LM_DEBUG, "Consumer_Client::Error parsing options\n"));
209 return -1;
212 long flags = THR_NEW_LWP | THR_JOINABLE;
214 flags |=
215 orb->orb_core ()->orb_params ()->thread_creation_flags ();
218 if (orb_run_task.activate (flags) == -1 || client.activate (flags) == -1)
220 if (ACE_OS::last_error () == EPERM)
221 ACE_ERROR_RETURN ((LM_ERROR,
222 ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
223 -1);
224 else
225 ACE_DEBUG ((LM_ERROR,
226 ACE_TEXT ("(%t) Task activation at priority %d failed.\n")));
229 orb_run_task.thr_mgr ()->wait ();
230 client.thr_mgr ()->wait ();
232 catch (const CORBA::Exception& ex)
234 ex._tao_print_exception (ACE_TEXT ("Consumer Client error "));
237 return 0;