Changes to attempt to silence bcc64x
[ACE_TAO.git] / ACE / tests / MT_Reactor_Upcall_Test.cpp
blob7b850bb1d2ab2d397d1ffe3b578fc4cda7cb22bf
//=============================================================================
/**
 *  @file    MT_Reactor_Upcall_Test.cpp
 *
 *  This is a test that shows how to handle upcalls from the
 *  TP_Reactor and the WFMO_Reactor when the event loop is being run
 *  by multiple threads.
 *
 *  @author Irfan Pyarali <irfan@cs.wustl.edu>
 */
//=============================================================================
15 #include "test_config.h"
16 #include "ace/OS_NS_string.h"
17 #include "ace/OS_NS_unistd.h"
18 #include "ace/Reactor.h"
19 #include "ace/TP_Reactor.h"
20 #include "ace/WFMO_Reactor.h"
21 #include "ace/Dev_Poll_Reactor.h"
22 #include "ace/Pipe.h"
23 #include "ace/Task.h"
24 #include "ace/Get_Opt.h"
25 #include "ace/ACE.h"
// Tunable test parameters.  Each default below can be overridden from
// the command line (see parse_args).

// Number of threads that run the reactor event loop (-t).
int number_of_event_loop_threads = 3;

// Number of data messages written to the pipe (-m).
int number_of_messages = 10;

// Time spent "processing" each data message, in milliseconds (-s).
int sleep_time_in_msec = 100;

// Non-zero makes each upcall hold the handler's mutex (-l).
int lock_upcall = 1;

// Payload carried by every data message.
static const char *message = "Hello there!";
34 class Guard
36 public:
37 Guard (ACE_SYNCH_MUTEX &lock)
38 : lock_ (lock)
40 if (lock_upcall)
41 lock.acquire ();
44 ~Guard ()
46 if (lock_upcall)
47 this->lock_.release ();
50 ACE_SYNCH_MUTEX &lock_;
/**
 * @struct Message
 *
 * @brief Unit of data exchanged over the pipe.
 *
 * The sender writes type_, then size_, and - for DATA messages only -
 * size_ bytes of data_.  The fields are written one at a time, so the
 * in-memory layout of this struct is never sent as a whole.
 */
struct Message
{
  /// Discriminates ordinary payload from the end-of-test request.
  enum Type
  {
    DATA,
    SHUTDOWN
  };

  /// Kind of message.
  Type type_;

  /// Number of payload bytes that follow the header.
  size_t size_;

  /// Payload storage.
  char data_[BUFSIZ];
};
66 class Handler : public ACE_Event_Handler
68 public:
69 Handler (ACE_Reactor &reactor);
70 int handle_input (ACE_HANDLE fd) override;
72 ACE_Pipe pipe_;
73 int number_of_messages_read_;
74 ACE_SYNCH_MUTEX lock_;
75 int shutdown_;
78 Handler::Handler (ACE_Reactor &reactor)
79 : ACE_Event_Handler (&reactor),
80 number_of_messages_read_ (0),
81 shutdown_ (1)
83 // Create the pipe.
84 int result = this->pipe_.open ();
85 if (result != 0)
86 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("pipe open")));
87 else
89 // Register for input events.
90 result =
91 this->reactor ()->register_handler (this->pipe_.read_handle (),
92 this,
93 ACE_Event_Handler::READ_MASK);
94 if (result != 0)
95 ACE_ERROR ((LM_ERROR,
96 ACE_TEXT ("%p\n"),
97 ACE_TEXT ("Can't register pipe for READ")));
98 else
99 this->shutdown_ = 0;
104 Handler::handle_input (ACE_HANDLE fd)
106 Guard monitor (this->lock_);
108 // If we have been shutdown, return.
109 if (this->shutdown_)
110 return 0;
112 // Read fixed parts of message.
113 Message message;
114 ssize_t result =
115 ACE::recv_n (fd,
116 &message.type_,
117 sizeof (message.type_));
118 if (result != static_cast<ssize_t> (sizeof (message.type_)))
119 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"),
120 result,
121 ACE_TEXT ("recv 1")));
122 result =
123 ACE::recv_n (fd,
124 &message.size_,
125 sizeof (message.size_));
126 if (result != static_cast<ssize_t> (sizeof (message.size_)))
127 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"),
128 result,
129 ACE_TEXT ("recv 2")));
131 // On shutdown message, stop the event loop.
132 if (message.type_ == Message::SHUTDOWN)
134 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Shutdown message\n")));
136 this->shutdown_ = 1;
138 this->reactor ()->end_reactor_event_loop ();
140 // Remove self from Reactor.
141 return -1;
144 // Else it is a data message: read the data.
145 result =
146 ACE::recv_n (fd,
147 &message.data_,
148 message.size_);
149 if (result != static_cast<ssize_t> (message.size_))
150 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"),
151 result,
152 ACE_TEXT ("recv 3")));
153 else
155 message.data_[result] = '\0';
156 ACE_DEBUG ((LM_DEBUG,
157 ACE_TEXT ("(%t) Starting to handle message %d: %s\n"),
158 this->number_of_messages_read_ + 1,
159 message.data_));
162 // Process message (sleep).
163 ACE_OS::sleep (ACE_Time_Value (0,
164 sleep_time_in_msec * 1000));
166 // Keep count.
167 this->number_of_messages_read_++;
169 ACE_DEBUG ((LM_DEBUG,
170 ACE_TEXT ("(%t) Completed handling message %d\n"),
171 this->number_of_messages_read_));
173 return 0;
176 class Event_Loop_Task : public ACE_Task_Base
178 public:
179 Event_Loop_Task (ACE_Reactor &reactor);
180 int svc () override;
182 private:
183 ACE_Reactor &reactor_;
186 Event_Loop_Task::Event_Loop_Task (ACE_Reactor &reactor)
187 : reactor_ (reactor)
192 Event_Loop_Task::svc ()
194 return this->reactor_.run_reactor_event_loop ();
197 void
198 test_reactor_upcall (ACE_Reactor &reactor)
200 Handler handler (reactor);
201 if (handler.shutdown_)
203 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Error initializing test; abort.\n")));
204 return;
207 Event_Loop_Task event_loop_task (reactor);
209 // Start up the event loop threads.
210 int result =
211 event_loop_task.activate (THR_NEW_LWP | THR_JOINABLE,
212 number_of_event_loop_threads);
213 if (result != 0)
214 ACE_ERROR ((LM_ERROR,
215 ACE_TEXT ("%p\n"),
216 ACE_TEXT ("test_reactor_upcall, activate")));
218 // Data message.
219 Message data_message;
220 data_message.type_ =
221 Message::DATA;
222 data_message.size_ =
223 ACE_OS::strlen (message);
224 ACE_OS::strcpy (data_message.data_, message);
226 // Send in three pieces because the struct members may not be adjacent
227 // in memory.
228 for (int i = 0;
229 i < number_of_messages;
230 ++i)
232 // This should trigger a call to <handle_input>.
233 ssize_t sent =
234 ACE::send_n (handler.pipe_.write_handle (),
235 &data_message.type_,
236 sizeof (data_message.type_));
237 if (sent == -1)
238 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 1")));
239 sent =
240 ACE::send_n (handler.pipe_.write_handle (),
241 &data_message.size_,
242 sizeof (data_message.size_));
243 if (sent == -1)
244 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 2")));
245 sent =
246 ACE::send_n (handler.pipe_.write_handle (),
247 &data_message.data_,
248 data_message.size_);
249 if (sent == -1)
250 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 3")));
253 // We are done: send shutdown message.
254 Message shutdown_message;
255 shutdown_message.type_ =
256 Message::SHUTDOWN;
257 shutdown_message.size_ = 0;
259 // This should trigger a call to <handle_input>.
260 ssize_t sent = ACE::send_n (handler.pipe_.write_handle (),
261 &shutdown_message.type_,
262 sizeof (shutdown_message.type_));
263 if (sent == -1)
264 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 4")));
265 sent = ACE::send_n (handler.pipe_.write_handle (),
266 &shutdown_message.size_,
267 sizeof (shutdown_message.size_));
268 if (sent == -1)
269 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 5")));
271 // Wait for the event loop tasks to exit.
272 event_loop_task.wait ();
276 parse_args (int argc, ACE_TCHAR *argv[])
278 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("t:m:s:l:"));
280 int c;
282 while ((c = get_opt ()) != -1)
283 switch (c)
285 case 't':
286 number_of_event_loop_threads =
287 ACE_OS::atoi (get_opt.opt_arg ());
288 break;
289 case 'm':
290 number_of_messages =
291 ACE_OS::atoi (get_opt.opt_arg ());
292 break;
293 case 's':
294 sleep_time_in_msec =
295 ACE_OS::atoi (get_opt.opt_arg ());
296 break;
297 case 'l':
298 lock_upcall =
299 ACE_OS::atoi (get_opt.opt_arg ());
300 break;
301 default:
302 ACE_ERROR_RETURN
303 ((LM_ERROR,
304 ACE_TEXT ("usage: %s\n")
305 ACE_TEXT ("\t-m <number of messages> (defaults to %d)\n")
306 ACE_TEXT ("\t-t <number of event loop threads> (defaults to %d)\n")
307 ACE_TEXT ("\t-s <sleep time in msec> (defaults to %d)\n")
308 ACE_TEXT ("\t-l <lock upcall> (defaults to %d)\n")
309 ACE_TEXT ("\n"),
310 argv [0],
311 number_of_messages,
312 number_of_event_loop_threads,
313 sleep_time_in_msec,
314 lock_upcall),
315 -1);
318 return 0;
322 run_main (int argc, ACE_TCHAR *argv[])
324 ACE_START_TEST (ACE_TEXT ("MT_Reactor_Upcall_Test"));
326 #if defined (ACE_HAS_THREADS)
328 // ACE_LOG_MSG->clr_flags (ACE_Log_Msg::VERBOSE_LITE);
330 int result =
331 parse_args (argc, argv);
333 if (result != 0)
334 return result;
336 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing TP Reactor\n")));
338 ACE_TP_Reactor tp_reactor_impl;
339 ACE_Reactor tp_reactor (&tp_reactor_impl);
341 test_reactor_upcall (tp_reactor);
343 #if defined (ACE_HAS_EVENT_POLL)
345 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing Dev Poll Reactor\n")));
347 ACE_Dev_Poll_Reactor dev_poll_reactor_impl;
348 dev_poll_reactor_impl.restart (true);
349 ACE_Reactor dev_poll_reactor (&dev_poll_reactor_impl);
351 test_reactor_upcall (dev_poll_reactor);
353 #endif /* ACE_HAS_EVENT_POLL */
355 #if defined (ACE_WIN32) && \
356 (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
358 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing WFMO Reactor\n")));
360 ACE_WFMO_Reactor wfmo_reactor_impl;
361 ACE_Reactor wfmo_reactor (&wfmo_reactor_impl);
363 test_reactor_upcall (wfmo_reactor);
365 #endif /* ACE_WIN32 && ACE_HAS_WINSOCK2 */
367 #else /* ACE_HAS_THREADS */
368 ACE_UNUSED_ARG(argc);
369 ACE_UNUSED_ARG(argv);
371 ACE_ERROR ((LM_INFO,
372 ACE_TEXT ("threads not supported on this platform\n")));
374 #endif /* ACE_HAS_THREADS */
376 ACE_END_TEST;
377 return 0;