Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Reactors_Test.cpp
blobdad89ebb4a0bd7141b207386b0e1fe242e322d7c
2 //=============================================================================
3 /**
4 * @file Reactors_Test.cpp
6 * This is a test that performs a torture test of multiple
7 * <ACE_Reactors> and <ACE_Tasks> in the same process.
9 * @author Prashant Jain <pjain@cs.wustl.edu>
10 * @author Detlef Becker <Detlef.Becker@med.siemens.de>
11 * @author and Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
13 //=============================================================================
15 #include "test_config.h"
16 #include "ace/Task.h"
17 #include "ace/Reactor.h"
18 #include "ace/Atomic_Op.h"
19 #include "ace/Recursive_Thread_Mutex.h"
21 #if defined (ACE_HAS_THREADS)
23 ACE_Thread_Manager *thr_mgr;
25 static const int MAX_TASKS = 20;
27 /**
28 * @class Test_Task
30 * @brief Exercise the tasks.
32 class Test_Task : public ACE_Task<ACE_MT_SYNCH>
34 public:
35 Test_Task (void);
36 ~Test_Task (void);
38 //FUZZ: disable check_for_lack_ACE_OS
39 // = Task hooks.
40 ///FUZZ: enable check_for_lack_ACE_OS
41 virtual int open (void *args = 0);
42 virtual int close (u_long flags = 0);
43 virtual int svc (void);
45 // = Event Handler hooks.
46 virtual int handle_input (ACE_HANDLE handle);
47 virtual int handle_close (ACE_HANDLE fd,
48 ACE_Reactor_Mask close_mask);
49 private:
50 /// Number of iterations handled.
51 size_t handled_;
53 /// Number of tasks running.
54 static int task_count_;
57 // Static data member initialization.
58 int Test_Task::task_count_ = 0;
60 static ACE_Atomic_Op<ACE_Thread_Mutex, int> done_count = MAX_TASKS * 2;
61 static ACE_Recursive_Thread_Mutex recursive_lock;
63 Test_Task::Test_Task (void)
64 : handled_ (0)
66 ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon, recursive_lock);
68 Test_Task::task_count_++;
70 ACE_DEBUG ((LM_DEBUG,
71 ACE_TEXT ("(%t) TT+ Test_Task::task_count_ = %d\n"),
72 Test_Task::task_count_));
75 Test_Task::~Test_Task (void)
77 ACE_GUARD (ACE_Recursive_Thread_Mutex, ace_mon, recursive_lock);
79 ACE_DEBUG ((LM_DEBUG,
80 ACE_TEXT ("(%t) TT- Test_Task::task_count_ = %d\n"),
81 Test_Task::task_count_));
83 ACE_TEST_ASSERT (Test_Task::task_count_ == 0);
86 int
87 Test_Task::open (void *args)
89 this->reactor (reinterpret_cast<ACE_Reactor *> (args));
90 return this->activate (THR_NEW_LWP);
93 int
94 Test_Task::close (u_long)
96 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, ace_mon, recursive_lock, -1);
98 Test_Task::task_count_--;
100 ACE_DEBUG ((LM_DEBUG,
101 ACE_TEXT ("(%t) close Test_Task::task_count_ = %d\n"),
102 Test_Task::task_count_));
104 ACE_TEST_ASSERT (Test_Task::task_count_ >= 0);
106 return 0;
110 Test_Task::svc (void)
112 ACE_DEBUG ((LM_DEBUG,
113 ACE_TEXT ("(%t) svc\n")));
115 for (size_t i = 0; i < ACE_MAX_ITERATIONS; i++)
117 ACE_OS::thr_yield ();
119 // Only wait up to 10 milliseconds to notify the Reactor.
120 ACE_Time_Value timeout (0, 10 * 1000);
122 if (this->reactor ()->notify (this,
123 ACE_Event_Handler::READ_MASK,
124 &timeout) == -1)
126 if (errno == ETIME)
127 ACE_DEBUG ((LM_DEBUG,
128 ACE_TEXT ("(%t) %p\n"),
129 ACE_TEXT ("notify() timed out")));
130 else
131 ACE_ERROR_RETURN ((LM_ERROR,
132 ACE_TEXT ("(%t) %p\n"),
133 ACE_TEXT ("notify")),
134 -1);
138 return 0;
142 Test_Task::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
144 return 0;
148 Test_Task::handle_input (ACE_HANDLE)
150 this->handled_++;
152 if (this->handled_ == ACE_MAX_ITERATIONS)
154 done_count--;
155 ACE_DEBUG ((LM_DEBUG,
156 ACE_TEXT ("(%t) handle_input, handled_ = %d, done_count = %d\n"),
157 this->handled_,
158 done_count.value ()));
161 ACE_OS::thr_yield ();
162 return -1;
165 static void *
166 worker (void *args)
168 ACE_Reactor *reactor = reinterpret_cast<ACE_Reactor *> (args);
170 // Make this thread the owner of the Reactor's event loop.
171 reactor->owner (ACE_Thread::self ());
173 // Use a timeout to inform the Reactor when to shutdown.
174 ACE_Time_Value timeout (4);
176 for (;;)
177 switch (reactor->handle_events (timeout))
179 case -1:
180 ACE_ERROR_RETURN ((LM_ERROR,
181 ACE_TEXT ("(%t) %p\n"),
182 ACE_TEXT ("reactor")),
184 /* NOTREACHED */
185 case 0:
186 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Reactor shutdown\n")));
187 return 0;
190 ACE_NOTREACHED (return 0);
193 #endif /* ACE_HAS_THREADS */
196 run_main (int, ACE_TCHAR *[])
198 ACE_START_TEST (ACE_TEXT ("Reactors_Test"));
200 #if defined (ACE_HAS_THREADS)
201 ACE_TEST_ASSERT (ACE_LOG_MSG->op_status () != -1);
203 thr_mgr = ACE_Thread_Manager::instance ();
205 ACE_Reactor reactor;
206 ACE_TEST_ASSERT (ACE_LOG_MSG->op_status () != -1);
208 Test_Task tt1[MAX_TASKS];
209 Test_Task tt2[MAX_TASKS];
211 // Activate all of the Tasks.
213 for (int i = 0; i < MAX_TASKS; i++)
215 tt1[i].open (ACE_Reactor::instance ());
216 tt2[i].open (&reactor);
219 // Spawn two threads each running a different reactor.
221 if (ACE_Thread_Manager::instance ()->spawn
222 (ACE_THR_FUNC (worker),
223 (void *) ACE_Reactor::instance (),
224 THR_BOUND | THR_DETACHED) == -1)
225 ACE_ERROR_RETURN ((LM_ERROR,
226 ACE_TEXT ("%p\n"),
227 ACE_TEXT ("spawn")),
228 -1);
230 else if (ACE_Thread_Manager::instance ()->spawn
231 (ACE_THR_FUNC (worker), (void *) &reactor,
232 THR_BOUND | THR_DETACHED) == -1)
233 ACE_ERROR_RETURN ((LM_ERROR,
234 ACE_TEXT ("%p\n"),
235 ACE_TEXT ("spawn")),
236 -1);
238 if (ACE_Thread_Manager::instance ()->wait () == -1)
239 ACE_ERROR_RETURN ((LM_ERROR,
240 ACE_TEXT ("%p\n"),
241 ACE_TEXT ("wait")),
242 -1);
244 ACE_DEBUG ((LM_DEBUG,
245 ACE_TEXT ("(%t) all threads are finished\n")));
247 #else
248 ACE_ERROR ((LM_INFO,
249 ACE_TEXT ("threads not supported on this platform\n")));
250 #endif /* ACE_HAS_THREADS */
251 ACE_END_TEST;
252 return 0;