Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / examples / Notify / Lanes / Supplier_Client.cpp
blob32865fcd20c833b9dfbe13a0334ccae41d19bfa4
1 #include "Supplier_Client.h"
3 #include "ORB_Run_Task.h"
4 #include "ace/Arg_Shifter.h"
5 #include "tao/ORB_Core.h"
6 #include "ace/Sched_Params.h"
7 #include "Supplier.h"
8 #include "orbsvcs/NotifyExtC.h"
9 #include "orbsvcs/CosNamingC.h"
10 #include "ace/OS_NS_errno.h"
14 TAO_Notify_Lanes_Supplier_Client::TAO_Notify_Lanes_Supplier_Client (TAO_Notify_ORB_Objects& orb_objects)
15 : orb_objects_ (orb_objects)
16 ,supplier_ (0)
17 , consumer_count_ (2)
21 TAO_Notify_Lanes_Supplier_Client::~TAO_Notify_Lanes_Supplier_Client ()
25 int
26 TAO_Notify_Lanes_Supplier_Client::parse_args (int argc, ACE_TCHAR *argv[])
28 ACE_Arg_Shifter arg_shifter (argc, argv);
30 const ACE_TCHAR *current_arg = 0;
32 while (arg_shifter.is_anything_left ())
34 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-Consumers")))) // Number of consumers that we need to send an event to.
36 if (current_arg != 0)
38 this->consumer_count_ = ACE_OS::atoi (current_arg);
41 arg_shifter.consume_arg ();
43 else if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-IORoutput")))) // The file to output the supplier ior to.
45 if (current_arg != 0)
47 this->ior_file_name_ = current_arg;
50 arg_shifter.consume_arg ();
52 else
54 arg_shifter.ignore_arg ();
58 return 0;
61 void
62 TAO_Notify_Lanes_Supplier_Client::initialize (void)
64 PortableServer::POAManager_var poa_manager =
65 this->orb_objects_.root_poa_->the_POAManager ();
67 poa_manager->activate ();
69 CosNotifyChannelAdmin::EventChannel_var ec = this->create_ec ();
71 // Create a Supplier Admin
72 CosNotifyChannelAdmin::AdminID adminid = 0;
74 CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin =
75 ec->new_for_suppliers (CosNotifyChannelAdmin::AND_OP, adminid);
77 ACE_ASSERT (!CORBA::is_nil (supplier_admin.in ()));
79 // Create a Supplier
80 this->supplier_ = new TAO_Notify_Lanes_Supplier (this->orb_objects_);
82 // Initialize it.
83 this->supplier_->init (supplier_admin, this->consumer_count_);
86 CosNotifyChannelAdmin::EventChannel_ptr
87 TAO_Notify_Lanes_Supplier_Client::create_ec (void)
89 CosNotifyChannelAdmin::EventChannel_var ec;
91 CosNotifyChannelAdmin::EventChannelFactory_var ecf = this->orb_objects_.notify_factory ();
93 // Create an EventChannel
94 CosNotification::QoSProperties qos;
95 CosNotification::AdminProperties admin;
97 // Create an event channel
98 CosNotifyChannelAdmin::ChannelID id;
100 ec = ecf->create_channel (qos,
101 admin,
102 id);
104 // Set the Qos : 2 Lanes
105 NotifyExt::ThreadPoolLanesParams tpl_params;
107 tpl_params.priority_model = NotifyExt::CLIENT_PROPAGATED;
108 tpl_params.server_priority = 0;
109 tpl_params.stacksize = 0;
110 tpl_params.lanes.length (this->consumer_count_ + 1);
111 tpl_params.allow_borrowing = 0;
112 tpl_params.allow_request_buffering = 0;
113 tpl_params.max_buffered_requests = 0;
114 tpl_params.max_request_buffer_size = 0;
117 * Note that we actually create 1 extra Lane.
118 * The extra Lane at priority 0 is created to match the priority 0 of the supplier thread.
119 * As the ProxyConsumer is activated in an RT POA with lanes, each invocation must mach some lane.
120 * Now, we typically reserve higer priorities to make requests and the lowest priority 0 for administrative calls
121 * e.g. <subscription_change>. If we do not have a lane at the lowest 0 priority, then the invocation made from
122 * the supplier at priority 0 will fail.
124 tpl_params.lanes[0].lane_priority = 0; // Priority 0
125 tpl_params.lanes[0].static_threads = 1;
126 tpl_params.lanes[0].dynamic_threads = 0;
128 RTCORBA::Priority priority = 1; // The priority at which we send an event each.
130 for (int i = 1; i <= this->consumer_count_; ++i, ++priority)
132 tpl_params.lanes[i].lane_priority = priority;
133 tpl_params.lanes[i].static_threads = 1;
134 tpl_params.lanes[i].dynamic_threads = 0;
137 qos.length (1);
138 qos[0].name = CORBA::string_dup (NotifyExt::ThreadPoolLanes);
139 qos[0].value <<= tpl_params;
141 // Note that instead of <set_qos>, the <qos> can also be passed while creating the channel.
142 ec->set_qos (qos);
144 return ec._retn ();
147 void
148 TAO_Notify_Lanes_Supplier_Client::run (void)
150 /// First, signal that the supplier is ready.
151 this->write_ior ();
153 this->supplier_->run ();
156 void
157 TAO_Notify_Lanes_Supplier_Client::write_ior (void)
159 CosNotifyComm::StructuredPushSupplier_var objref = this->supplier_->_this ();
161 // Write the ior to a file to signal waiting consumers.
162 FILE *ior_output_file = ACE_OS::fopen (this->ior_file_name_.c_str (), ACE_TEXT("w"));
164 if (ior_output_file != 0)
166 CORBA::String_var str =
167 this->orb_objects_.orb_->object_to_string (objref.in ());
169 ACE_OS::fprintf (ior_output_file,
170 "%s",
171 str.in ());
172 ACE_OS::fclose (ior_output_file);
177 TAO_Notify_Lanes_Supplier_Client::svc (void)
181 this->orb_objects_.current_->the_priority (0);
183 this->initialize (); //Init the Client
185 this->run ();
187 catch (const CORBA::Exception& ex)
189 ex._tao_print_exception (ACE_TEXT ("Supplier error "));
193 return 0;
197 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
201 // Initialize an ORB
202 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
204 // Create a holder for the common ORB Objects.
205 TAO_Notify_ORB_Objects orb_objects;
207 orb_objects.init (orb);
209 /* Run the ORB in a separate thread */
210 TAO_Notify_ORB_Run_Task orb_run_task (orb_objects);
212 /* Create a Client */
213 TAO_Notify_Lanes_Supplier_Client client (orb_objects);
215 if (client.parse_args (argc, argv) != 0)
217 ACE_DEBUG ((LM_DEBUG, "Supplier_Client::Error parsing options\n"));
218 return -1;
221 long flags = THR_NEW_LWP | THR_JOINABLE;
223 flags |=
224 orb->orb_core ()->orb_params ()->thread_creation_flags ();
226 /* Both the tasks initialize themselves at Priority 0*/
227 if (orb_run_task.activate (flags) == -1 || client.activate (flags) == -1)
229 if (ACE_OS::last_error () == EPERM)
230 ACE_ERROR_RETURN ((LM_ERROR,
231 ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
232 -1);
233 else
234 ACE_DEBUG ((LM_ERROR,
235 ACE_TEXT ("(%t) Task activation at priority %d failed.\n")));
238 orb_run_task.thr_mgr ()->wait ();
239 client.thr_mgr ()->wait ();
241 catch (const CORBA::Exception& ex)
243 ex._tao_print_exception (ACE_TEXT ("Supplier Client error "));
246 return 0;