1 #include "Supplier_Client.h"
3 #include "ORB_Run_Task.h"
4 #include "ace/Arg_Shifter.h"
5 #include "tao/ORB_Core.h"
6 #include "ace/Sched_Params.h"
8 #include "orbsvcs/NotifyExtC.h"
9 #include "orbsvcs/CosNamingC.h"
10 #include "ace/OS_NS_errno.h"
14 TAO_Notify_Lanes_Supplier_Client::TAO_Notify_Lanes_Supplier_Client (TAO_Notify_ORB_Objects
& orb_objects
)
15 : orb_objects_ (orb_objects
)
21 TAO_Notify_Lanes_Supplier_Client::~TAO_Notify_Lanes_Supplier_Client ()
26 TAO_Notify_Lanes_Supplier_Client::parse_args (int argc
, ACE_TCHAR
*argv
[])
28 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
30 const ACE_TCHAR
*current_arg
= 0;
32 while (arg_shifter
.is_anything_left ())
34 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Consumers")))) // Number of consumers that we need to send an event to.
38 this->consumer_count_
= ACE_OS::atoi (current_arg
);
41 arg_shifter
.consume_arg ();
43 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-IORoutput")))) // The file to output the supplier ior to.
47 this->ior_file_name_
= current_arg
;
50 arg_shifter
.consume_arg ();
54 arg_shifter
.ignore_arg ();
62 TAO_Notify_Lanes_Supplier_Client::initialize (void)
64 PortableServer::POAManager_var poa_manager
=
65 this->orb_objects_
.root_poa_
->the_POAManager ();
67 poa_manager
->activate ();
69 CosNotifyChannelAdmin::EventChannel_var ec
= this->create_ec ();
71 // Create a Supplier Admin
72 CosNotifyChannelAdmin::AdminID adminid
= 0;
74 CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin
=
75 ec
->new_for_suppliers (CosNotifyChannelAdmin::AND_OP
, adminid
);
77 ACE_ASSERT (!CORBA::is_nil (supplier_admin
.in ()));
80 this->supplier_
= new TAO_Notify_Lanes_Supplier (this->orb_objects_
);
83 this->supplier_
->init (supplier_admin
, this->consumer_count_
);
86 CosNotifyChannelAdmin::EventChannel_ptr
87 TAO_Notify_Lanes_Supplier_Client::create_ec (void)
89 CosNotifyChannelAdmin::EventChannel_var ec
;
91 CosNotifyChannelAdmin::EventChannelFactory_var ecf
= this->orb_objects_
.notify_factory ();
93 // Create an EventChannel
94 CosNotification::QoSProperties qos
;
95 CosNotification::AdminProperties admin
;
97 // Create an event channel
98 CosNotifyChannelAdmin::ChannelID id
;
100 ec
= ecf
->create_channel (qos
,
104 // Set the Qos : 2 Lanes
105 NotifyExt::ThreadPoolLanesParams tpl_params
;
107 tpl_params
.priority_model
= NotifyExt::CLIENT_PROPAGATED
;
108 tpl_params
.server_priority
= 0;
109 tpl_params
.stacksize
= 0;
110 tpl_params
.lanes
.length (this->consumer_count_
+ 1);
111 tpl_params
.allow_borrowing
= 0;
112 tpl_params
.allow_request_buffering
= 0;
113 tpl_params
.max_buffered_requests
= 0;
114 tpl_params
.max_request_buffer_size
= 0;
117 * Note that we actually create 1 extra Lane.
118 * The extra Lane at priority 0 is created to match the priority 0 of the supplier thread.
119 * As the ProxyConsumer is activated in an RT POA with lanes, each invocation must mach some lane.
120 * Now, we typically reserve higer priorities to make requests and the lowest priority 0 for administrative calls
121 * e.g. <subscription_change>. If we do not have a lane at the lowest 0 priority, then the invocation made from
122 * the supplier at priority 0 will fail.
124 tpl_params
.lanes
[0].lane_priority
= 0; // Priority 0
125 tpl_params
.lanes
[0].static_threads
= 1;
126 tpl_params
.lanes
[0].dynamic_threads
= 0;
128 RTCORBA::Priority priority
= 1; // The priority at which we send an event each.
130 for (int i
= 1; i
<= this->consumer_count_
; ++i
, ++priority
)
132 tpl_params
.lanes
[i
].lane_priority
= priority
;
133 tpl_params
.lanes
[i
].static_threads
= 1;
134 tpl_params
.lanes
[i
].dynamic_threads
= 0;
138 qos
[0].name
= CORBA::string_dup (NotifyExt::ThreadPoolLanes
);
139 qos
[0].value
<<= tpl_params
;
141 // Note that instead of <set_qos>, the <qos> can also be passed while creating the channel.
148 TAO_Notify_Lanes_Supplier_Client::run (void)
150 /// First, signal that the supplier is ready.
153 this->supplier_
->run ();
157 TAO_Notify_Lanes_Supplier_Client::write_ior (void)
159 CosNotifyComm::StructuredPushSupplier_var objref
= this->supplier_
->_this ();
161 // Write the ior to a file to signal waiting consumers.
162 FILE *ior_output_file
= ACE_OS::fopen (this->ior_file_name_
.c_str (), ACE_TEXT("w"));
164 if (ior_output_file
!= 0)
166 CORBA::String_var str
=
167 this->orb_objects_
.orb_
->object_to_string (objref
.in ());
169 ACE_OS::fprintf (ior_output_file
,
172 ACE_OS::fclose (ior_output_file
);
177 TAO_Notify_Lanes_Supplier_Client::svc (void)
181 this->orb_objects_
.current_
->the_priority (0);
183 this->initialize (); //Init the Client
187 catch (const CORBA::Exception
& ex
)
189 ex
._tao_print_exception (ACE_TEXT ("Supplier error "));
197 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
202 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
204 // Create a holder for the common ORB Objects.
205 TAO_Notify_ORB_Objects orb_objects
;
207 orb_objects
.init (orb
);
209 /* Run the ORB in a separate thread */
210 TAO_Notify_ORB_Run_Task
orb_run_task (orb_objects
);
212 /* Create a Client */
213 TAO_Notify_Lanes_Supplier_Client
client (orb_objects
);
215 if (client
.parse_args (argc
, argv
) != 0)
217 ACE_DEBUG ((LM_DEBUG
, "Supplier_Client::Error parsing options\n"));
221 long flags
= THR_NEW_LWP
| THR_JOINABLE
;
224 orb
->orb_core ()->orb_params ()->thread_creation_flags ();
226 /* Both the tasks initialize themselves at Priority 0*/
227 if (orb_run_task
.activate (flags
) == -1 || client
.activate (flags
) == -1)
229 if (ACE_OS::last_error () == EPERM
)
230 ACE_ERROR_RETURN ((LM_ERROR
,
231 ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
234 ACE_DEBUG ((LM_ERROR
,
235 ACE_TEXT ("(%t) Task activation at priority %d failed.\n")));
238 orb_run_task
.thr_mgr ()->wait ();
239 client
.thr_mgr ()->wait ();
241 catch (const CORBA::Exception
& ex
)
243 ex
._tao_print_exception (ACE_TEXT ("Supplier Client error "));