2 //=============================================================================
4 * @file MT_Reactor_Upcall_Test.cpp
6 * This is a test that shows how to handle upcalls from the
7 * TP_Reactor and the WFMO_Reactor when the event loop is being run by multiple threads.
10 * @author Irfan Pyarali <irfan@cs.wustl.edu>
12 //=============================================================================
15 #include "test_config.h"
16 #include "ace/OS_NS_string.h"
17 #include "ace/OS_NS_unistd.h"
18 #include "ace/Reactor.h"
19 #include "ace/TP_Reactor.h"
20 #include "ace/WFMO_Reactor.h"
21 #include "ace/Dev_Poll_Reactor.h"
24 #include "ace/Get_Opt.h"
// Number of threads that run the reactor event loop; passed to
// activate() in test_reactor_upcall() and settable from parse_args().
28 int number_of_event_loop_threads
= 3;
// Upper bound of the data-message send loop in test_reactor_upcall().
29 int number_of_messages
= 10;
// Simulated per-message processing time in milliseconds; handle_input()
// multiplies it by 1000 when building the ACE_Time_Value it sleeps for.
30 int sleep_time_in_msec
= 100;
// Text copied into the data_ field of every data message.
32 static const char *message
= "Hello there!";
// Guard fragments: the constructor takes the mutex to manage by reference.
// NOTE(review): only the release() call is visible in this excerpt; the
// matching acquire and the binding of lock_ are presumably in the
// constructor -- confirm against the full file.
37 Guard (ACE_SYNCH_MUTEX
&lock
)
// Give the mutex back.
47 this->lock_
.release ();
// Reference to a mutex declared elsewhere (Guard holds no mutex of its own).
50 ACE_SYNCH_MUTEX
&lock_
;
// Event handler used by the test.  Declares a constructor that takes the
// reactor it works with and overrides handle_input().
66 class Handler
: public ACE_Event_Handler
69 Handler (ACE_Reactor
&reactor
);
// Overrides the base-class input callback.
70 int handle_input (ACE_HANDLE fd
) override
;
// Count of data messages fully processed by handle_input().
73 int number_of_messages_read_
;
// Mutex handed to the Guard at the top of handle_input().
74 ACE_SYNCH_MUTEX lock_
;
// Constructs the handler: passes the reactor to the ACE_Event_Handler
// base, zeroes the message counter, opens the pipe and registers the
// pipe's read handle with the reactor for READ events.  Failures are
// logged through ACE_ERROR.
// NOTE(review): test_reactor_upcall() checks shutdown_ right after
// construction, so a failure presumably sets that flag in lines not
// visible in this excerpt -- confirm.
78 Handler::Handler (ACE_Reactor
&reactor
)
79 : ACE_Event_Handler (&reactor
),
80 number_of_messages_read_ (0),
84 int result
= this->pipe_
.open ();
86 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("pipe open")));
89 // Register for input events.
91 this->reactor ()->register_handler (this->pipe_
.read_handle (),
93 ACE_Event_Handler::READ_MASK
);
97 ACE_TEXT ("Can't register pipe for READ")));
// Input upcall.  With a Guard constructed on lock_, reads one message in
// pieces: the type_ field, then the size_ field, then (for data messages)
// the payload.  A SHUTDOWN message ends the reactor event loop; a data
// message is NUL-terminated, logged, "processed" by sleeping for
// sleep_time_in_msec milliseconds, and counted in
// number_of_messages_read_.  A read whose length differs from the
// expected size is reported via ACE_ERROR.
104 Handler::handle_input (ACE_HANDLE fd
)
106 Guard
monitor (this->lock_
);
108 // If we have been shut down, return.
112 // Read fixed parts of message.
117 sizeof (message
.type_
));
// The read must deliver exactly sizeof (type_) bytes.
118 if (result
!= static_cast<ssize_t
> (sizeof (message
.type_
)))
119 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): read %d, %p\n"),
121 ACE_TEXT ("recv 1")));
125 sizeof (message
.size_
));
// Likewise for the size_ field.
126 if (result
!= static_cast<ssize_t
> (sizeof (message
.size_
)))
127 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): read %d, %p\n"),
129 ACE_TEXT ("recv 2")));
131 // On shutdown message, stop the event loop.
132 if (message
.type_
== Message::SHUTDOWN
)
134 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Shutdown message\n")));
138 this->reactor ()->end_reactor_event_loop ();
140 // Remove self from Reactor.
144 // Else it is a data message: read the data.
// The payload read must deliver exactly size_ bytes.
149 if (result
!= static_cast<ssize_t
> (message
.size_
))
150 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): read %d, %p\n"),
152 ACE_TEXT ("recv 3")));
// NUL-terminate the payload so it can be logged with %s.
155 message
.data_
[result
] = '\0';
156 ACE_DEBUG ((LM_DEBUG
,
157 ACE_TEXT ("(%t) Starting to handle message %d: %s\n"),
158 this->number_of_messages_read_
+ 1,
162 // Process message (sleep).
163 ACE_OS::sleep (ACE_Time_Value (0,
164 sleep_time_in_msec
* 1000));
// Count the message only after the simulated processing has finished.
167 this->number_of_messages_read_
++;
169 ACE_DEBUG ((LM_DEBUG
,
170 ACE_TEXT ("(%t) Completed handling message %d\n"),
171 this->number_of_messages_read_
));
// Task whose svc() runs the event loop of the reactor it refers to.
176 class Event_Loop_Task
: public ACE_Task_Base
179 Event_Loop_Task (ACE_Reactor
&reactor
);
// Reactor whose event loop svc() runs; held by reference.
183 ACE_Reactor
&reactor_
;
// Constructor.  NOTE(review): the initializer list is not visible in this
// excerpt; reactor_ is a reference member, so it is presumably bound to
// the reactor argument here -- confirm.
186 Event_Loop_Task::Event_Loop_Task (ACE_Reactor
&reactor
)
// Runs the reactor's event loop and returns whatever
// run_reactor_event_loop() returns.
192 Event_Loop_Task::svc ()
194 return this->reactor_
.run_reactor_event_loop ();
// Runs the upcall test against the given reactor: creates a Handler,
// logs an error if the handler reports shutdown_ after construction,
// starts number_of_event_loop_threads threads on an Event_Loop_Task,
// writes number_of_messages data messages followed by one shutdown
// message to the handler's pipe, and finally waits for the event-loop
// threads to exit.
198 test_reactor_upcall (ACE_Reactor
&reactor
)
200 Handler
handler (reactor
);
201 if (handler
.shutdown_
)
203 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Error initializing test; abort.\n")));
207 Event_Loop_Task
event_loop_task (reactor
);
209 // Start up the event loop threads.
211 event_loop_task
.activate (THR_NEW_LWP
| THR_JOINABLE
,
212 number_of_event_loop_threads
);
214 ACE_ERROR ((LM_ERROR
,
216 ACE_TEXT ("test_reactor_upcall, activate")));
// Build the data message once; the same message is sent on every iteration.
219 Message data_message
;
223 ACE_OS::strlen (message
);
224 ACE_OS::strcpy (data_message
.data_
, message
);
226 // Send in three pieces because the struct members may not be adjacent in memory.
229 i
< number_of_messages
;
232 // This should trigger a call to <handle_input>.
234 ACE::send_n (handler
.pipe_
.write_handle (),
236 sizeof (data_message
.type_
));
238 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 1")));
240 ACE::send_n (handler
.pipe_
.write_handle (),
242 sizeof (data_message
.size_
));
244 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 2")));
246 ACE::send_n (handler
.pipe_
.write_handle (),
250 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 3")));
253 // We are done: send shutdown message.
254 Message shutdown_message
;
255 shutdown_message
.type_
=
// A shutdown message carries no payload.
257 shutdown_message
.size_
= 0;
259 // This should trigger a call to <handle_input>.
260 ssize_t sent
= ACE::send_n (handler
.pipe_
.write_handle (),
261 &shutdown_message
.type_
,
262 sizeof (shutdown_message
.type_
));
264 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 4")));
265 sent
= ACE::send_n (handler
.pipe_
.write_handle (),
266 &shutdown_message
.size_
,
267 sizeof (shutdown_message
.size_
));
269 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 5")));
271 // Wait for the event loop tasks to exit.
272 event_loop_task
.wait ();
// Parses the command line with ACE_Get_Opt using the option string
// "t:m:s:l:".  Each option's argument is converted with ACE_OS::atoi().
// Per the usage text: -m number of messages, -t number of event loop
// threads, -s sleep time in msec, -l lock upcall.
// NOTE(review): only the assignment to number_of_event_loop_threads is
// visible in this excerpt; the other assignment targets are presumably
// the matching globals -- confirm.
276 parse_args (int argc
, ACE_TCHAR
*argv
[])
278 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("t:m:s:l:"));
// get_opt () returns -1 once all options have been consumed.
282 while ((c
= get_opt ()) != -1)
286 number_of_event_loop_threads
=
287 ACE_OS::atoi (get_opt
.opt_arg ());
291 ACE_OS::atoi (get_opt
.opt_arg ());
295 ACE_OS::atoi (get_opt
.opt_arg ());
299 ACE_OS::atoi (get_opt
.opt_arg ());
304 ACE_TEXT ("usage: %s\n")
305 ACE_TEXT ("\t-m <number of messages> (defaults to %d)\n")
306 ACE_TEXT ("\t-t <number of event loop threads> (defaults to %d)\n")
307 ACE_TEXT ("\t-s <sleep time in msec> (defaults to %d)\n")
308 ACE_TEXT ("\t-l <lock upcall> (defaults to %d)\n")
312 number_of_event_loop_threads
,
// Test entry point.  With ACE_HAS_THREADS: parses the arguments, then runs
// test_reactor_upcall() on a TP reactor, on a Dev Poll reactor when
// ACE_HAS_EVENT_POLL is defined, and on a WFMO reactor when ACE_WIN32 is
// defined and ACE_HAS_WINSOCK2 is defined and non-zero.  Without threads
// the arguments are marked unused and a "threads not supported" message
// is emitted.
322 run_main (int argc
, ACE_TCHAR
*argv
[])
324 ACE_START_TEST (ACE_TEXT ("MT_Reactor_Upcall_Test"));
326 #if defined (ACE_HAS_THREADS)
328 // ACE_LOG_MSG->clr_flags (ACE_Log_Msg::VERBOSE_LITE);
331 parse_args (argc
, argv
);
336 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Testing TP Reactor\n")));
// Wrap the concrete implementation in the ACE_Reactor interface object.
338 ACE_TP_Reactor tp_reactor_impl
;
339 ACE_Reactor
tp_reactor (&tp_reactor_impl
);
341 test_reactor_upcall (tp_reactor
);
343 #if defined (ACE_HAS_EVENT_POLL)
345 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Testing Dev Poll Reactor\n")));
347 ACE_Dev_Poll_Reactor dev_poll_reactor_impl
;
// Turn on the implementation's restart flag before wrapping it.
348 dev_poll_reactor_impl
.restart (true);
349 ACE_Reactor
dev_poll_reactor (&dev_poll_reactor_impl
);
351 test_reactor_upcall (dev_poll_reactor
);
353 #endif /* ACE_HAS_EVENT_POLL */
355 #if defined (ACE_WIN32) && \
356 (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
358 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Testing WFMO Reactor\n")));
360 ACE_WFMO_Reactor wfmo_reactor_impl
;
361 ACE_Reactor
wfmo_reactor (&wfmo_reactor_impl
);
363 test_reactor_upcall (wfmo_reactor
);
365 #endif /* ACE_WIN32 && ACE_HAS_WINSOCK2 */
367 #else /* ACE_HAS_THREADS */
368 ACE_UNUSED_ARG(argc
);
369 ACE_UNUSED_ARG(argv
);
372 ACE_TEXT ("threads not supported on this platform\n")));
374 #endif /* ACE_HAS_THREADS */