Merge branch 'master' into jwi-bcc64xsingletonwarning
[ACE_TAO.git] / ACE / examples / Reactor / Misc / test_demuxing.cpp
blob1dc03bbd4a2573dee6aaafcb1b1ace03f3ba48f6
1 // Perform an extensive test of all the ACE_Reactor's event handler
2 // dispatching mechanisms. These mechanisms illustrate how I/O,
3 // timeout, and signal events, as well as ACE_Message_Queues, can all
4 // be handled within the same demultiplexing and dispatching
5 // framework. In addition, this example illustrates how to use the
6 // ACE_Reactor for devices that perform I/O via signals (such as SVR4
7 // message queues).
9 #include "ace/ACE.h"
10 #include "ace/Service_Config.h"
11 #include "ace/Reactor.h"
12 #include "ace/Task.h"
13 #include "ace/Reactor_Notification_Strategy.h"
14 #include "ace/Signal.h"
15 #include "ace/OS_NS_fcntl.h"
16 #include "ace/OS_NS_unistd.h"
19 // Default is to have a 2 second timeout.
20 static int timeout = 2;
22 // This class illustrates how to handle signal-driven I/O using
23 // the <ACE_Reactor> framework. Note that signals may be caught
24 // and processed without requiring the use of global signal
25 // handler functions or global signal handler data.
26 class Sig_Handler : public ACE_Event_Handler
28 public:
29 Sig_Handler ();
30 virtual ACE_HANDLE get_handle () const;
31 virtual int handle_input (ACE_HANDLE);
33 //FUZZ: disable check_for_lack_ACE_OS
34 virtual int shutdown (ACE_HANDLE, ACE_Reactor_Mask);
35 //FUZZ: enable check_for_lack_ACE_OS
37 virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);
39 private:
40 ACE_HANDLE handle_;
43 // A dummy_handle is required to reserve a slot in the ACE_Reactor's
44 // descriptor table.
46 Sig_Handler::Sig_Handler ()
48 // Assign the Sig_Handler a dummy I/O descriptor. Note that even
49 // though we open this file "Write Only" we still need to use the
50 // ACE_Event_Handler::NULL_MASK when registering this with the
51 // ACE_Reactor (see below).
52 this->handle_ = ACE_OS::open (ACE_DEV_NULL, O_WRONLY);
53 ACE_ASSERT (this->handle_ != ACE_INVALID_HANDLE);
55 // Register signal handler object. Note that NULL_MASK is used to
56 // keep the ACE_Reactor from calling us back on the "/dev/null"
57 // descriptor. NULL_MASK just reserves a "slot" in the Reactor's
58 // internal demuxing table, but doesn't cause it to dispatch the
59 // event handler directly. Instead, we use the signal handler to do
60 // this.
61 ACE_Reactor_Mask mask = ACE_Event_Handler::NULL_MASK;
62 if (ACE_Reactor::instance ()->register_handler (this, mask) == -1)
63 ACE_ERROR ((LM_ERROR,
64 "%p\n%a",
65 "register_handler",
66 1));
68 // Create a sigset_t corresponding to the signals we want to catch.
69 ACE_Sig_Set sig_set;
71 sig_set.sig_add (SIGINT);
72 sig_set.sig_add (SIGQUIT);
73 sig_set.sig_add (SIGALRM);
75 // Register the signal handler object to catch the signals.
76 if (ACE_Reactor::instance ()->register_handler
77 (sig_set, this) == -1)
78 ACE_ERROR ((LM_ERROR,
79 "%p\n%a",
80 "register_handler",
81 1));
84 // Called by the ACE_Reactor to extract the handle.
86 ACE_HANDLE
87 Sig_Handler::get_handle () const
89 return this->handle_;
92 // In a real application, this method would be where the read on the
93 // signal-driven I/O device would occur asynchronously. For now we'll
94 // just print a greeting to let you know that everything is working
95 // properly!
97 int
98 Sig_Handler::handle_input (ACE_HANDLE)
100 ACE_DEBUG ((LM_DEBUG,
101 "(%t) handling asynchonrous input...\n"));
102 return 0;
105 // In a real application, this method would do any cleanup activities
106 // required when shutting down the I/O device.
109 Sig_Handler::shutdown (ACE_HANDLE, ACE_Reactor_Mask)
111 ACE_DEBUG ((LM_DEBUG,
112 "(%t) closing down Sig_Handler...\n"));
113 return 0;
116 // This method handles all the signals that are being caught by this
117 // object. In our simple example, we are simply catching SIGALRM,
118 // SIGINT, and SIGQUIT. Anything else is logged and ignored. Note
119 // that the ACE_Reactor's signal handling mechanism eliminates the
120 // need to use global signal handler functions and data.
123 Sig_Handler::handle_signal (int signum, siginfo_t *, ucontext_t *)
125 switch (signum)
127 #if !defined (ACE_WIN32)
128 case SIGALRM:
129 // Rearm the alarm.
130 ACE_OS::alarm (4);
131 break;
132 #endif /* !ACE_WIN32 */
133 case SIGINT:
134 // Tell the ACE_Reactor to enable the ready bit for
135 // this->handle_. The ACE_Reactor will subsequently call the
136 // <Sig_Handler::handle_input> method from within its event
137 // loop, i.e., the behavior triggered by the signal is handled
138 // in the main event loop, rather than in the signal handler.
139 return ACE_Reactor::instance ()->ready_ops
140 (this->handle_,
141 ACE_Event_Handler::READ_MASK,
142 ACE_Reactor::ADD_MASK);
143 #if defined (SIGTERM) && (SIGTERM != 0)
144 case SIGTERM:
145 // This is coded thusly to avoid problems if SIGQUIT is a legit
146 // value but is not a preprocessor macro.
147 #elif !defined (SIGQUIT) || (SIGQUIT != 0)
148 case SIGQUIT:
149 #endif /* SIGTERM != 0 */
150 ACE_Reactor::end_event_loop ();
151 break;
152 default:
153 ACE_ERROR_RETURN ((LM_ERROR, "invalid signal"), -1);
154 /* NOTREACHED */
157 return 0;
160 class STDIN_Handler : public ACE_Event_Handler
162 // = TITLE
163 // This class illustrates that the ACE_Reactor can handle signals,
164 // STDIO, and timeouts using the same mechanisms.
165 public:
166 STDIN_Handler ();
167 ~STDIN_Handler ();
168 virtual int handle_input (ACE_HANDLE);
169 virtual int handle_timeout (const ACE_Time_Value &,
170 const void *arg);
173 STDIN_Handler::STDIN_Handler ()
175 if (ACE_Event_Handler::register_stdin_handler (this,
176 ACE_Reactor::instance (),
177 ACE_Thread_Manager::instance ()) == -1)
178 ACE_ERROR ((LM_ERROR,
179 "%p\n",
180 "register_stdin_handler"));
182 // Register the <STDIN_Handler> to be dispatched once every
183 // <timeout> seconds starting in <timeout> seconds. This example
184 // uses the "interval timer" feature of the <ACE_Reactor>'s timer
185 // queue.
186 else if (ACE_Reactor::instance ()->schedule_timer
187 (this,
189 ACE_Time_Value (timeout),
190 ACE_Time_Value (timeout)) == -1)
191 ACE_ERROR ((LM_ERROR,
192 "%p\n%a",
193 "schedule_timer",
194 1));
197 STDIN_Handler::~STDIN_Handler ()
199 if (ACE_Event_Handler::remove_stdin_handler (ACE_Reactor::instance (),
200 ACE_Thread_Manager::instance ()) == -1)
201 ACE_ERROR ((LM_ERROR,
202 "%p\n",
203 "remove_stdin_handler"));
204 else if (ACE_Reactor::instance ()->cancel_timer
205 (this) == -1)
206 ACE_ERROR ((LM_ERROR,
207 "%p\n%a",
208 "cancel_timer",
209 1));
213 STDIN_Handler::handle_timeout (const ACE_Time_Value &tv,
214 const void *)
216 ACE_DEBUG ((LM_DEBUG,
217 "(%t) timeout occurred at %d sec, %d usec\n",
218 tv.sec (),
219 tv.usec ()));
220 return 0;
223 // Read from input handle and write to stdout handle.
226 STDIN_Handler::handle_input (ACE_HANDLE handle)
228 char buf[BUFSIZ];
229 ssize_t n = ACE_OS::read (handle, buf, sizeof buf);
231 switch (n)
233 case -1:
234 if (errno == EINTR)
235 return 0;
236 /* NOTREACHED */
237 else
238 ACE_ERROR ((LM_ERROR,
239 "%p\n",
240 "read"));
241 ACE_FALLTHROUGH;
242 case 0:
243 ACE_Reactor::end_event_loop ();
244 break;
245 default:
247 ssize_t result = ACE::write_n (ACE_STDOUT, buf, n);
249 if (result != n)
250 ACE_ERROR_RETURN ((LM_ERROR,
251 "%p\n",
252 "write"),
253 result == -1 && errno == EINTR ? 0 : -1);
256 return 0;
259 class Message_Handler : public ACE_Task <ACE_SYNCH>
261 public:
262 Message_Handler ();
264 virtual int handle_input (ACE_HANDLE);
265 // Called back within the context of the <ACE_Reactor> Singleton to
266 // dequeue and process the message on the <ACE_Message_Queue>.
268 virtual int svc ();
269 // Run the "event-loop" periodically putting messages to our
270 // internal <Message_Queue> that we inherit from <ACE_Task>.
272 private:
273 ACE_Reactor_Notification_Strategy notification_strategy_;
274 // This strategy will notify the <ACE_Reactor> Singleton when a new
275 // message is enqueued.
278 Message_Handler::Message_Handler ()
279 : notification_strategy_ (ACE_Reactor::instance (),
280 this,
281 ACE_Event_Handler::READ_MASK)
283 // Set this to the Reactor notification strategy.
284 this->msg_queue ()->notification_strategy (&this->notification_strategy_);
286 if (this->activate ())
287 ACE_ERROR ((LM_ERROR,
288 "%p\n",
289 "activate"));
293 Message_Handler::svc ()
295 for (int i = 0;; i++)
297 ACE_Message_Block *mb = 0;
299 ACE_NEW_RETURN (mb,
300 ACE_Message_Block (1),
303 mb->msg_priority (i);
304 ACE_OS::sleep (1);
306 // Note that this putq() call with automagically invoke the
307 // notify() hook of our ACE_Reactor_Notification_Strategy,
308 // thereby informing the <ACE_Reactor> Singleton to call our
309 // <handle_input> method.
310 if (this->putq (mb) == -1)
312 if (errno == ESHUTDOWN)
313 ACE_ERROR_RETURN ((LM_ERROR,
314 "(%t) queue is deactivated"), 0);
315 else
316 ACE_ERROR_RETURN ((LM_ERROR,
317 "(%t) %p\n",
318 "putq"),
319 -1);
323 ACE_NOTREACHED (return 0);
327 Message_Handler::handle_input (ACE_HANDLE)
329 ACE_DEBUG ((LM_DEBUG,
330 "(%t) Message_Handler::handle_input\n"));
332 ACE_Message_Block *mb = 0;
334 if (this->getq (mb, (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
335 ACE_ERROR ((LM_ERROR,
336 "(%t) %p\n",
337 "dequeue_head"));
338 else
340 ACE_DEBUG ((LM_DEBUG,
341 "(%t) priority = %d\n",
342 mb->msg_priority ()));
343 mb->release ();
346 return 0;
350 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
352 ACE_Service_Config daemon (argv [0]);
354 // Optionally start the alarm.
355 if (argc > 1)
357 ACE_OS::alarm (4);
358 timeout = ACE_OS::atoi (argv[1]);
361 // Signal handler.
362 Sig_Handler sh;
364 // Define an I/O handler object.
365 STDIN_Handler ioh;
367 // Define a message handler.
368 Message_Handler mh;
370 // Loop handling signals and I/O events until SIGQUIT occurs.
372 while (ACE_Reactor::instance ()->event_loop_done () == 0)
373 ACE_Reactor::instance ()->run_reactor_event_loop ();
375 // Deactivate the message queue.
376 mh.msg_queue ()->deactivate ();
378 // Wait for the thread to exit.
379 ACE_Thread_Manager::instance ()->wait ();
380 ACE_DEBUG ((LM_DEBUG,
381 ACE_TEXT ("(%t) leaving main\n")));
382 return 0;