// Doxygen changes
// [ACE_TAO.git] / ACE / tests / MT_Reactor_Upcall_Test.cpp
// blob 65aaa050b78ec706a12e935a124be092f7796686
//=============================================================================
/**
 *  @file    MT_Reactor_Upcall_Test.cpp
 *
 *  This is a test that shows how to handle upcalls from the
 *  TP_Reactor and the WFMO_Reactor when the event loop is being run
 *  by multiple threads.
 *
 *  @author Irfan Pyarali <irfan@cs.wustl.edu>
 */
//=============================================================================
15 #include "test_config.h"
16 #include "ace/OS_NS_string.h"
17 #include "ace/OS_NS_unistd.h"
18 #include "ace/Reactor.h"
19 #include "ace/TP_Reactor.h"
20 #include "ace/WFMO_Reactor.h"
21 #include "ace/Dev_Poll_Reactor.h"
22 #include "ace/Pipe.h"
23 #include "ace/Task.h"
24 #include "ace/Get_Opt.h"
25 #include "ace/ACE.h"
// Tunable test parameters.  Each one can be overridden from the
// command line (see parse_args).

// -t: how many threads run the reactor event loop concurrently.
int number_of_event_loop_threads = 3;

// -m: how many data messages are written to the pipe before the
// shutdown message is sent.
int number_of_messages = 10;

// -s: simulated processing time per data message, in milliseconds.
int sleep_time_in_msec = 100;

// -l: when non-zero, the handler's mutex is held for the whole upcall.
int lock_upcall = 1;

// Payload carried by every data message.
static const char *message = "Hello there!";
35 class Guard
37 public:
38 Guard (ACE_SYNCH_MUTEX &lock)
39 : lock_ (lock)
41 if (lock_upcall)
42 lock.acquire ();
45 ~Guard (void)
47 if (lock_upcall)
48 this->lock_.release ();
51 ACE_SYNCH_MUTEX &lock_;
/**
 * @struct Message
 *
 * @brief Record exchanged over the pipe between the test driver and
 * the event handler.
 *
 * The members are written to and read from the pipe one at a time,
 * in declaration order: @c type_, then @c size_, then @c size_ bytes
 * of @c data_ (data messages only).
 */
struct Message
{
  // Kind of message.
  enum Type
  {
    // Carries size_ bytes of payload in data_.
    DATA = 0,
    // Asks the handler to end the reactor event loop.
    SHUTDOWN = 1
  };

  // Discriminates data messages from the shutdown request.
  Type type_;

  // Number of payload bytes that follow the fixed part.
  size_t size_;

  // Payload storage.
  char data_[BUFSIZ];
};
67 class Handler : public ACE_Event_Handler
69 public:
70 Handler (ACE_Reactor &reactor);
71 int handle_input (ACE_HANDLE fd);
73 ACE_Pipe pipe_;
74 int number_of_messages_read_;
75 ACE_SYNCH_MUTEX lock_;
76 int shutdown_;
79 Handler::Handler (ACE_Reactor &reactor)
80 : ACE_Event_Handler (&reactor),
81 number_of_messages_read_ (0),
82 shutdown_ (1)
84 // Create the pipe.
85 int result = this->pipe_.open ();
86 if (result != 0)
87 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("pipe open")));
88 else
90 // Register for input events.
91 result =
92 this->reactor ()->register_handler (this->pipe_.read_handle (),
93 this,
94 ACE_Event_Handler::READ_MASK);
95 if (result != 0)
96 ACE_ERROR ((LM_ERROR,
97 ACE_TEXT ("%p\n"),
98 ACE_TEXT ("Can't register pipe for READ")));
99 else
100 this->shutdown_ = 0;
105 Handler::handle_input (ACE_HANDLE fd)
107 Guard monitor (this->lock_);
109 // If we have been shutdown, return.
110 if (this->shutdown_)
111 return 0;
113 // Read fixed parts of message.
114 Message message;
115 ssize_t result =
116 ACE::recv_n (fd,
117 &message.type_,
118 sizeof (message.type_));
119 if (result != static_cast<ssize_t> (sizeof (message.type_)))
120 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"),
121 result,
122 ACE_TEXT ("recv 1")));
123 result =
124 ACE::recv_n (fd,
125 &message.size_,
126 sizeof (message.size_));
127 if (result != static_cast<ssize_t> (sizeof (message.size_)))
128 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"),
129 result,
130 ACE_TEXT ("recv 2")));
132 // On shutdown message, stop the event loop.
133 if (message.type_ == Message::SHUTDOWN)
135 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Shutdown message\n")));
137 this->shutdown_ = 1;
139 this->reactor ()->end_reactor_event_loop ();
141 // Remove self from Reactor.
142 return -1;
145 // Else it is a data message: read the data.
146 result =
147 ACE::recv_n (fd,
148 &message.data_,
149 message.size_);
150 if (result != static_cast<ssize_t> (message.size_))
151 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): read %d, %p\n"),
152 result,
153 ACE_TEXT ("recv 3")));
154 else
156 message.data_[result] = '\0';
157 ACE_DEBUG ((LM_DEBUG,
158 ACE_TEXT ("(%t) Starting to handle message %d: %s\n"),
159 this->number_of_messages_read_ + 1,
160 message.data_));
163 // Process message (sleep).
164 ACE_OS::sleep (ACE_Time_Value (0,
165 sleep_time_in_msec * 1000));
167 // Keep count.
168 this->number_of_messages_read_++;
170 ACE_DEBUG ((LM_DEBUG,
171 ACE_TEXT ("(%t) Completed handling message %d\n"),
172 this->number_of_messages_read_));
174 return 0;
177 class Event_Loop_Task : public ACE_Task_Base
179 public:
180 Event_Loop_Task (ACE_Reactor &reactor);
181 int svc (void);
183 private:
184 ACE_Reactor &reactor_;
187 Event_Loop_Task::Event_Loop_Task (ACE_Reactor &reactor)
188 : reactor_ (reactor)
193 Event_Loop_Task::svc (void)
195 return this->reactor_.run_reactor_event_loop ();
198 void
199 test_reactor_upcall (ACE_Reactor &reactor)
201 Handler handler (reactor);
202 if (handler.shutdown_)
204 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Error initializing test; abort.\n")));
205 return;
208 Event_Loop_Task event_loop_task (reactor);
210 // Start up the event loop threads.
211 int result =
212 event_loop_task.activate (THR_NEW_LWP | THR_JOINABLE,
213 number_of_event_loop_threads);
214 if (result != 0)
215 ACE_ERROR ((LM_ERROR,
216 ACE_TEXT ("%p\n"),
217 ACE_TEXT ("test_reactor_upcall, activate")));
219 // Data message.
220 Message data_message;
221 data_message.type_ =
222 Message::DATA;
223 data_message.size_ =
224 ACE_OS::strlen (message);
225 ACE_OS::strcpy (data_message.data_, message);
227 // Send in three pieces because the struct members may not be adjacent
228 // in memory.
229 for (int i = 0;
230 i < number_of_messages;
231 ++i)
233 // This should trigger a call to <handle_input>.
234 ssize_t sent =
235 ACE::send_n (handler.pipe_.write_handle (),
236 &data_message.type_,
237 sizeof (data_message.type_));
238 if (sent == -1)
239 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 1")));
240 sent =
241 ACE::send_n (handler.pipe_.write_handle (),
242 &data_message.size_,
243 sizeof (data_message.size_));
244 if (sent == -1)
245 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 2")));
246 sent =
247 ACE::send_n (handler.pipe_.write_handle (),
248 &data_message.data_,
249 data_message.size_);
250 if (sent == -1)
251 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 3")));
254 // We are done: send shutdown message.
255 Message shutdown_message;
256 shutdown_message.type_ =
257 Message::SHUTDOWN;
258 shutdown_message.size_ = 0;
260 // This should trigger a call to <handle_input>.
261 ssize_t sent = ACE::send_n (handler.pipe_.write_handle (),
262 &shutdown_message.type_,
263 sizeof (shutdown_message.type_));
264 if (sent == -1)
265 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 4")));
266 sent = ACE::send_n (handler.pipe_.write_handle (),
267 &shutdown_message.size_,
268 sizeof (shutdown_message.size_));
269 if (sent == -1)
270 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 5")));
272 // Wait for the event loop tasks to exit.
273 event_loop_task.wait ();
277 parse_args (int argc, ACE_TCHAR *argv[])
279 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("t:m:s:l:"));
281 int c;
283 while ((c = get_opt ()) != -1)
284 switch (c)
286 case 't':
287 number_of_event_loop_threads =
288 ACE_OS::atoi (get_opt.opt_arg ());
289 break;
290 case 'm':
291 number_of_messages =
292 ACE_OS::atoi (get_opt.opt_arg ());
293 break;
294 case 's':
295 sleep_time_in_msec =
296 ACE_OS::atoi (get_opt.opt_arg ());
297 break;
298 case 'l':
299 lock_upcall =
300 ACE_OS::atoi (get_opt.opt_arg ());
301 break;
302 default:
303 ACE_ERROR_RETURN
304 ((LM_ERROR,
305 ACE_TEXT ("usage: %s\n")
306 ACE_TEXT ("\t-m <number of messages> (defaults to %d)\n")
307 ACE_TEXT ("\t-t <number of event loop threads> (defaults to %d)\n")
308 ACE_TEXT ("\t-s <sleep time in msec> (defaults to %d)\n")
309 ACE_TEXT ("\t-l <lock upcall> (defaults to %d)\n")
310 ACE_TEXT ("\n"),
311 argv [0],
312 number_of_messages,
313 number_of_event_loop_threads,
314 sleep_time_in_msec,
315 lock_upcall),
316 -1);
319 return 0;
323 run_main (int argc, ACE_TCHAR *argv[])
325 ACE_START_TEST (ACE_TEXT ("MT_Reactor_Upcall_Test"));
327 #if defined (ACE_HAS_THREADS)
329 // ACE_LOG_MSG->clr_flags (ACE_Log_Msg::VERBOSE_LITE);
331 int result =
332 parse_args (argc, argv);
334 if (result != 0)
335 return result;
337 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing TP Reactor\n")));
339 ACE_TP_Reactor tp_reactor_impl;
340 ACE_Reactor tp_reactor (&tp_reactor_impl);
342 test_reactor_upcall (tp_reactor);
344 #if defined (ACE_HAS_EVENT_POLL)
346 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing Dev Poll Reactor\n")));
348 ACE_Dev_Poll_Reactor dev_poll_reactor_impl;
349 dev_poll_reactor_impl.restart (true);
350 ACE_Reactor dev_poll_reactor (&dev_poll_reactor_impl);
352 test_reactor_upcall (dev_poll_reactor);
354 #endif /* ACE_HAS_EVENT_POLL */
356 #if defined (ACE_WIN32) && \
357 (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
359 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing WFMO Reactor\n")));
361 ACE_WFMO_Reactor wfmo_reactor_impl;
362 ACE_Reactor wfmo_reactor (&wfmo_reactor_impl);
364 test_reactor_upcall (wfmo_reactor);
366 #endif /* ACE_WIN32 && ACE_HAS_WINSOCK2 */
368 #else /* ACE_HAS_THREADS */
369 ACE_UNUSED_ARG(argc);
370 ACE_UNUSED_ARG(argv);
372 ACE_ERROR ((LM_INFO,
373 ACE_TEXT ("threads not supported on this platform\n")));
375 #endif /* ACE_HAS_THREADS */
377 ACE_END_TEST;
378 return 0;