Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / Reactor / Misc / pingpong.cpp
blob0da26118eb06406fdcd70d4682910cdc47cde47c
1 /* Simple program that illustrates many features of the ACE_Reactor:
3 1. I/O event demultiplexing
4 2. Signal-based demultiplexing
5 3. Timer-based demultiplexing
7 To test this program, compile it and then execute it as follows:
9 % ./pingpong hello
11 You should see lots of the following output:
13 writing <4> [7860]
14 writing <4> [7860]
15 writing <4> [7860]
16 writing <4> [7860]
17 reading <5> (7860) [1] = hello
18 writing <4> [7860]
19 writing <5> [7861]
20 reading <4> (7861) [2] = hello
21 reading <5> (7860) [2] = hello
22 writing <4> [7860]
23 writing <5> [7861]
24 reading <4> (7861) [3] = hello
25 reading <5> (7860) [3] = hello
27 After 10 seconds you'll see the following:
29 ./pingpong: shutting down tester (pid = 7861)
30 ./pingpong: shutting down tester (pid = 7860)
32 and the program will stop. If you'd like to
33 stop it earlier, just hit the control-C sequence
34 and you'll see the same messages. */
36 #include "ace/Reactor.h"
37 #include "ace/Pipe.h"
38 #include "ace/Log_Msg.h"
39 #include "ace/ACE.h"
40 #include "ace/Test_and_Set.h"
41 #include "ace/OS_NS_string.h"
42 #include "ace/Null_Mutex.h"
43 #include "ace/OS_NS_unistd.h"
44 #if defined (ACE_WIN32)
45 # include "ace/Barrier.h"
46 # include "ace/Thread.h"
47 #endif
49 class Ping_Pong : public ACE_Test_and_Set<ACE_Null_Mutex, sig_atomic_t>
51 public:
52 Ping_Pong (char b[], ACE_HANDLE f);
53 virtual ACE_HANDLE get_handle () const;
54 virtual int handle_input (ACE_HANDLE);
55 virtual int handle_output (ACE_HANDLE);
56 virtual int handle_timeout (const ACE_Time_Value &,
57 const void *);
58 virtual int handle_close (ACE_HANDLE handle,
59 ACE_Reactor_Mask close_mask);
60 private:
61 char buf_[BUFSIZ];
62 // Buffer to send.
64 size_t buflen_;
65 // Length of the buffer to send.
67 int pid_;
68 // Process ID.
70 ACE_HANDLE handle_;
71 // Open handle.
74 Ping_Pong::Ping_Pong (char b[], ACE_HANDLE f)
75 : buflen_ (ACE_OS::strlen (b) + 1 + (2 * sizeof (int))),
76 pid_ (ACE_OS::getpid ()),
77 handle_ (f)
79 int *pi_buf = (int *) this->buf_;
80 *(pi_buf) = (int) this->pid_;
81 pi_buf = (int *) (this->buf_ + sizeof (int));
82 *(pi_buf) = 0;
83 ACE_OS::strcpy (this->buf_ + (2 * sizeof (int)), b);
84 this->buf_[this->buflen_ - 1] = '\n';
85 this->buf_[this->buflen_] = '\0';
88 ACE_HANDLE
89 Ping_Pong::get_handle () const
91 return this->handle_;
94 int
95 Ping_Pong::handle_close (ACE_HANDLE,
96 ACE_Reactor_Mask)
98 delete this; // Cleanup when we're removed from the reactor.
99 return 0;
103 Ping_Pong::handle_input (ACE_HANDLE)
105 #if defined (ACE_HAS_STREAM_PIPES)
106 // We can rely on record-oriented reads...
108 ssize_t n = ACE::recv (this->handle_, this->buf_, this->buflen_);
110 if (n != (ssize_t) this->buflen_)
111 ACE_ERROR_RETURN ((LM_ERROR,
112 ACE_TEXT ("(%P|%t) reading [%d] %p\n"),
113 handle_,
114 ACE_TEXT ("read")),
115 -1);
117 ACE_DEBUG ((LM_DEBUG,
118 ACE_TEXT ("(%P|%t) reading <%d> (%d) [%d] = %C\n"),
119 this->handle_,
120 *(int *) this->buf_,
121 *(int *) (this->buf_ + sizeof (int)),
122 this->buf_ + (2 * sizeof (int))));
123 #else
124 ssize_t n = ACE::recv (this->handle_,
125 this->buf_,
126 this->buflen_);
127 if (n == -1)
128 ACE_ERROR_RETURN ((LM_ERROR,
129 ACE_TEXT ("[%d] %p\n"),
130 handle_,
131 ACE_TEXT ("read")),
132 -1);
133 n -= (2 * sizeof (int));
134 char *buf = this->buf_ + (2 * sizeof (int));
136 ACE_DEBUG ((LM_DEBUG,
137 ACE_TEXT ("(%P|%t) reading <%d> = %*C\n"),
138 this->handle_,
140 buf));
141 #endif /* ACE_HAS_STREAM_PIPES */
142 return 0;
146 Ping_Pong::handle_output (ACE_HANDLE)
148 #if defined (ACE_HAS_STREAM_PIPES)
149 // We can rely on record-oriented reads...
151 (*(int *) (this->buf_)) = this->pid_;
152 (*(int *) (this->buf_ + sizeof (int)))++;
153 if (ACE::send (this->handle_,
154 this->buf_,
155 this->buflen_) == -1)
156 return -1;
157 else
159 ACE_DEBUG ((LM_DEBUG,
160 ACE_TEXT ("(%P|%t) writing <%d> [%d]\n"),
161 this->handle_,
162 this->pid_));
163 return 0;
165 #else
166 if (ACE::send (this->handle_,
167 this->buf_,
168 this->buflen_) == -1)
169 return -1;
170 else
172 ACE_DEBUG ((LM_DEBUG,
173 ACE_TEXT ("(%P|%t) writing <%d>\n"),
174 this->handle_));
175 return 0;
177 #endif /* ACE_HAS_STREAM_PIPES */
181 Ping_Pong::handle_timeout (const ACE_Time_Value &,
182 const void *)
184 this->set (1);
185 return 0;
188 // Contains the string to "pingpong" back and forth...
189 static ACE_TCHAR *string_name;
191 // Wait for 10 seconds and then shut down.
192 static const ACE_Time_Value SHUTDOWN_TIME (10);
194 static void
195 run_svc (ACE_HANDLE handle)
197 Ping_Pong *callback = 0;
198 ACE_NEW (callback,
199 Ping_Pong (ACE_TEXT_ALWAYS_CHAR (string_name),
200 handle));
202 ACE_Reactor reactor;
204 // Register the callback object for the various I/O, signal, and
205 // timer-based events.
207 if (reactor.register_handler (callback,
208 ACE_Event_Handler::READ_MASK
209 | ACE_Event_Handler::WRITE_MASK) == -1
210 || reactor.register_handler (SIGINT,
211 callback) == -1
212 || reactor.schedule_timer (callback,
214 SHUTDOWN_TIME) == -1)
216 ACE_ERROR ((LM_ERROR,
217 ACE_TEXT ("%p\n"),
218 ACE_TEXT ("reactor")));
219 ACE_OS::exit (1);
222 // Main event loop (one per process).
224 while (callback->is_set () == 0)
225 if (reactor.handle_events () == -1)
226 ACE_ERROR ((LM_ERROR,
227 ACE_TEXT ("%p\n"),
228 ACE_TEXT ("handle_events")));
#if defined (ACE_WIN32)
// Rendezvous point for three parties: the two worker threads spawned
// by main, plus main itself.
static ACE_Barrier barrier (3);

// Thread entry point: runs one ping-pong endpoint on the handle passed
// in through <arg>, then waits at the barrier before returning.
static void *
worker (void *arg)
{
  run_svc ((ACE_HANDLE) arg);

  // Wait for the threads to exit.
  barrier.wait ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%P|%t) %n: shutting down tester\n")));
  return 0;
}
#endif /* ACE_WIN32 */
251 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
253 if (argc != 2)
254 ACE_ERROR_RETURN ((LM_ERROR,
255 ACE_TEXT ("usage: pingpong <string>\n")),
256 -1);
258 ACE_LOG_MSG->open (argv[0]);
260 string_name = argv[1];
262 ACE_HANDLE handles[2];
264 //FUZZ: disable check_for_lack_ACE_OS
265 // Create a pipe and initialize the handles.
266 ACE_Pipe pipe (handles);
267 //FUZZ: enable check_for_lack_ACE_OS
269 #if defined (ACE_WIN32)
270 if (ACE_Thread::spawn (ACE_THR_FUNC (worker),
271 (void *) handles[0],
272 THR_DETACHED) == -1
273 || ACE_Thread::spawn (ACE_THR_FUNC (worker),
274 (void *) handles[1],
275 THR_DETACHED) == -1)
276 ACE_ERROR ((LM_ERROR,
277 ACE_TEXT ("%p\n%a"),
278 ACE_TEXT ("spawn"),
279 1));
280 barrier.wait ();
281 #else
282 pid_t pid = ACE_OS::fork (argv[0]);
284 if (pid == -1)
285 ACE_ERROR ((LM_ERROR,
286 ACE_TEXT ("%p\n%a"),
287 ACE_TEXT ("fork"),
288 1));
289 run_svc (handles[pid == 0]);
291 ACE_DEBUG ((LM_DEBUG,
292 ACE_TEXT ("(%P|%t) %n: shutting down tester\n")));
293 #endif /* ACE_WIN32 */
295 if (pipe.close () == -1)
296 ACE_ERROR ((LM_ERROR,
297 ACE_TEXT ("%p\n"),
298 ACE_TEXT ("close")));
299 return 0;