2 //=============================================================================
4 * @file MT_Reactor_Upcall_Test.cpp
6 * This is a test that shows how to handle upcalls from the
7 * TP_Reactor and the WFMO_Reactor when the event loop is being run
10 * @author Irfan Pyarali <irfan@cs.wustl.edu>
12 //=============================================================================
15 #include "test_config.h"
16 #include "ace/OS_NS_string.h"
17 #include "ace/OS_NS_unistd.h"
18 #include "ace/Reactor.h"
19 #include "ace/TP_Reactor.h"
20 #include "ace/WFMO_Reactor.h"
21 #include "ace/Dev_Poll_Reactor.h"
24 #include "ace/Get_Opt.h"
29 int number_of_event_loop_threads
= 3;
30 int number_of_messages
= 10;
31 int sleep_time_in_msec
= 100;
33 static const char *message
= "Hello there!";
38 Guard (ACE_SYNCH_MUTEX
&lock
)
48 this->lock_
.release ();
51 ACE_SYNCH_MUTEX
&lock_
;
67 class Handler
: public ACE_Event_Handler
70 Handler (ACE_Reactor
&reactor
);
71 int handle_input (ACE_HANDLE fd
);
74 int number_of_messages_read_
;
75 ACE_SYNCH_MUTEX lock_
;
79 Handler::Handler (ACE_Reactor
&reactor
)
80 : ACE_Event_Handler (&reactor
),
81 number_of_messages_read_ (0),
85 int result
= this->pipe_
.open ();
87 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("pipe open")));
90 // Register for input events.
92 this->reactor ()->register_handler (this->pipe_
.read_handle (),
94 ACE_Event_Handler::READ_MASK
);
98 ACE_TEXT ("Can't register pipe for READ")));
105 Handler::handle_input (ACE_HANDLE fd
)
107 Guard
monitor (this->lock_
);
109 // If we have been shutdown, return.
113 // Read fixed parts of message.
118 sizeof (message
.type_
));
119 if (result
!= static_cast<ssize_t
> (sizeof (message
.type_
)))
120 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): read %d, %p\n"),
122 ACE_TEXT ("recv 1")));
126 sizeof (message
.size_
));
127 if (result
!= static_cast<ssize_t
> (sizeof (message
.size_
)))
128 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): read %d, %p\n"),
130 ACE_TEXT ("recv 2")));
132 // On shutdown message, stop the event loop.
133 if (message
.type_
== Message::SHUTDOWN
)
135 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%t) Shutdown message\n")));
139 this->reactor ()->end_reactor_event_loop ();
141 // Remove self from Reactor.
145 // Else it is a data message: read the data.
150 if (result
!= static_cast<ssize_t
> (message
.size_
))
151 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): read %d, %p\n"),
153 ACE_TEXT ("recv 3")));
156 message
.data_
[result
] = '\0';
157 ACE_DEBUG ((LM_DEBUG
,
158 ACE_TEXT ("(%t) Starting to handle message %d: %s\n"),
159 this->number_of_messages_read_
+ 1,
163 // Process message (sleep).
164 ACE_OS::sleep (ACE_Time_Value (0,
165 sleep_time_in_msec
* 1000));
168 this->number_of_messages_read_
++;
170 ACE_DEBUG ((LM_DEBUG
,
171 ACE_TEXT ("(%t) Completed handling message %d\n"),
172 this->number_of_messages_read_
));
177 class Event_Loop_Task
: public ACE_Task_Base
180 Event_Loop_Task (ACE_Reactor
&reactor
);
184 ACE_Reactor
&reactor_
;
187 Event_Loop_Task::Event_Loop_Task (ACE_Reactor
&reactor
)
193 Event_Loop_Task::svc (void)
195 return this->reactor_
.run_reactor_event_loop ();
199 test_reactor_upcall (ACE_Reactor
&reactor
)
201 Handler
handler (reactor
);
202 if (handler
.shutdown_
)
204 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Error initializing test; abort.\n")));
208 Event_Loop_Task
event_loop_task (reactor
);
210 // Start up the event loop threads.
212 event_loop_task
.activate (THR_NEW_LWP
| THR_JOINABLE
,
213 number_of_event_loop_threads
);
215 ACE_ERROR ((LM_ERROR
,
217 ACE_TEXT ("test_reactor_upcall, activate")));
220 Message data_message
;
224 ACE_OS::strlen (message
);
225 ACE_OS::strcpy (data_message
.data_
, message
);
227 // Send in three pieces because the struct members may not be adjacent
230 i
< number_of_messages
;
233 // This should trigger a call to <handle_input>.
235 ACE::send_n (handler
.pipe_
.write_handle (),
237 sizeof (data_message
.type_
));
239 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 1")));
241 ACE::send_n (handler
.pipe_
.write_handle (),
243 sizeof (data_message
.size_
));
245 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 2")));
247 ACE::send_n (handler
.pipe_
.write_handle (),
251 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 3")));
254 // We are done: send shutdown message.
255 Message shutdown_message
;
256 shutdown_message
.type_
=
258 shutdown_message
.size_
= 0;
260 // This should trigger a call to <handle_input>.
261 ssize_t sent
= ACE::send_n (handler
.pipe_
.write_handle (),
262 &shutdown_message
.type_
,
263 sizeof (shutdown_message
.type_
));
265 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 4")));
266 sent
= ACE::send_n (handler
.pipe_
.write_handle (),
267 &shutdown_message
.size_
,
268 sizeof (shutdown_message
.size_
));
270 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t): %p\n"), ACE_TEXT ("send 5")));
272 // Wait for the event loop tasks to exit.
273 event_loop_task
.wait ();
277 parse_args (int argc
, ACE_TCHAR
*argv
[])
279 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("t:m:s:l:"));
283 while ((c
= get_opt ()) != -1)
287 number_of_event_loop_threads
=
288 ACE_OS::atoi (get_opt
.opt_arg ());
292 ACE_OS::atoi (get_opt
.opt_arg ());
296 ACE_OS::atoi (get_opt
.opt_arg ());
300 ACE_OS::atoi (get_opt
.opt_arg ());
305 ACE_TEXT ("usage: %s\n")
306 ACE_TEXT ("\t-m <number of messages> (defaults to %d)\n")
307 ACE_TEXT ("\t-t <number of event loop threads> (defaults to %d)\n")
308 ACE_TEXT ("\t-s <sleep time in msec> (defaults to %d)\n")
309 ACE_TEXT ("\t-l <lock upcall> (defaults to %d)\n")
313 number_of_event_loop_threads
,
323 run_main (int argc
, ACE_TCHAR
*argv
[])
325 ACE_START_TEST (ACE_TEXT ("MT_Reactor_Upcall_Test"));
327 #if defined (ACE_HAS_THREADS)
329 // ACE_LOG_MSG->clr_flags (ACE_Log_Msg::VERBOSE_LITE);
332 parse_args (argc
, argv
);
337 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Testing TP Reactor\n")));
339 ACE_TP_Reactor tp_reactor_impl
;
340 ACE_Reactor
tp_reactor (&tp_reactor_impl
);
342 test_reactor_upcall (tp_reactor
);
344 #if defined (ACE_HAS_EVENT_POLL)
346 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Testing Dev Poll Reactor\n")));
348 ACE_Dev_Poll_Reactor dev_poll_reactor_impl
;
349 dev_poll_reactor_impl
.restart (true);
350 ACE_Reactor
dev_poll_reactor (&dev_poll_reactor_impl
);
352 test_reactor_upcall (dev_poll_reactor
);
354 #endif /* ACE_HAS_EVENT_POLL */
356 #if defined (ACE_WIN32) && \
357 (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
359 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Testing WFMO Reactor\n")));
361 ACE_WFMO_Reactor wfmo_reactor_impl
;
362 ACE_Reactor
wfmo_reactor (&wfmo_reactor_impl
);
364 test_reactor_upcall (wfmo_reactor
);
366 #endif /* ACE_WIN32 && ACE_HAS_WINSOCK2 */
368 #else /* ACE_HAS_THREADS */
369 ACE_UNUSED_ARG(argc
);
370 ACE_UNUSED_ARG(argv
);
373 ACE_TEXT ("threads not supported on this platform\n")));
375 #endif /* ACE_HAS_THREADS */