Merge pull request #2301 from sonndinh/remove-dup-reactor-functions
[ACE_TAO.git] / TAO / tests / Big_Oneways / Session.cpp
blob 5baadc17457a1897c83cb8c1b770cf44ee32e0d9
1 #include "Session.h"
2 #include "tao/debug.h"
4 Session::Session (Test::Session_Control_ptr control,
5 CORBA::ULong payload_size,
6 CORBA::ULong thread_count,
7 CORBA::ULong message_count,
8 CORBA::ULong peer_count)
9 : control_ (Test::Session_Control::_duplicate (control))
10 , running_ (0)
11 , payload_size_ (payload_size)
12 , thread_count_ (thread_count)
13 , message_count_ (message_count)
14 , active_thread_count_ (0)
15 , expected_messages_ (thread_count * message_count * (peer_count - 1))
16 , task_ (this)
17 , barrier_ (thread_count + 1)
21 Session::~Session ()
25 int
26 Session::svc ()
28 this->barrier_.wait ();
29 CORBA::ULong i = 0;
31 try
33 // Use the same payload over and over
34 Test::Payload payload (this->payload_size_);
35 payload.length (this->payload_size_);
37 for (CORBA::ULong j = 0; j != this->payload_size_; ++j)
39 payload[j] = j % 256;
42 // Get the number of peers just once.
43 CORBA::ULong session_count =
44 this->other_sessions_.length ();
46 this->validate_connections ();
48 for (; i != this->message_count_; ++i)
50 #if 0
51 if (i % 500 == 0)
53 ACE_DEBUG ((LM_DEBUG,
54 "(%P|%t) Session::svc, "
55 "sending message %d\n",
56 i));
58 #endif /* 0 */
59 for (CORBA::ULong j = 0; j != session_count; ++j)
61 this->other_sessions_[j]->receive_payload (payload);
66 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
67 this->active_thread_count_--;
68 if (this->more_work ())
70 return 0;
73 this->terminate (1);
75 catch (const CORBA::Exception& ex)
77 ACE_ERROR ((LM_ERROR,
78 "(%P|%t) ERROR: Session::svc, "
79 "send %d messages out of %d\n",
80 i, message_count_));
81 ex._tao_print_exception ("Session::svc - ");
82 return -1;
85 return 0;
88 void
89 Session::validate_connections ()
91 CORBA::ULong session_count =
92 this->other_sessions_.length ();
93 for (CORBA::ULong i = 0; i != 100; ++i)
95 for (CORBA::ULong j = 0; j != session_count; ++j)
97 try
99 this->other_sessions_[j]->ping ();
101 catch (const CORBA::Exception&)
108 void
109 Session::start (const Test::Session_List &other_sessions)
111 if (other_sessions.length () == 0)
112 throw Test::No_Peers ();
115 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
116 if (this->running_)
117 throw Test::Already_Running ();
119 this->other_sessions_ = other_sessions;
121 for (CORBA::ULong i = 0; i != this->thread_count_; ++i)
125 if (this->task_.activate (THR_NEW_LWP | THR_JOINABLE, 1, 1) != -1)
127 this->running_ = 1;
128 this->active_thread_count_++;
131 catch (const CORBA::Exception& ex)
133 ex._tao_print_exception ("Session::start, ignored");
137 if (this->active_thread_count_ != this->thread_count_)
138 return;
141 this->validate_connections ();
143 this->barrier_.wait ();
145 if (this->running_ != 0)
146 return;
148 /// None of the threads are running, this session is useless at
149 /// this point, report the problem and destroy the local objects
150 this->terminate (0);
153 void
154 Session::ping ()
158 void
159 Session::receive_payload (const Test::Payload &the_payload)
161 if (the_payload.length () != this->payload_size_)
163 ACE_ERROR ((LM_ERROR,
164 "ERROR: (%P|%t) Session::receive_payload, "
165 "unexpected payload size (%d != %d)\n",
166 the_payload.length (), this->payload_size_));
170 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
171 --this->expected_messages_;
173 #if 0
174 int verbose = 0;
175 verbose = this->expected_messages_ % 500 == 0;
176 if (this->expected_messages_ < 500)
177 verbose = (this->expected_messages_ % 100 == 0);
178 if (this->expected_messages_ < 100)
179 verbose = (this->expected_messages_ % 10 == 0);
180 if (this->expected_messages_ < 5)
181 verbose = 1;
183 if (verbose)
185 ACE_DEBUG ((LM_DEBUG,
186 "(%P|%t) Session::receive_payload, "
187 "%d messages to go\n",
188 this->expected_messages_));
190 #endif /* 0 */
192 if (this->more_work ())
193 return;
195 this->terminate (1);
199 void
200 Session::destroy ()
202 // Make sure local resources are released
204 PortableServer::POA_var poa =
205 this->_default_POA ();
206 PortableServer::ObjectId_var oid =
207 poa->servant_to_id (this);
208 poa->deactivate_object (oid.in ());
212 Session::more_work () const
214 if (this->expected_messages_ > 0
215 || this->active_thread_count_ > 0
216 || this->running_ == 0)
217 return 1;
219 return 0;
222 void
223 Session::terminate (CORBA::Boolean success)
225 // Make sure that global resources are released
228 this->control_->session_finished (success);
230 catch (const CORBA::Exception& ex)
232 ex._tao_print_exception ("Session::terminate, ignored");