4 Session::Session (Test::Session_Control_ptr control
,
5 CORBA::ULong payload_size
,
6 CORBA::ULong thread_count
,
7 CORBA::ULong message_count
,
8 CORBA::ULong peer_count
)
9 : control_ (Test::Session_Control::_duplicate (control
))
11 , payload_size_ (payload_size
)
12 , thread_count_ (thread_count
)
13 , message_count_ (message_count
)
14 , active_thread_count_ (0)
15 , expected_messages_ (thread_count
* message_count
* (peer_count
- 1))
17 , barrier_ (thread_count
+ 1)
28 this->barrier_
.wait ();
33 // Use the same payload over and over
34 Test::Payload
payload (this->payload_size_
);
35 payload
.length (this->payload_size_
);
37 for (CORBA::ULong j
= 0; j
!= this->payload_size_
; ++j
)
42 // Get the number of peers just once.
43 CORBA::ULong session_count
=
44 this->other_sessions_
.length ();
46 this->validate_connections ();
48 for (; i
!= this->message_count_
; ++i
)
54 "(%P|%t) Session::svc, "
55 "sending message %d\n",
59 for (CORBA::ULong j
= 0; j
!= session_count
; ++j
)
61 this->other_sessions_
[j
]->receive_payload (payload
);
66 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
, -1);
67 this->active_thread_count_
--;
68 if (this->more_work ())
75 catch (const CORBA::Exception
& ex
)
78 "(%P|%t) ERROR: Session::svc, "
79 "send %d messages out of %d\n",
81 ex
._tao_print_exception ("Session::svc - ");
89 Session::validate_connections ()
91 CORBA::ULong session_count
=
92 this->other_sessions_
.length ();
93 for (CORBA::ULong i
= 0; i
!= 100; ++i
)
95 for (CORBA::ULong j
= 0; j
!= session_count
; ++j
)
99 this->other_sessions_
[j
]->ping ();
101 catch (const CORBA::Exception
&)
109 Session::start (const Test::Session_List
&other_sessions
)
111 if (other_sessions
.length () == 0)
112 throw Test::No_Peers ();
115 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
117 throw Test::Already_Running ();
119 this->other_sessions_
= other_sessions
;
121 for (CORBA::ULong i
= 0; i
!= this->thread_count_
; ++i
)
125 if (this->task_
.activate (THR_NEW_LWP
| THR_JOINABLE
, 1, 1) != -1)
128 this->active_thread_count_
++;
131 catch (const CORBA::Exception
& ex
)
133 ex
._tao_print_exception ("Session::start, ignored");
137 if (this->active_thread_count_
!= this->thread_count_
)
141 this->validate_connections ();
143 this->barrier_
.wait ();
145 if (this->running_
!= 0)
148 /// None of the threads are running, this session is useless at
149 /// this point, report the problem and destroy the local objects
159 Session::receive_payload (const Test::Payload
&the_payload
)
161 if (the_payload
.length () != this->payload_size_
)
163 ACE_ERROR ((LM_ERROR
,
164 "ERROR: (%P|%t) Session::receive_payload, "
165 "unexpected payload size (%d != %d)\n",
166 the_payload
.length (), this->payload_size_
));
170 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
171 --this->expected_messages_
;
175 verbose
= this->expected_messages_
% 500 == 0;
176 if (this->expected_messages_
< 500)
177 verbose
= (this->expected_messages_
% 100 == 0);
178 if (this->expected_messages_
< 100)
179 verbose
= (this->expected_messages_
% 10 == 0);
180 if (this->expected_messages_
< 5)
185 ACE_DEBUG ((LM_DEBUG
,
186 "(%P|%t) Session::receive_payload, "
187 "%d messages to go\n",
188 this->expected_messages_
));
192 if (this->more_work ())
202 // Make sure local resources are released
204 PortableServer::POA_var poa
=
205 this->_default_POA ();
206 PortableServer::ObjectId_var oid
=
207 poa
->servant_to_id (this);
208 poa
->deactivate_object (oid
.in ());
212 Session::more_work () const
214 if (this->expected_messages_
> 0
215 || this->active_thread_count_
> 0
216 || this->running_
== 0)
223 Session::terminate (CORBA::Boolean success
)
225 // Make sure that global resources are released
228 this->control_
->session_finished (success
);
230 catch (const CORBA::Exception
& ex
)
232 ex
._tao_print_exception ("Session::terminate, ignored");