Merge pull request #2317 from jwillemsen/jwi-deleteop
[ACE_TAO.git] / ACE / examples / Reactor / Proactor / test_proactor2.cpp
blob247dbe9e07230bd0400fbf217355792f7712e893
2 //=============================================================================
3 /**
4 * @file test_proactor2.cpp
6 * Alexander Libman <Alibman@baltimore.com> modified
7 * <test_proactor> and made this test. Instead of writing received
8 * data to the file, the receiver sends them back to the
9 * sender,i.e. ACE_Asynch_Write_File wf_ has been changed to
10 * ACE_Asynch_Write_Stream wf_.
12 * @author Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman <Alibman@baltimore.com>.
14 //=============================================================================
17 #include "ace/Signal.h"
19 #include "ace/Service_Config.h"
20 #include "ace/Proactor.h"
21 #include "ace/Asynch_IO.h"
22 #include "ace/Asynch_IO_Impl.h"
23 #include "ace/Asynch_Acceptor.h"
24 #include "ace/INET_Addr.h"
25 #include "ace/SOCK_Connector.h"
26 #include "ace/SOCK_Acceptor.h"
27 #include "ace/SOCK_Stream.h"
28 #include "ace/Message_Block.h"
29 #include "ace/Get_Opt.h"
31 // FUZZ: disable check_for_streams_include
32 #include "ace/streams.h"
34 #include "ace/Task.h"
35 #include "ace/OS_main.h"
38 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
39 // This only works on Win32 platforms and on Unix platforms supporting
40 // POSIX aio calls.
42 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
44 #include "ace/WIN32_Proactor.h"
46 #elif defined (ACE_HAS_AIO_CALLS)
48 #include "ace/POSIX_Proactor.h"
50 #endif
52 // Some debug helper functions
53 int DisableSignal ( int SigNum );
54 int PrintSigMask ();
56 #define COUT(X) cout << X ; cout.flush ();
58 // Host that we're connecting to.
59 static ACE_TCHAR *host = 0;
61 // duplex mode: ==0 half-duplex
62 // !=0 full duplex
63 static int duplex = 0 ;
65 // number threads in the Proactor thread pool
66 static int nThreads = 1;
68 // Port that we're receiving connections on.
69 static u_short port = ACE_DEFAULT_SERVER_PORT;
71 // Size of each initial asynchronous <read> operation.
72 static int initial_read_size = BUFSIZ;
75 #define MyMutex ACE_Recursive_Thread_Mutex
76 //#define MyMutex ACE_Thread_Mutex
77 //#define MyMutex ACE_Null_Mutex
79 //--------------------------------------------------------------------------
80 // MyTask plays role for Proactor threads pool
81 //--------------------------------------------------------------------------
82 class MyTask: public ACE_Task<ACE_MT_SYNCH>
84 public:
85 int svc () ;
89 int MyTask::svc ()
91 ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));
93 while ( ACE_Proactor::event_loop_done () == 0 )
95 ACE_Proactor::run_event_loop ();
98 ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
99 return 0 ;
102 //-----------------------------------------------------------
103 // Receiver
104 //-----------------------------------------------------------
105 class Receiver : public ACE_Service_Handler
107 public:
108 Receiver ();
109 ~Receiver ();
111 //FUZZ: disable check_for_lack_ACE_OS
112 /// This is called after the new connection has been accepted.
113 ///FUZZ: enable check_for_lack_ACE_OS
114 virtual void open (ACE_HANDLE handle,
115 ACE_Message_Block &message_block);
117 protected:
118 // These methods are called by the framework
120 /// This is called when asynchronous <read> operation from the socket
121 /// complete.
122 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
123 &result);
125 /// This is called when an asynchronous <write> to the file
126 /// completes.
127 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
128 &result);
130 private:
131 int initiate_read_stream ();
132 int initiate_write_stream (ACE_Message_Block & mb, int nBytes );
133 bool check_destroy () ;
135 ACE_Asynch_Read_Stream rs_;
136 ACE_Asynch_Write_Stream ws_;
137 ACE_HANDLE handle_;
138 MyMutex m_Mtx ;
139 long nIOCount ;
140 static long nSessions ;
144 long Receiver::nSessions = 0 ;
146 Receiver::Receiver ()
147 : handle_ (ACE_INVALID_HANDLE),
148 nIOCount ( 0 )
150 ACE_GUARD (MyMutex, locker, m_Mtx);
151 nSessions ++ ;
152 ACE_DEBUG ((LM_DEBUG, "Receiver Ctor nSessions=%d\n", nSessions ));
155 Receiver::~Receiver ()
157 ACE_GUARD (MyMutex, locker, m_Mtx);
158 nSessions -- ;
159 ACE_OS::closesocket (this->handle_);
160 ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor nSessions=%d\n", nSessions ));
163 //---------------------------------------------------------------------
164 // return true if we alive, false we commited suicide
166 //---------------------------------------------------------------------
167 bool Receiver::check_destroy ()
170 ACE_GUARD_RETURN (MyMutex, locker, m_Mtx, false);
172 if ( nIOCount > 0 )
174 return true ;
178 delete this ;
179 return false ;
183 void Receiver::open (ACE_HANDLE handle,
184 ACE_Message_Block &)
186 ACE_DEBUG ((LM_DEBUG,
187 "%N:%l:Receiver::open called\n"));
190 this->handle_ = handle;
192 if (this->ws_.open (*this, this->handle_ ) == -1)
194 ACE_ERROR ((LM_ERROR,
195 "%p\n",
196 "ACE_Asynch_Write_Stream::open"));
198 else if (this->rs_.open (*this, this->handle_) == -1)
200 ACE_ERROR ((LM_ERROR,
201 "%p\n",
202 "ACE_Asynch_Read_Stream::open"));
204 else
206 initiate_read_stream ();
210 check_destroy ();
213 int Receiver::initiate_read_stream ()
215 ACE_GUARD_RETURN (MyMutex, locker, m_Mtx, -1);
217 // Create a new <Message_Block>. Note that this message block will
218 // be used both to <read> data asynchronously from the socket and to
219 // <write> data asynchronously to the file.
220 ACE_DEBUG ((LM_DEBUG,
221 "initiate_read_stream called\n"));
224 ACE_Message_Block *mb = 0;
225 ACE_NEW_RETURN (mb,
226 ACE_Message_Block (BUFSIZ + 1),
227 -1);
229 // Inititiate read
230 if (this->rs_.read (*mb, mb->size ()- 1) == -1)
232 mb->release () ;
233 ACE_ERROR_RETURN ((LM_ERROR,
234 "%p\n",
235 "ACE_Asynch_Read_Stream::read"),
236 -1);
239 nIOCount++ ;
240 return 0;
243 int Receiver::initiate_write_stream (ACE_Message_Block & mb, int nBytes )
245 ACE_GUARD_RETURN (MyMutex, locker, m_Mtx, -1);
246 if (this->ws_.write (mb , nBytes ) == -1)
248 mb.release ();
249 ACE_ERROR_RETURN((LM_ERROR,
250 "%p\n",
251 "ACE_Asynch_Write_File::write"),
252 -1);
255 nIOCount++ ;
256 return 0;
259 void
260 Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
262 ACE_DEBUG ((LM_DEBUG,
263 "handle_read_stream called\n"));
265 // Reset pointers.
266 result.message_block ().rd_ptr ()[result.bytes_transferred ()] =
267 '\0';
269 ACE_DEBUG ((LM_DEBUG, "********************\n"));
270 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read
271 ()));
272 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
273 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
274 result.bytes_transferred ()));
275 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
276 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
277 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
278 result.completion_key ()));
279 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
280 ACE_DEBUG ((LM_DEBUG, "********************\n"));
281 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
282 result.message_block ().rd_ptr ()));
284 if ( result.success () && result.bytes_transferred () != 0)
286 // Successful read: write the data to the file asynchronously.
287 // Note how we reuse the <ACE_Message_Block> for the writing.
288 // Therefore, we do not delete this buffer because it is handled
289 // in <handle_write_stream>.
291 if(this->initiate_write_stream (result.message_block (),
293 result.bytes_transferred () ) == 0 )
295 if ( duplex != 0 )
297 // Initiate new read from the stream.
298 this->initiate_read_stream () ;
302 else
304 result.message_block ().release ();
305 ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
309 ACE_GUARD (MyMutex, locker, m_Mtx);
310 --nIOCount;
312 check_destroy () ;
315 void
316 Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result
317 &result)
319 ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
321 ACE_DEBUG ((LM_DEBUG, "********************\n"));
322 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",
323 result.bytes_to_write ()));
324 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
325 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
326 result.bytes_transferred ()));
327 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
328 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
329 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
330 result.completion_key ()));
331 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
332 ACE_DEBUG ((LM_DEBUG, "********************\n"));
334 result.message_block ().release ();
336 if (result.success ())
338 // This code is not robust enough to deal with short file writes
339 // (which hardly ever happen) ;-)
340 //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
342 if ( duplex == 0 )
344 initiate_read_stream () ;
349 ACE_GUARD (MyMutex, locker, m_Mtx);
350 --nIOCount;
352 check_destroy () ;
355 //-------------------------------------------------------------------------
356 // Sender: sends indefinetely welcome message
357 // and recieves it back
358 //------------------------------------------------------------------------
359 class Sender : public ACE_Handler
361 public:
362 Sender ();
363 ~Sender ();
365 //FUZZ: disable check_for_lack_ACE_OS
366 ///FUZZ: enable check_for_lack_ACE_OS
367 int open (const ACE_TCHAR *host, u_short port);
368 void close ();
370 ACE_HANDLE handle () const;
371 void handle (ACE_HANDLE);
373 protected:
374 // These methods are called by the freamwork
376 /// This is called when asynchronous reads from the socket complete
377 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
378 &result);
380 /// This is called when asynchronous writes from the socket complete
381 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
382 &result);
384 private:
385 int initiate_read_stream ();
386 int initiate_write_stream ();
388 /// Network I/O handle
389 ACE_SOCK_Stream stream_;
391 /// ws (write stream): for writing to the socket
392 ACE_Asynch_Write_Stream ws_;
394 /// rs (read file): for reading from the socket
395 ACE_Asynch_Read_Stream rs_;
397 /// Welcome message
398 ACE_Message_Block welcome_message_;
400 MyMutex m_Mtx ;
401 long nIOCount ;
404 static const char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
406 Sender::Sender ()
407 :nIOCount ( 0 )
409 // Moment of inspiration... :-)
410 this->welcome_message_.init (data, ACE_OS::strlen (data));
413 Sender::~Sender ()
415 this->close ();
418 void Sender::close ()
420 this->stream_.close ();
423 ACE_HANDLE Sender::handle () const
425 return this->stream_.get_handle ();
428 void Sender::handle (ACE_HANDLE handle)
430 this->stream_.set_handle (handle);
433 int Sender::open (const ACE_TCHAR *host, u_short port)
435 // Initialize stuff
436 // Connect to remote host
437 ACE_INET_Addr address (port, host);
438 ACE_SOCK_Connector connector;
440 if (connector.connect (this->stream_,
441 address) == -1)
443 ACE_ERROR_RETURN ((LM_ERROR,
444 "%p\n",
445 "ACE_SOCK_Connector::connect"),
446 -1);
449 // Open ACE_Asynch_Write_Stream
450 if (this->ws_.open (*this) == -1)
451 ACE_ERROR_RETURN ((LM_ERROR,
452 "%p\n",
453 "ACE_Asynch_Write_Stream::open"),
454 -1);
456 // Open ACE_Asynch_Read_Stream
457 if (this->rs_.open (*this) == -1)
458 ACE_ERROR_RETURN ((LM_ERROR,
459 "%p\n",
460 "ACE_Asynch_Read_File::open"),
461 -1);
463 // Start an asynchronous transmit file
464 if ( this->initiate_write_stream () == -1)
465 return -1;
467 if ( duplex != 0 )
469 // Start an asynchronous read file
470 if (this->initiate_read_stream () == -1)
471 return -1;
474 return 0;
477 int Sender::initiate_write_stream ()
479 ACE_GUARD_RETURN (MyMutex, locker, m_Mtx, -1);
481 welcome_message_.rd_ptr( welcome_message_.base ());
482 welcome_message_.wr_ptr( welcome_message_.base ());
483 welcome_message_.wr_ptr (ACE_OS::strlen (data));
485 if (this->ws_.write (welcome_message_,
486 welcome_message_.length ()
487 ) == -1)
489 ACE_ERROR_RETURN((LM_ERROR,
490 "%p\n",
491 "ACE_Asynch_Write_File::write"),
492 -1);
495 nIOCount++ ;
496 return 0;
499 int Sender::initiate_read_stream ()
501 ACE_GUARD_RETURN (MyMutex, locker, m_Mtx, -1);
503 // Create a new <Message_Block>. Note that this message block will
504 // be used both to <read> data asynchronously from the socket and to
505 // <write> data asynchronously to the file.
506 ACE_DEBUG ((LM_DEBUG,
507 "initiate_read_stream called\n"));
510 ACE_Message_Block *mb = 0;
511 ACE_NEW_RETURN (mb,
512 ACE_Message_Block (BUFSIZ + 1),
513 -1);
515 // Inititiate read
516 if (this->rs_.read (*mb, mb->size ()- 1) == -1)
518 mb->release () ;
519 ACE_ERROR_RETURN ((LM_ERROR,
520 "%p\n",
521 "ACE_Asynch_Read_Stream::read"),
522 -1);
525 nIOCount++ ;
526 return 0;
530 void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result
531 &result)
533 ACE_DEBUG ((LM_DEBUG,
534 "handle_write_stream called\n"));
536 // Reset pointers.
537 result.message_block ().rd_ptr (result.message_block ().rd_ptr () -
538 result.bytes_transferred ());
541 ACE_DEBUG ((LM_DEBUG, "********************\n"));
542 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write",
543 result.bytes_to_write ()));
544 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
545 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
546 result.bytes_transferred ()));
547 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
548 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
549 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
550 result.completion_key ()));
551 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
552 ACE_DEBUG ((LM_DEBUG, "********************\n"));
553 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
554 result.message_block ().rd_ptr ()));
556 // Simplify just for Test
557 if (result.success () && result.bytes_transferred () != 0)
559 if ( duplex != 0 ) // full duplex, continue write
561 initiate_write_stream () ;
563 else // half-duplex read reply, after read we will start
564 // write
566 initiate_read_stream () ;
571 ACE_GUARD_RETURN (MyMutex, locker, m_Mtx);
572 --nIOCount;
576 void
577 Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
579 ACE_DEBUG ((LM_DEBUG,
580 "handle_read_stream called\n"));
582 // Reset pointers.
583 result.message_block ().rd_ptr ()[result.bytes_transferred ()] =
584 '\0';
586 ACE_DEBUG ((LM_DEBUG, "********************\n"));
587 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read
588 ()));
589 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
590 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered",
591 result.bytes_transferred ()));
592 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
593 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
594 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long)
595 result.completion_key ()));
596 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
597 ACE_DEBUG ((LM_DEBUG, "********************\n"));
598 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block",
599 result.message_block ().rd_ptr ()));
601 result.message_block().release ();
603 if ( result.success () && result.bytes_transferred () != 0)
605 // Successful read: write the data to the file asynchronously.
606 // Note how we reuse the <ACE_Message_Block> for the writing.
607 // Therefore, we do not delete this buffer because it is handled
608 // in <handle_write_stream>.
610 if ( duplex != 0 ) // full duplex, continue read
612 initiate_read_stream () ;
614 else // half-duplex writey, after write we will start read
616 initiate_write_stream () ;
621 ACE_GUARD (MyMutex, locker, m_Mtx);
622 --nIOCount;
626 //--------------------------------------------------------------------------
628 static int
629 parse_args (int argc, ACE_TCHAR *argv[])
631 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("n:p:d:h:"));
632 int c;
634 while ((c = get_opt ()) != EOF)
635 switch (c)
637 case 'h':
638 host = get_opt.opt_arg ();
639 break;
640 case 'n':
641 nThreads = ACE_OS::atoi (get_opt.opt_arg ()) ;
642 break;
643 case 'p':
644 port = ACE_OS::atoi (get_opt.opt_arg ());
645 break;
646 case 'd':
647 duplex = ACE_OS::atoi (get_opt.opt_arg ());
648 break;
649 default:
650 ACE_ERROR ((LM_ERROR, "%p.\n",
651 "usage :\n"
652 "-h <host> for Sender mode\n"
653 "-d <duplex mode 1-on/0-off>\n"
654 "-p <port to listen/connect>\n"
655 "-n <number threads for Proactor pool>\n"));
656 return -1;
659 return 0;
663 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
665 ACE_UNUSED_ARG (initial_read_size);
667 if (parse_args (argc, argv) == -1)
668 return -1;
670 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
672 ACE_WIN32_Proactor * pImpl = new ACE_WIN32_Proactor;
674 #elif defined (ACE_HAS_AIO_CALLS)
676 // ACE_POSIX_AIOCB_Proactor * pImpl = new ACE_POSIX_AIOCB_Proactor;
677 ACE_POSIX_SIG_Proactor * pImpl = new ACE_POSIX_SIG_Proactor;
678 #endif
680 ACE_Proactor Proactor ( pImpl ,1 );
682 ACE_Proactor::instance( & Proactor );
685 MyTask Task1 ;
687 if (Task1.activate (THR_NEW_LWP, nThreads ) == -1)
689 ACE_ERROR_RETURN ((LM_ERROR, "%p.\n", "main"), -1);
692 Sender sender;
693 ACE_Asynch_Acceptor<Receiver> acceptor;
695 int Rc = -1 ;
697 if ( host == 0 ) // Acceptor
699 // Simplify , initial read with zero size
700 Rc = acceptor.open (ACE_INET_Addr (port),0,1);
702 else
704 Rc = sender.open (host, port);
707 if ( Rc == 0 )
709 char c ;
710 cout << "Press any key to stop and exit=>\n" << flush ;
711 cin.clear ();
712 cin >> c ;
715 ACE_Proactor::end_event_loop () ;
717 if ( host != 0 ) // we are sender
719 sender.close () ; // disconnect to get reciever error !!!
723 ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();
725 pTM->wait_task ( & Task1 ) ;
727 ACE_Proactor::instance( ( ACE_Proactor* )0 );
729 return 0;
731 //--------------------------------------------------------------------
733 //--------------------------------------------------------------------
734 int DisableSignal ( int SigNum )
736 #ifndef ACE_WIN32
737 sigset_t signal_set;
738 if ( ACE_OS::sigemptyset (&signal_set) == - 1 )
740 ACE_ERROR ((LM_ERROR,
741 "Error:(%P | %t):%p\n",
742 "sigemptyset failed"));
745 ACE_OS::sigaddset (&signal_set, SigNum);
747 // Put the <signal_set>.
748 if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
750 ACE_ERROR ((LM_ERROR,
751 "Error:(%P | %t):%p\n",
752 "pthread_sigmask failed"));
754 #else
755 ACE_UNUSED_ARG(SigNum);
756 #endif
758 return 1;
760 //--------------------------------------------------------------------
761 // Get the <signal_set> back from the OS.
762 //--------------------------------------------------------------------
764 int PrintSigMask ()
766 #ifndef ACE_WIN32
768 sigset_t mask ;
769 int member = 0;
771 COUT ( "\n=============Signal Mask==========" )
773 if (ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & mask ) != 0)
775 ACE_ERROR ((LM_ERROR,
776 "Error:(%P | %t):%p\n",
777 "ACE_OS::pthread_sigmask failed"));
779 else for (int i = 1 ; i < 1000; i++)
781 member = ACE_OS::sigismember (&mask,i);
783 COUT ( "\nSig " )
784 COUT ( i )
785 COUT ( " is " )
786 COUT (member )
788 if (member == -1)
790 break ;
793 #endif
794 return 0;
797 #else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */
800 ACE_TMAIN (int, ACE_TCHAR *[])
802 ACE_DEBUG ((LM_DEBUG,
803 "This example does not work on this platform.\n"));
804 return 1;
807 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */