Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / Reactor / Misc / notification.cpp
blob6e36d1381324828d150f3dce4ff68adce2fb22f1
1 #include "ace/OS_NS_unistd.h"
2 #include "ace/Service_Config.h"
3 #include "ace/Reactor.h"
4 #include "ace/Thread_Manager.h"
5 #include "ace/Thread.h"
6 #include "ace/Signal.h"
7 #include "ace/Truncate.h"
10 #if defined (ACE_HAS_THREADS)
11 #define MAX_ITERATIONS 10000
13 class Thread_Handler : public ACE_Event_Handler
15 // = TITLE
16 // Illustrate how the ACE_Reactor's thread-safe event notification
17 // mechanism works.
19 // = DESCRIPTION
20 // Handle timeouts in the main thread via the ACE_Reactor and I/O
21 // events in a separate thread. Just before the separate I/O
22 // thread exits it notifies the ACE_Reactor in the main thread
23 // using the ACE_Reactor's notification mechanism.
24 public:
25 Thread_Handler (long delay,
26 long interval,
27 size_t n_threads,
28 size_t max_iterations);
29 // Constructor.
31 Thread_Handler (size_t id,
32 size_t max_iterations);
34 ~Thread_Handler ();
35 // Destructor.
37 virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
38 // Handle signals.
40 virtual int handle_exception (ACE_HANDLE);
41 // Print data from main thread.
43 virtual int handle_output (ACE_HANDLE);
44 // Print data from main thread.
46 virtual int handle_timeout (const ACE_Time_Value &,
47 const void *);
48 // Handle timeout events in the main thread.
50 virtual int handle_input (ACE_HANDLE);
51 // General notification messages to the Reactor.
53 virtual int notify (ACE_Time_Value *tv = 0);
54 // Perform notifications.
56 virtual int svc ();
57 // Handle I/O events in a separate threads.
59 private:
60 static void *svc_run (void *);
61 // Glues C++ to C thread library functions.
63 size_t id_;
64 // ID passed in by Thread_Handler constructor.
66 size_t iterations_;
68 static sig_atomic_t shutdown_;
69 // Shutting down.
71 // = Timing variables.
72 // Delay factor for timer-driven I/O.
73 static ACE_Time_Value delay_;
75 // Interval factor for Event_Handler timer.
76 static ACE_Time_Value interval_;
79 // Shutdown flag.
80 sig_atomic_t Thread_Handler::shutdown_ = 0;
82 // Delay factor for timer-driven I/O.
83 ACE_Time_Value Thread_Handler::delay_;
85 // Interval factor for Event_Handler timer.
86 ACE_Time_Value Thread_Handler::interval_;
88 Thread_Handler::Thread_Handler (size_t id,
89 size_t max_iterations)
90 : id_ (id),
91 iterations_ (max_iterations)
95 Thread_Handler::~Thread_Handler ()
97 // Cleanup resources so that we don't crash and burn when shutdown.
98 ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance (),
99 ACE_Thread_Manager::instance ());
100 ACE_Reactor::instance ()->cancel_timer (this);
103 Thread_Handler::Thread_Handler (
104 long delay,
105 long interval,
106 size_t n_threads,
107 size_t max_iterations)
108 : iterations_ (max_iterations)
110 ACE_Sig_Set sig_set;
112 sig_set.sig_add (SIGQUIT);
113 sig_set.sig_add (SIGINT);
115 delay_.set (delay);
116 interval_.set (interval);
117 this->id_ = 0;
119 if (ACE_Event_Handler::register_stdin_handler (this,
120 ACE_Reactor::instance (),
121 ACE_Thread_Manager::instance ()) == -1)
122 ACE_ERROR ((LM_ERROR,
123 "%p\n",
124 "register_stdin_handler"));
125 else if (ACE_Reactor::instance ()->register_handler (sig_set,
126 this) == -1)
127 ACE_ERROR ((LM_ERROR,
128 "(%t) %p\n",
129 "register_handler"));
130 else if (ACE_Reactor::instance ()->schedule_timer
131 (this,
133 Thread_Handler::delay_,
134 Thread_Handler::interval_) == -1)
135 ACE_ERROR ((LM_ERROR,
136 "(%t) %p\n",
137 "schedule_timer"));
139 // Set up this thread's signal mask to block all the signal in the
140 // <sig_set>, which is inherited by the threads it spawns.
141 ACE_Sig_Guard guard (&sig_set);
143 // Create N new threads of control Thread_Handlers.
145 for (size_t i = 0; i < n_threads; i++)
147 Thread_Handler *th;
149 ACE_NEW (th,
150 Thread_Handler (i + 1,
151 this->iterations_));
153 if (ACE_Thread::spawn (reinterpret_cast<ACE_THR_FUNC> (&Thread_Handler::svc_run),
154 reinterpret_cast<void *> (th),
155 THR_NEW_LWP | THR_DETACHED) != 0)
156 ACE_ERROR ((LM_ERROR,
157 "%p\n",
158 "ACE_Thread::spawn"));
161 // The destructor of <guard> unblocks the signal set so that only
162 // this thread receives them!
166 Thread_Handler::notify (ACE_Time_Value *timeout)
168 // Just do something to test the ACE_Reactor's multi-thread
169 // capabilities...
171 if (ACE_Reactor::instance ()->notify
172 (this,
173 ACE_Event_Handler::EXCEPT_MASK,
174 timeout) == -1)
175 ACE_ERROR_RETURN ((LM_ERROR,
176 "(%t) %p\n",
177 "notification::notify:exception"),
178 -1);
179 else if (ACE_Reactor::instance ()->notify
180 (this,
181 ACE_Event_Handler::WRITE_MASK,
182 timeout) == -1)
183 ACE_ERROR_RETURN ((LM_ERROR,
184 "(%t) %p\n",
185 "notification::notify:write"),
186 -1);
187 return 0;
190 // Test stdin handling that uses <select> to demultiplex HANDLEs.
191 // Input is only handled by the main thread.
194 Thread_Handler::handle_input (ACE_HANDLE handle)
196 char buf[BUFSIZ];
197 ssize_t n = ACE_OS::read (handle, buf, sizeof buf);
199 if (n > 0)
201 ACE_DEBUG ((LM_DEBUG,
202 "input to (%t) %*s",
204 buf));
206 ACE_DEBUG ((LM_DEBUG,
207 "%d more input to kill\n",
208 this->iterations_));
210 // Only wait up to 10 milliseconds to notify the Reactor.
211 ACE_Time_Value timeout (0,
212 10 * 1000);
214 if (this->notify (&timeout) == -1)
215 ACE_ERROR ((LM_DEBUG,
216 "(%t), %p\n",
217 "notification::handle_input:notify"));
218 return 0;
220 else
221 return -1;
224 // Perform a task that will test the ACE_Reactor's multi-threading
225 // capabilities in separate threads.
228 Thread_Handler::svc ()
230 ACE_Time_Value sleep_timeout (Thread_Handler::interval_.sec () / 2);
232 for (int i = ACE_Utils::truncate_cast<int> (this->iterations_);
233 i > 0;
234 --i)
236 if (this->shutdown_ != 0)
237 break;
239 // Block for delay_.secs () / 2, then notify the Reactor.
240 ACE_OS::sleep (sleep_timeout);
242 // Wait up to 10 milliseconds to notify the Reactor.
243 ACE_Time_Value timeout (0,
244 10 * 1000);
245 if (this->notify (&timeout) == -1)
246 ACE_ERROR ((LM_ERROR,
247 "(%t) %p\n",
248 "notify"));
251 ACE_Reactor::instance ()->remove_handler (this,
252 ALL_EVENTS_MASK);
253 ACE_DEBUG ((LM_DEBUG,
254 "(%t) exiting svc\n"));
255 return 0;
258 // Test signal handling.
261 Thread_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
263 // @@ Note that this code is not portable to all OS platforms since
264 // it uses print statements within signal handler context.
265 ACE_DEBUG ((LM_DEBUG,
266 "(%t) received signal %S\n",
267 signum));
269 switch (signum)
271 case SIGINT:
272 // This is coded thusly to avoid problems if SIGQUIT is a legit
273 // value but is not a preprocessor macro.
274 #if !defined (SIGQUIT) || (SIGQUIT != 0)
275 case SIGQUIT:
276 #endif
277 ACE_ERROR ((LM_ERROR,
278 "(%t) ******************** shutting down %n on signal %S\n",
279 signum));
280 this->shutdown_ = 1;
281 ACE_Reactor::end_event_loop();
283 return 0;
287 Thread_Handler::handle_timeout (const ACE_Time_Value &time, const void *)
289 ACE_DEBUG ((LM_DEBUG,
290 "(%t) received timeout at (%u, %u), iterations = %d\n",
291 time.sec (),
292 time.usec (),
293 this->iterations_));
295 if (--this->iterations_ <= 0
296 || Thread_Handler::interval_.sec () == 0)
297 ACE_Reactor::end_event_loop ();
299 return 0;
302 // Called by the ACE_Reactor when it receives a notification.
305 Thread_Handler::handle_exception (ACE_HANDLE)
307 ACE_DEBUG ((LM_DEBUG,
308 "(%t) exception to id %d, iteration = %d\n",
309 this->id_,
310 this->iterations_));
311 return 0;
314 // Called by the ACE_Reactor when it receives a notification.
317 Thread_Handler::handle_output (ACE_HANDLE)
319 ACE_DEBUG ((LM_DEBUG,
320 "(%t) output to id %d, iteration = %d\n",
321 this->id_,
322 // This decrement must come last since
323 // <handle_exception> is called before <handle_output>!
324 this->iterations_--));
325 return 0;
328 // "Shim" function that integrates C thread API with C++.
330 void *
331 Thread_Handler::svc_run (void *eh)
333 Thread_Handler *this_handler =
334 reinterpret_cast<Thread_Handler *> (eh);
336 if (this_handler->svc () == 0)
337 return 0;
338 else
339 return reinterpret_cast<void *> (-1);
343 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
345 ACE_LOG_MSG->open (argv[0]);
347 if (argc < 4)
349 ACE_ERROR ((LM_ERROR,
350 ACE_TEXT ("usage: %s delay interval n_threads [iterations]\n"),
351 argv[0]));
352 ACE_OS::exit (1);
355 int delay = ACE_OS::atoi (argv[1]);
356 int interval = ACE_OS::atoi (argv[2]);
357 size_t n_threads = ACE_OS::atoi (argv[3]);
358 size_t max_iterations = argc > 4 ? ACE_OS::atoi (argv[4]) : MAX_ITERATIONS;
360 Thread_Handler thr_handler (delay,
361 interval,
362 n_threads,
363 max_iterations);
365 ACE_Reactor::instance ()->run_reactor_event_loop ();
367 ACE_DEBUG ((LM_DEBUG,
368 ACE_TEXT ("exiting from main\n")));
369 return 0;
371 #else
373 ACE_TMAIN (int, ACE_TCHAR *[])
375 ACE_ERROR_RETURN ((LM_ERROR,
376 "threads must be supported to run this application\n"), -1);
378 #endif /* ACE_HAS_THREADS */