2 //=============================================================================
4 * @file test_proactor2.cpp
6 * Alexander Libman <Alibman@baltimore.com> modified
7 * <test_proactor> and made this test. Instead of writing received
8 * data to the file, the receiver sends it back to the
9 * sender, i.e. ACE_Asynch_Write_File wf_ has been changed to
10 * ACE_Asynch_Write_Stream ws_.
12 * @author Irfan Pyarali <irfan@cs.wustl.edu> and Alexander Libman <Alibman@baltimore.com>.
14 //=============================================================================
17 #include "ace/Signal.h"
19 #include "ace/Service_Config.h"
20 #include "ace/Proactor.h"
21 #include "ace/Asynch_IO.h"
22 #include "ace/Asynch_IO_Impl.h"
23 #include "ace/Asynch_Acceptor.h"
24 #include "ace/INET_Addr.h"
25 #include "ace/SOCK_Connector.h"
26 #include "ace/SOCK_Acceptor.h"
27 #include "ace/SOCK_Stream.h"
28 #include "ace/Message_Block.h"
29 #include "ace/Get_Opt.h"
31 // FUZZ: disable check_for_streams_include
32 #include "ace/streams.h"
35 #include "ace/OS_main.h"
38 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
39 // This only works on Win32 platforms and on Unix platforms supporting POSIX AIO calls.
42 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
44 #include "ace/WIN32_Proactor.h"
46 #elif defined (ACE_HAS_AIO_CALLS)
48 #include "ace/POSIX_Proactor.h"
52 // Some debug helper functions
53 int DisableSignal ( int SigNum
);
56 #define COUT(X) cout << X ; cout.flush ();
58 // Host that we're connecting to.
59 static ACE_TCHAR
*host
= 0;
61 // duplex mode: ==0 half-duplex, !=0 full duplex
63 static int duplex
= 0 ;
65 // number of threads in the Proactor thread pool
66 static int nThreads
= 1;
68 // Port that we're receiving connections on.
69 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
71 // Size of each initial asynchronous <read> operation.
72 static int initial_read_size
= BUFSIZ
;
75 #define MyMutex ACE_Recursive_Thread_Mutex
76 //#define MyMutex ACE_Thread_Mutex
77 //#define MyMutex ACE_Null_Mutex
79 //--------------------------------------------------------------------------
80 // MyTask plays the role of the Proactor thread pool
81 //--------------------------------------------------------------------------
82 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
91 ACE_DEBUG ((LM_DEBUG
, "(%t) MyTask started\n"));
93 while ( ACE_Proactor::event_loop_done () == 0 )
95 ACE_Proactor::run_event_loop ();
98 ACE_DEBUG ((LM_DEBUG
, "(%t) MyTask finished\n"));
102 //-----------------------------------------------------------
104 //-----------------------------------------------------------
105 class Receiver
: public ACE_Service_Handler
111 //FUZZ: disable check_for_lack_ACE_OS
112 /// This is called after the new connection has been accepted.
113 ///FUZZ: enable check_for_lack_ACE_OS
114 virtual void open (ACE_HANDLE handle
,
115 ACE_Message_Block
&message_block
);
118 // These methods are called by the framework
120 /// This is called when an asynchronous <read> operation from the socket completes.
122 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
125 /// This is called when an asynchronous <write> to the socket completes.
127 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
131 int initiate_read_stream ();
132 int initiate_write_stream (ACE_Message_Block
& mb
, int nBytes
);
133 bool check_destroy () ;
135 ACE_Asynch_Read_Stream rs_
;
136 ACE_Asynch_Write_Stream ws_
;
140 static long nSessions
;
144 long Receiver::nSessions
= 0 ;
146 Receiver::Receiver ()
147 : handle_ (ACE_INVALID_HANDLE
),
150 ACE_GUARD (MyMutex
, locker
, m_Mtx
);
152 ACE_DEBUG ((LM_DEBUG
, "Receiver Ctor nSessions=%d\n", nSessions
));
155 Receiver::~Receiver ()
157 ACE_GUARD (MyMutex
, locker
, m_Mtx
);
159 ACE_OS::closesocket (this->handle_
);
160 ACE_DEBUG ((LM_DEBUG
, "~Receiver Dtor nSessions=%d\n", nSessions
));
163 //---------------------------------------------------------------------
164 // Returns true if we are still alive, false if we committed suicide.
166 //---------------------------------------------------------------------
167 bool Receiver::check_destroy ()
170 ACE_GUARD_RETURN (MyMutex
, locker
, m_Mtx
, false);
183 void Receiver::open (ACE_HANDLE handle
,
186 ACE_DEBUG ((LM_DEBUG
,
187 "%N:%l:Receiver::open called\n"));
190 this->handle_
= handle
;
192 if (this->ws_
.open (*this, this->handle_
) == -1)
194 ACE_ERROR ((LM_ERROR
,
196 "ACE_Asynch_Write_Stream::open"));
198 else if (this->rs_
.open (*this, this->handle_
) == -1)
200 ACE_ERROR ((LM_ERROR
,
202 "ACE_Asynch_Read_Stream::open"));
206 initiate_read_stream ();
213 int Receiver::initiate_read_stream ()
215 ACE_GUARD_RETURN (MyMutex
, locker
, m_Mtx
, -1);
217 // Create a new <Message_Block>. Note that this message block will
218 // be used both to <read> data asynchronously from the socket and to
219 // <write> data asynchronously back to the socket.
220 ACE_DEBUG ((LM_DEBUG
,
221 "initiate_read_stream called\n"));
224 ACE_Message_Block
*mb
= 0;
226 ACE_Message_Block (BUFSIZ
+ 1),
230 if (this->rs_
.read (*mb
, mb
->size ()- 1) == -1)
233 ACE_ERROR_RETURN ((LM_ERROR
,
235 "ACE_Asynch_Read_Stream::read"),
243 int Receiver::initiate_write_stream (ACE_Message_Block
& mb
, int nBytes
)
245 ACE_GUARD_RETURN (MyMutex
, locker
, m_Mtx
, -1);
246 if (this->ws_
.write (mb
, nBytes
) == -1)
249 ACE_ERROR_RETURN((LM_ERROR
,
251 "ACE_Asynch_Write_File::write"),
260 Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
262 ACE_DEBUG ((LM_DEBUG
,
263 "handle_read_stream called\n"));
266 result
.message_block ().rd_ptr ()[result
.bytes_transferred ()] =
269 ACE_DEBUG ((LM_DEBUG
, "********************\n"));
270 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "bytes_to_read", result
.bytes_to_read
272 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "handle", result
.handle ()));
273 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "bytes_transfered",
274 result
.bytes_transferred ()));
275 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "act", (u_long
) result
.act ()));
276 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "success", result
.success ()));
277 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "completion_key", (u_long
)
278 result
.completion_key ()));
279 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "error", result
.error ()));
280 ACE_DEBUG ((LM_DEBUG
, "********************\n"));
281 ACE_DEBUG ((LM_DEBUG
, "%s = %s\n", "message_block",
282 result
.message_block ().rd_ptr ()));
284 if ( result
.success () && result
.bytes_transferred () != 0)
286 // Successful read: write the data back to the socket asynchronously.
287 // Note how we reuse the <ACE_Message_Block> for the writing.
288 // Therefore, we do not delete this buffer because it is handled
289 // in <handle_write_stream>.
291 if(this->initiate_write_stream (result
.message_block (),
293 result
.bytes_transferred () ) == 0 )
297 // Initiate new read from the stream.
298 this->initiate_read_stream () ;
304 result
.message_block ().release ();
305 ACE_DEBUG ((LM_DEBUG
, "Receiver completed\n"));
309 ACE_GUARD (MyMutex
, locker
, m_Mtx
);
316 Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result
319 ACE_DEBUG ((LM_DEBUG
, "handle_write_stream called\n"));
321 ACE_DEBUG ((LM_DEBUG
, "********************\n"));
322 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "bytes_to_write",
323 result
.bytes_to_write ()));
324 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "handle", result
.handle ()));
325 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "bytes_transfered",
326 result
.bytes_transferred ()));
327 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "act", (u_long
) result
.act ()));
328 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "success", result
.success ()));
329 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "completion_key", (u_long
)
330 result
.completion_key ()));
331 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "error", result
.error ()));
332 ACE_DEBUG ((LM_DEBUG
, "********************\n"));
334 result
.message_block ().release ();
336 if (result
.success ())
338 // This code is not robust enough to deal with short file writes
339 // (which hardly ever happen) ;-)
340 //ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
344 initiate_read_stream () ;
349 ACE_GUARD (MyMutex
, locker
, m_Mtx
);
355 //-------------------------------------------------------------------------
356 // Sender: sends the welcome message indefinitely
357 // and receives it back
358 //------------------------------------------------------------------------
359 class Sender
: public ACE_Handler
365 //FUZZ: disable check_for_lack_ACE_OS
366 ///FUZZ: enable check_for_lack_ACE_OS
367 int open (const ACE_TCHAR
*host
, u_short port
);
370 ACE_HANDLE
handle () const;
371 void handle (ACE_HANDLE
);
374 // These methods are called by the framework
376 /// This is called when asynchronous reads from the socket complete
377 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
380 /// This is called when asynchronous writes to the socket complete
381 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
385 int initiate_read_stream ();
386 int initiate_write_stream ();
388 /// Network I/O handle
389 ACE_SOCK_Stream stream_
;
391 /// ws (write stream): for writing to the socket
392 ACE_Asynch_Write_Stream ws_
;
394 /// rs (read stream): for reading from the socket
395 ACE_Asynch_Read_Stream rs_
;
398 ACE_Message_Block welcome_message_
;
404 static const char *data
= "Welcome to Irfan World! Irfan RULES here !!\n";
409 // Moment of inspiration... :-)
410 this->welcome_message_
.init (data
, ACE_OS::strlen (data
));
418 void Sender::close ()
420 this->stream_
.close ();
423 ACE_HANDLE
Sender::handle () const
425 return this->stream_
.get_handle ();
428 void Sender::handle (ACE_HANDLE handle
)
430 this->stream_
.set_handle (handle
);
433 int Sender::open (const ACE_TCHAR
*host
, u_short port
)
436 // Connect to remote host
437 ACE_INET_Addr
address (port
, host
);
438 ACE_SOCK_Connector connector
;
440 if (connector
.connect (this->stream_
,
443 ACE_ERROR_RETURN ((LM_ERROR
,
445 "ACE_SOCK_Connector::connect"),
449 // Open ACE_Asynch_Write_Stream
450 if (this->ws_
.open (*this) == -1)
451 ACE_ERROR_RETURN ((LM_ERROR
,
453 "ACE_Asynch_Write_Stream::open"),
456 // Open ACE_Asynch_Read_Stream
457 if (this->rs_
.open (*this) == -1)
458 ACE_ERROR_RETURN ((LM_ERROR
,
460 "ACE_Asynch_Read_File::open"),
463 // Start an asynchronous write to the stream
464 if ( this->initiate_write_stream () == -1)
469 // Start an asynchronous read from the stream
470 if (this->initiate_read_stream () == -1)
477 int Sender::initiate_write_stream ()
479 ACE_GUARD_RETURN (MyMutex
, locker
, m_Mtx
, -1);
481 welcome_message_
.rd_ptr( welcome_message_
.base ());
482 welcome_message_
.wr_ptr( welcome_message_
.base ());
483 welcome_message_
.wr_ptr (ACE_OS::strlen (data
));
485 if (this->ws_
.write (welcome_message_
,
486 welcome_message_
.length ()
489 ACE_ERROR_RETURN((LM_ERROR
,
491 "ACE_Asynch_Write_File::write"),
499 int Sender::initiate_read_stream ()
501 ACE_GUARD_RETURN (MyMutex
, locker
, m_Mtx
, -1);
503 // Create a new <Message_Block>. This message block is used to
504 // <read> data asynchronously from the socket; it is released in
505 // <handle_read_stream>.
506 ACE_DEBUG ((LM_DEBUG
,
507 "initiate_read_stream called\n"));
510 ACE_Message_Block
*mb
= 0;
512 ACE_Message_Block (BUFSIZ
+ 1),
516 if (this->rs_
.read (*mb
, mb
->size ()- 1) == -1)
519 ACE_ERROR_RETURN ((LM_ERROR
,
521 "ACE_Asynch_Read_Stream::read"),
530 void Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result
533 ACE_DEBUG ((LM_DEBUG
,
534 "handle_write_stream called\n"));
537 result
.message_block ().rd_ptr (result
.message_block ().rd_ptr () -
538 result
.bytes_transferred ());
541 ACE_DEBUG ((LM_DEBUG
, "********************\n"));
542 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "bytes_to_write",
543 result
.bytes_to_write ()));
544 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "handle", result
.handle ()));
545 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "bytes_transfered",
546 result
.bytes_transferred ()));
547 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "act", (u_long
) result
.act ()));
548 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "success", result
.success ()));
549 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "completion_key", (u_long
)
550 result
.completion_key ()));
551 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "error", result
.error ()));
552 ACE_DEBUG ((LM_DEBUG
, "********************\n"));
553 ACE_DEBUG ((LM_DEBUG
, "%s = %s\n", "message_block",
554 result
.message_block ().rd_ptr ()));
556 // Simplify just for Test
557 if (result
.success () && result
.bytes_transferred () != 0)
559 if ( duplex
!= 0 ) // full duplex, continue write
561 initiate_write_stream () ;
563 else // half-duplex read reply, after read we will start
566 initiate_read_stream () ;
571 ACE_GUARD (MyMutex
, locker
, m_Mtx
); // void handler: plain ACE_GUARD (no return value), matching the other completion handlers
577 Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
579 ACE_DEBUG ((LM_DEBUG
,
580 "handle_read_stream called\n"));
583 result
.message_block ().rd_ptr ()[result
.bytes_transferred ()] =
586 ACE_DEBUG ((LM_DEBUG
, "********************\n"));
587 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "bytes_to_read", result
.bytes_to_read
589 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "handle", result
.handle ()));
590 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "bytes_transfered",
591 result
.bytes_transferred ()));
592 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "act", (u_long
) result
.act ()));
593 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "success", result
.success ()));
594 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "completion_key", (u_long
)
595 result
.completion_key ()));
596 ACE_DEBUG ((LM_DEBUG
, "%s = %d\n", "error", result
.error ()));
597 ACE_DEBUG ((LM_DEBUG
, "********************\n"));
598 ACE_DEBUG ((LM_DEBUG
, "%s = %s\n", "message_block",
599 result
.message_block ().rd_ptr ()));
601 result
.message_block().release ();
603 if ( result
.success () && result
.bytes_transferred () != 0)
605 // Successful read: the received <ACE_Message_Block> has already been
606 // released above, so just start the next operation, chosen by the
607 // duplex mode.
610 if ( duplex
!= 0 ) // full duplex, continue read
612 initiate_read_stream () ;
614 else // half-duplex writey, after write we will start read
616 initiate_write_stream () ;
621 ACE_GUARD (MyMutex
, locker
, m_Mtx
);
626 //--------------------------------------------------------------------------
629 parse_args (int argc
, ACE_TCHAR
*argv
[])
631 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("n:p:d:h:"));
634 while ((c
= get_opt ()) != EOF
)
638 host
= get_opt
.opt_arg ();
641 nThreads
= ACE_OS::atoi (get_opt
.opt_arg ()) ;
644 port
= ACE_OS::atoi (get_opt
.opt_arg ());
647 duplex
= ACE_OS::atoi (get_opt
.opt_arg ());
650 ACE_ERROR ((LM_ERROR
, "%p.\n",
652 "-h <host> for Sender mode\n"
653 "-d <duplex mode 1-on/0-off>\n"
654 "-p <port to listen/connect>\n"
655 "-n <number threads for Proactor pool>\n"));
663 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
665 ACE_UNUSED_ARG (initial_read_size
);
667 if (parse_args (argc
, argv
) == -1)
670 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
672 ACE_WIN32_Proactor
* pImpl
= new ACE_WIN32_Proactor
;
674 #elif defined (ACE_HAS_AIO_CALLS)
676 // ACE_POSIX_AIOCB_Proactor * pImpl = new ACE_POSIX_AIOCB_Proactor;
677 ACE_POSIX_SIG_Proactor
* pImpl
= new ACE_POSIX_SIG_Proactor
;
680 ACE_Proactor
Proactor ( pImpl
,1 );
682 ACE_Proactor::instance( & Proactor
);
687 if (Task1
.activate (THR_NEW_LWP
, nThreads
) == -1)
689 ACE_ERROR_RETURN ((LM_ERROR
, "%p.\n", "main"), -1);
693 ACE_Asynch_Acceptor
<Receiver
> acceptor
;
697 if ( host
== 0 ) // Acceptor
699 // Simplify , initial read with zero size
700 Rc
= acceptor
.open (ACE_INET_Addr (port
),0,1);
704 Rc
= sender
.open (host
, port
);
710 cout
<< "Press any key to stop and exit=>\n" << flush
;
715 ACE_Proactor::end_event_loop () ;
717 if ( host
!= 0 ) // we are sender
719 sender
.close () ; // disconnect to get reciever error !!!
723 ACE_Thread_Manager
* pTM
= ACE_Thread_Manager::instance();
725 pTM
->wait_task ( & Task1
) ;
727 ACE_Proactor::instance( ( ACE_Proactor
* )0 );
731 //--------------------------------------------------------------------
733 //--------------------------------------------------------------------
734 int DisableSignal ( int SigNum
)
738 if ( ACE_OS::sigemptyset (&signal_set
) == - 1 )
740 ACE_ERROR ((LM_ERROR
,
741 "Error:(%P | %t):%p\n",
742 "sigemptyset failed"));
745 ACE_OS::sigaddset (&signal_set
, SigNum
);
747 // Block the signals in <signal_set> for the calling thread.
748 if (ACE_OS::pthread_sigmask (SIG_BLOCK
, &signal_set
, 0) != 0)
750 ACE_ERROR ((LM_ERROR
,
751 "Error:(%P | %t):%p\n",
752 "pthread_sigmask failed"));
755 ACE_UNUSED_ARG(SigNum
);
760 //--------------------------------------------------------------------
761 // Get the <signal_set> back from the OS.
762 //--------------------------------------------------------------------
771 COUT ( "\n=============Signal Mask==========" )
773 if (ACE_OS::pthread_sigmask (SIG_SETMASK
, 0, & mask
) != 0)
775 ACE_ERROR ((LM_ERROR
,
776 "Error:(%P | %t):%p\n",
777 "ACE_OS::pthread_sigmask failed"));
779 else for (int i
= 1 ; i
< 1000; i
++)
781 member
= ACE_OS::sigismember (&mask
,i
);
797 #else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */
800 ACE_TMAIN (int, ACE_TCHAR
*[])
802 ACE_DEBUG ((LM_DEBUG
,
803 "This example does not work on this platform.\n"));
807 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */