Merge branch 'master' into jwi-bcc64xsingletonwarning
[ACE_TAO.git] / ACE / examples / Reactor / Proactor / test_udp_proactor.cpp
blob46805f29b2e2c89d2bf25450d9e45a3c2ce8ab24
//=============================================================================
/**
 *  @file    test_udp_proactor.cpp
 *
 *  This program illustrates how the <ACE_Proactor> can be used to
 *  implement an application that does asynchronous operations using
 *  datagrams.
 *
 *  @author Irfan Pyarali <irfan@cs.wustl.edu> and Roger Tragin <r.tragin@computer.org>
 */
//=============================================================================
15 #include "ace/OS_NS_string.h"
16 #include "ace/OS_main.h"
17 #include "ace/Proactor.h"
18 #include "ace/Asynch_IO.h"
19 #include "ace/INET_Addr.h"
20 #include "ace/SOCK_Dgram.h"
21 #include "ace/Message_Block.h"
22 #include "ace/Get_Opt.h"
23 #include "ace/Log_Msg.h"
26 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
27 // This only works on asynch I/O-capable platforms.
29 // Host that we're connecting to.
30 static ACE_TCHAR *host = 0;
32 // Port that we're receiving connections on.
33 static u_short port = ACE_DEFAULT_SERVER_PORT;
35 // Keep track of when we're done.
36 static int done = 0;
38 /**
39 * @class Receiver
41 * @brief This class will receive data from
42 * the network connection and dump it to a file.
44 class Receiver : public ACE_Service_Handler
46 public:
47 Receiver ();
48 ~Receiver ();
50 int open_addr (const ACE_INET_Addr &localAddr);
52 protected:
53 // These methods are called by the framework
55 /// This method will be called when an asynchronous read completes on
56 /// a UDP socket.
57 virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result);
59 private:
60 ACE_SOCK_Dgram sock_dgram_;
62 /// rd (read dgram): for reading from a UDP socket.
63 ACE_Asynch_Read_Dgram rd_;
64 const char* completion_key_;
65 const char* act_;
68 Receiver::Receiver ()
69 : completion_key_ ("Receiver Completion Key"),
70 act_ ("Receiver ACT")
74 Receiver::~Receiver ()
76 sock_dgram_.close ();
79 int
80 Receiver::open_addr (const ACE_INET_Addr &localAddr)
82 ACE_DEBUG ((LM_DEBUG,
83 "%N:%l:Receiver::open_addr called\n"));
85 // Create a local UDP socket to receive datagrams.
86 if (this->sock_dgram_.open (localAddr) == -1)
87 ACE_ERROR_RETURN ((LM_ERROR,
88 "%p\n",
89 "ACE_SOCK_Dgram::open"), -1);
91 // Initialize the asynchronous read.
92 if (this->rd_.open (*this,
93 this->sock_dgram_.get_handle (),
94 this->completion_key_,
95 ACE_Proactor::instance ()) == -1)
96 ACE_ERROR_RETURN ((LM_ERROR,
97 "%p\n",
98 "ACE_Asynch_Read_Dgram::open"), -1);
100 // Create a buffer to read into. We are using scatter/gather to
101 // read the message header and message body into 2 buffers
103 // create a message block to read the message header
104 ACE_Message_Block* msg = 0;
105 ACE_NEW_RETURN (msg, ACE_Message_Block (1024), -1);
107 // the next line sets the size of the header, even though we
108 // allocated a the message block of 1k, by setting the size to 20
109 // bytes then the first 20 bytes of the reveived datagram will be
110 // put into this message block.
111 msg->size (20); // size of header to read is 20 bytes
113 // create a message block to read the message body
114 ACE_Message_Block* body = 0;
115 ACE_NEW_RETURN (body, ACE_Message_Block (1024), -1);
116 // The message body will not exceed 1024 bytes, at least not in this test.
118 // set body as the cont of msg. This associates the 2 message
119 // blocks so that a read will fill the first block (which is the
120 // header) up to size (), and use the cont () block for the rest of
121 // the data. You can chain up to IOV_MAX message block using this
122 // method.
123 msg->cont (body);
125 // ok lets do the asynch read
126 size_t number_of_bytes_recvd = 0;
128 int res = rd_.recv (msg,
129 number_of_bytes_recvd,
131 PF_INET,
132 this->act_);
133 switch (res)
135 case 0:
136 // this is a good error. The proactor will call our handler when the
137 // read has completed.
138 break;
139 case 1:
140 // actually read something, we will handle it in the handler callback
141 ACE_DEBUG ((LM_DEBUG, "********************\n"));
142 ACE_DEBUG ((LM_DEBUG,
143 "%s = %d\n",
144 "bytes received immediately",
145 number_of_bytes_recvd));
146 ACE_DEBUG ((LM_DEBUG, "********************\n"));
147 res = 0;
148 break;
149 case -1:
150 // Something else went wrong.
151 ACE_ERROR ((LM_ERROR,
152 "%p\n",
153 "ACE_Asynch_Read_Dgram::recv"));
154 // the handler will not get called in this case so lets clean up our msg
155 msg->release ();
156 break;
157 default:
158 // Something undocumented really went wrong.
159 ACE_ERROR ((LM_ERROR,
160 "%p\n",
161 "ACE_Asynch_Read_Dgram::recv"));
162 msg->release ();
163 break;
166 return res;
169 void
170 Receiver::handle_read_dgram (const ACE_Asynch_Read_Dgram::Result &result)
172 ACE_DEBUG ((LM_DEBUG,
173 "handle_read_dgram called\n"));
175 ACE_DEBUG ((LM_DEBUG, "********************\n"));
176 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
177 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
178 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
179 ACE_INET_Addr peerAddr;
180 result.remote_address (peerAddr);
181 ACE_DEBUG ((LM_DEBUG, "%s = %s:%d\n", "peer_address", peerAddr.get_host_addr (), peerAddr.get_port_number ()));
182 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));
183 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ()));
184 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
185 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ()));
186 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
187 ACE_DEBUG ((LM_DEBUG, "********************\n"));
189 if (result.success () && result.bytes_transferred () != 0)
191 // loop through our message block and print out the contents
192 for (const ACE_Message_Block* msg = result.message_block (); msg != 0; msg = msg->cont ())
193 { // use msg->length () to get the number of bytes written to the message
194 // block.
195 ACE_DEBUG ((LM_DEBUG, "Buf=[size=<%d>", msg->length ()));
196 for (u_long i = 0; i < msg->length (); ++i)
197 ACE_DEBUG ((LM_DEBUG,
198 "%c", (msg->rd_ptr ())[i]));
199 ACE_DEBUG ((LM_DEBUG, "]\n"));
203 ACE_DEBUG ((LM_DEBUG,
204 "Receiver completed\n"));
206 // No need for this message block anymore.
207 result.message_block ()->release ();
209 // Note that we are done with the test.
210 done++;
214 * @class Sender
216 * @brief The class will be created by <main>.
218 class Sender : public ACE_Handler
220 public:
221 Sender ();
222 ~Sender ();
224 //FUZZ: disable check_for_lack_ACE_OS
225 ///FUZZ: enable check_for_lack_ACE_OS
226 int open (const ACE_TCHAR *host, u_short port);
228 protected:
229 // These methods are called by the freamwork
231 /// This is called when asynchronous writes from the dgram socket
232 /// complete
233 virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result);
235 private:
236 /// Network I/O handle
237 ACE_SOCK_Dgram sock_dgram_;
239 /// wd (write dgram): for writing to the socket
240 ACE_Asynch_Write_Dgram wd_;
242 const char* completion_key_;
243 const char* act_;
246 Sender::Sender ()
247 : completion_key_ ("Sender completion key"),
248 act_ ("Sender ACT")
252 Sender::~Sender ()
254 this->sock_dgram_.close ();
258 Sender::open (const ACE_TCHAR *host,
259 u_short port)
261 // Initialize stuff
263 if (this->sock_dgram_.open (ACE_INET_Addr::sap_any) == -1)
264 ACE_ERROR_RETURN ((LM_ERROR,
265 "%p\n",
266 "ACE_SOCK_Dgram::open"), -1);
268 // Initialize the asynchronous read.
269 if (this->wd_.open (*this,
270 this->sock_dgram_.get_handle (),
271 this->completion_key_,
272 ACE_Proactor::instance ()) == -1)
273 ACE_ERROR_RETURN ((LM_ERROR,
274 "%p\n",
275 "ACE_Asynch_Write_Dgram::open"), -1);
277 // We are using scatter/gather to send the message header and
278 // message body using 2 buffers
280 // create a message block for the message header
281 ACE_Message_Block* msg = 0;
282 ACE_NEW_RETURN (msg, ACE_Message_Block (100), -1);
283 const char raw_msg [] = "To be or not to be.";
284 // Copy buf into the Message_Block and update the wr_ptr ().
285 msg->copy (raw_msg, ACE_OS::strlen (raw_msg) + 1);
287 // create a message block for the message body
288 ACE_Message_Block* body = 0;
289 ACE_NEW_RETURN (body, ACE_Message_Block (100), -1);
290 ACE_OS::memset (body->wr_ptr (), 'X', 100);
291 body->wr_ptr (100); // always remember to update the wr_ptr ()
293 // set body as the cont of msg. This associates the 2 message blocks so
294 // that a send will send the first block (which is the header) up to
295 // length (), and use the cont () to get the next block to send. You can
296 // chain up to IOV_MAX message block using this method.
297 msg->cont (body);
299 // do the asynch send
300 size_t number_of_bytes_sent = 0;
301 ACE_INET_Addr serverAddr (port, host);
302 int res = this->wd_.send (msg, number_of_bytes_sent, 0, serverAddr, this->act_);
304 switch (res)
306 case 0:
307 // this is a good error. The proactor will call our handler when the
308 // send has completed.
309 break;
310 case 1:
311 // actually sent something, we will handle it in the handler callback
312 ACE_DEBUG ((LM_DEBUG, "********************\n"));
313 ACE_DEBUG ((LM_DEBUG,
314 "%s = %d\n",
315 "bytes sent immediately",
316 number_of_bytes_sent));
317 ACE_DEBUG ((LM_DEBUG, "********************\n"));
318 res = 0;
319 break;
320 case -1:
321 // Something else went wrong.
322 ACE_ERROR ((LM_ERROR,
323 "%p\n",
324 "ACE_Asynch_Write_Dgram::recv"));
325 // the handler will not get called in this case so lets clean up our msg
326 msg->release ();
327 break;
328 default:
329 // Something undocumented really went wrong.
330 ACE_ERROR ((LM_ERROR,
331 "%p\n",
332 "ACE_Asynch_Write_Dgram::recv"));
333 msg->release ();
334 break;
336 return res;
339 void
340 Sender::handle_write_dgram (const ACE_Asynch_Write_Dgram::Result &result)
342 ACE_DEBUG ((LM_DEBUG,
343 "handle_write_dgram called\n"));
345 ACE_DEBUG ((LM_DEBUG, "********************\n"));
346 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
347 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
348 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
349 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "flags", result.flags ()));
350 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "act", result.act ()));
351 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
352 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "completion_key", result.completion_key ()));
353 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
354 ACE_DEBUG ((LM_DEBUG, "********************\n"));
357 ACE_DEBUG ((LM_DEBUG,
358 "Sender completed\n"));
360 // No need for this message block anymore.
361 result.message_block ()->release ();
363 // Note that we are done with the test.
364 done++;
367 static int
368 parse_args (int argc, ACE_TCHAR *argv[])
370 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("h:p:"));
371 int c;
373 while ((c = get_opt ()) != EOF)
374 switch (c)
376 case 'h':
377 host = get_opt.opt_arg ();
378 break;
379 case 'p':
380 port = ACE_OS::atoi (get_opt.opt_arg ());
381 break;
382 default:
383 ACE_ERROR_RETURN ((LM_ERROR, "%p.\n",
384 "usage :\n"
385 "-h <host>\n"), -1);
388 return 0;
392 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
394 if (parse_args (argc, argv) == -1)
395 return -1;
397 Sender sender;
399 Receiver receiver;
401 // If passive side
402 if (host == 0)
404 if (receiver.open_addr (ACE_INET_Addr (port)) == -1)
405 return -1;
407 // If active side
408 else if (sender.open (host, port) == -1)
409 return -1;
411 for (int success = 1;
412 success > 0 && !done;
414 // Dispatch events via Proactor singleton.
415 success = ACE_Proactor::instance ()->handle_events ();
417 return 0;
420 #else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS*/
423 ACE_TMAIN (int, ACE_TCHAR *[])
425 ACE_DEBUG ((LM_DEBUG,
426 "This example does not work on this platform.\n"));
427 return 1;
430 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */