1 // ============================================================================
3 * @file Proactor_Scatter_Gather_Test.cpp
5 * The test runs on a single thread, and involves a single Sender,
6 * two Receivers and a single Writer. The Sender async-reads
7 * (scattered) from a file into chunks of <page size>. It
8 * async-sends (gathered) the odd chunks to the first receiver over a
9 * stream, and the even chunks to the second receiver over a
10 * different stream. The receivers async-read (scattered) from the
11 * socket streams into chunks of <page size>, and convey the
12 * data to the Writer. The Writer reconstructs the file using
13 * async-write (gathered). Then, the reconstructed file is compared
14 * to the original file to determine test success. So, it covers both
15 * async scatter/gather stream I/O and async scatter/gather file I/O.
16 * The wire transfer protocol is very naive (and totally
17 * unreliable...) - when both connections are closed, EOF is assumed.
18 * The test can also be run in separate sender and receiver modes,
19 * to test real network influences.
21 * This test is based upon some building blocks taken from the
24 * @author Edan Ayal <edanayal@yahoo.com> */
25 // ============================================================================
27 #include "test_config.h"
29 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
30 // This currently only works on Win32 platforms (NT SP2 and above).
31 // Support for Unix platforms supporting POSIX aio calls should be added in the future.
33 #include "ace/Get_Opt.h"
35 #include "ace/Proactor.h"
36 #include "ace/Asynch_Acceptor.h"
37 #include "ace/Asynch_Connector.h"
38 #include "ace/Mem_Map.h"
39 #include "ace/Min_Max.h"
40 #include "ace/OS_NS_math.h"
41 #include "ace/OS_NS_sys_stat.h"
42 #include "ace/OS_NS_fcntl.h"
43 #include "ace/OS_NS_unistd.h"
45 #include "ace/SOCK_Connector.h"
47 // For the Acceptor/Connector handlers maintenance lists
// (they size Connector::list_senders_ and Acceptor::list_receivers_).
48 static const int SENDERS
= 1;
49 static const int RECEIVERS
= 2;
51 // Port that we're receiving connections on.
52 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
// Host the client side connects to; parse_args replaces it with an
// option argument, and run_main passes it to addr.set ().
54 static const ACE_TCHAR
*host
= ACE_LOCALHOST
;
56 // File that we're sending.
57 static const ACE_TCHAR
*input_file
= ACE_TEXT("Proactor_Scatter_Gather_Test.cpp");
59 // Name of the output file.
60 static const ACE_TCHAR
*output_file
= ACE_TEXT("output");
// Mode flags. run_main compares the input and output files only when
// both of them are zero.
62 static int client_only
= 0;
63 static int server_only
= 0;
// Size in bytes of every chunk buffer; run_main sets it to
// ACE_OS::getpagesize () before any chunk is allocated.
64 static size_t chunk_size
= 0;
72 // *************************************************************
73 // Some chunks chain helper routines
74 // *************************************************************
// Allocates number_of_chunks message blocks, each wrapping its own
// freshly allocated buffer, and chains them together. Callers read
// the resulting chain from head_mb (passed by reference) and treat a
// return value of -1 as failure.
// The buffer comes from ::VirtualAlloc under ACE_WIN32 (its arguments
// are not visible in this listing) and from new char[chunk_size]
// otherwise; free_chunks_chain releases it with the matching call.
75 static int allocate_chunks_chain (ACE_Message_Block
*&head_mb
,
76 size_t number_of_chunks
)
78 ACE_Message_Block
*pre_mb
= 0;
80 for (size_t index
= 0; index
< number_of_chunks
; ++index
)
82 #if defined (ACE_WIN32)
83 void *addr
= ::VirtualAlloc (0,
88 void *addr
= new char[chunk_size
];
89 #endif /* ACE_WIN32 */
// wrap the raw buffer in a message block
92 ACE_Message_Block
*mb
= new ACE_Message_Block (static_cast<char *> (addr
),
97 // chain them together
// Walks the chain that starts at mb and releases the base () buffer of
// each block with the call that pairs with allocate_chunks_chain:
// ::VirtualFree under ACE_WIN32, delete [] otherwise.
// NOTE(review): mb is taken by reference; what is finally stored in it
// is not visible in this listing.
113 free_chunks_chain (ACE_Message_Block
*&mb
)
115 for (const ACE_Message_Block
* msg
= mb
;
119 #if defined (ACE_WIN32)
120 ::VirtualFree (msg
->base (),
124 delete [] msg
->base ();
125 #endif /* ACE_WIN32 */
// Finds the final block of chain and leaves it in last (an out
// parameter passed by reference).
// The return type and return statement are not visible in this
// listing; callers assign the result to size_t variables named
// last_index, odd_count and even_count, i.e. they use it as the
// number of chunks in the chain.
133 last_chunk (ACE_Message_Block
*chain
,
134 ACE_Message_Block
*&last
)
// advance until we reach the block that has no continuation
141 while (0 != last
->cont ())
143 last
= last
->cont ();
// Relinks the blocks of the odd and even chains through cont () so that
// they form a single chain headed by odd_mb. Writer::initiate_write_file
// calls this and then writes odd_chain_ as the united chain.
// odd_mb is dereferenced unconditionally, so it must not be 0.
151 merge_odd_even_chains (ACE_Message_Block
*odd_mb
,
152 ACE_Message_Block
*even_mb
)
// pre_pre_mb and pre_mb trail curr_mb by two and one blocks respectively
154 ACE_Message_Block
*pre_pre_mb
= odd_mb
;
155 ACE_Message_Block
*pre_mb
= even_mb
;
156 ACE_Message_Block
*curr_mb
= odd_mb
->cont ();
160 for (; curr_mb
!= 0; curr_mb
= pre_pre_mb
->cont ())
// hook the block two steps back onto the block one step back
162 pre_pre_mb
->cont (pre_mb
);
164 // increment history pointers
// link the last pending pair once the loop has run out of blocks
169 pre_pre_mb
->cont (pre_mb
);
// Counterpart of merge_odd_even_chains: relinks one chain of
// alternating blocks into two chains. Sender::initiate_write_stream
// passes the chain head as odd_mb and head.cont () as even_mb.
// even_mb may be 0, in which case the loop body never runs.
175 split_odd_even_chains (ACE_Message_Block
*odd_mb
,
176 ACE_Message_Block
*even_mb
)
// pre_pre_mb and pre_mb trail curr_mb by two and one blocks respectively
178 ACE_Message_Block
*pre_pre_mb
= odd_mb
;
179 ACE_Message_Block
*pre_mb
= even_mb
;
180 ACE_Message_Block
*curr_mb
= (even_mb
? even_mb
->cont () : 0);
182 for (; curr_mb
!= 0; curr_mb
= curr_mb
->cont ())
// link the block two steps back straight to the current one,
// skipping the block in between
184 pre_pre_mb
->cont (curr_mb
);
186 // increment history pointers
// terminate the chain that ends at pre_pre_mb
191 pre_pre_mb
->cont (0);
// Appends additional_chunks_chain to chunks_chain. An empty (0)
// chunks_chain simply becomes the additional chain; otherwise the
// additional chain is hooked onto the last block of the existing one.
197 add_to_chunks_chain (ACE_Message_Block
*&chunks_chain
,
198 ACE_Message_Block
*additional_chunks_chain
)
200 if (0 == chunks_chain
)
201 chunks_chain
= additional_chunks_chain
;
// non-empty chain: find its tail and attach the new blocks there
204 ACE_Message_Block
*last
= 0;
205 last_chunk (chunks_chain
, last
);
207 last
->cont (additional_chunks_chain
);
// Cuts chunks_chain at the first block whose length () is 0 and frees
// that block and everything after it with free_chunks_chain.
// chunks_chain is passed by reference. The statement under the
// "first_empty == chunks_chain" test is not visible in this listing;
// presumably it resets the caller's pointer when the whole chain is
// empty (Receiver::handle_read_stream re-tests mb after this call).
212 remove_empty_chunks (ACE_Message_Block
*&chunks_chain
)
214 if (0 == chunks_chain
)
217 ACE_Message_Block
*first_empty
= chunks_chain
;
218 ACE_Message_Block
*pre_mb
= 0;
// skip the leading blocks that hold data, remembering the predecessor
220 while (first_empty
->length () > 0 &&
221 0 != first_empty
->cont ())
223 pre_mb
= first_empty
;
224 first_empty
= first_empty
->cont ();
227 // break the chain there, and release the empty end (might be everything)
228 if (0 == first_empty
->length ())
230 if (pre_mb
) // might be 0, in case it's the entire chain
233 if (first_empty
== chunks_chain
)
236 free_chunks_chain (first_empty
);
240 // *************************************************************
241 // Acceptor, Receiver and Writer
242 // *************************************************************
245 class Acceptor
: public ACE_Asynch_Acceptor
<Receiver
>
247 friend class Receiver
;
251 virtual ~Acceptor ();
255 // Virtual from ACE_Asynch_Acceptor
256 virtual Receiver
*make_handler ();
258 int get_number_sessions () { return this->sessions_
; }
261 void on_new_receiver (Receiver
&rcvr
);
262 void on_delete_receiver (Receiver
&rcvr
);
265 Receiver
*list_receivers_
[RECEIVERS
];
270 // The first one instantiated takes the role of the odd receiver
271 class Receiver
: public ACE_Service_Handler
273 friend class Acceptor
;
277 Receiver (Acceptor
*acceptor
= 0, int index
= -1);
278 virtual ~Receiver ();
280 //FUZZ: disable check_for_lack_ACE_OS
281 /// This is called after the new connection has been accepted.
282 virtual void open (ACE_HANDLE handle
,
283 ACE_Message_Block
&message_block
);
284 //FUZZ: enable check_for_lack_ACE_OS
287 /// This is called by the framework when asynchronous <read> operation from the
288 /// socket completes.
289 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
);
292 int initiate_read_stream ();
294 void check_destroy ();
300 ACE_Asynch_Read_Stream rs_
;
301 ACE_HANDLE socket_handle_
;
304 static Writer
* writer_
;
310 // if we get a non-page-size remainder, we will not send it to the writer
311 // until it is full (unless at end)
312 ACE_Message_Block
*partial_chunk_
;
315 class Writer
: public ACE_Handler
317 friend class Receiver
;
323 //FUZZ: disable check_for_lack_ACE_OS
325 //FUZZ: enable check_for_lack_ACE_OS
327 // this is *not* a callback from the framework
328 int handle_read_chunks_chain (ACE_Message_Block
*mb
,
331 // for determining when last receiver dies
332 void on_new_receiver ();
333 void on_delete_receiver ();
336 /// This is called by the framework when an asynchronous <write> to the file
338 virtual void handle_write_file (const ACE_Asynch_Write_File::Result
&result
);
341 int initiate_write_file ();
345 ACE_Asynch_Write_File wf_
;
346 ACE_HANDLE output_file_handle_
;
347 u_long writing_file_offset_
;
348 u_long reported_file_offset_
;
349 ACE_Message_Block
*odd_chain_
;
350 ACE_Message_Block
*even_chain_
;
352 char receiver_count_
;
355 // *************************************************************
357 // *************************************************************
359 Writer
*Receiver::writer_
= 0;
361 Receiver::Receiver (Acceptor
* acceptor
, int index
)
362 : acceptor_ (acceptor
),
364 socket_handle_ (ACE_INVALID_HANDLE
),
368 // the first one is the odd one
369 this->odd_
= ((0 == index
) ? 1 : 0);
373 Receiver::writer_
= new Writer
;
374 if (!Receiver::writer_
)
381 Receiver::writer_
->on_new_receiver ();
383 if (this->acceptor_
!= 0)
384 this->acceptor_
->on_new_receiver (*this);
387 Receiver::~Receiver ()
389 ACE_DEBUG ((LM_DEBUG
,
390 ACE_TEXT ("Receiver::~Receiver\n")));
392 if (this->acceptor_
!= 0)
393 this->acceptor_
->on_delete_receiver (*this);
395 if (this->socket_handle_
!= ACE_INVALID_HANDLE
)
396 ACE_OS::closesocket (this->socket_handle_
);
398 Receiver::writer_
->on_delete_receiver ();
400 if (this->partial_chunk_
)
402 ACE_TEST_ASSERT (0); // should not be getting here
403 this->partial_chunk_
->release ();
408 Receiver::check_destroy ()
410 if (this->io_count_
<= 0)
415 Receiver::open (ACE_HANDLE handle
, ACE_Message_Block
&)
417 this->socket_handle_
= handle
;
419 // Open the ACE_Asynch_Read_Stream
420 if (this->rs_
.open (*this, this->socket_handle_
) == -1)
421 ACE_ERROR ((LM_ERROR
,
423 ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
427 Receiver::writer_
->open ();
429 this->initiate_read_stream ();
432 this->check_destroy ();
// Starts one scattered asynchronous read from the socket. It allocates
// a chain of empty chunks, puts the partial chunk saved by the previous
// completion (if any) at the front, and asks rs_ to fill the chain.
// If readv fails to start, the chain is freed and an error is reported
// through ACE_ERROR_RETURN.
436 Receiver::initiate_read_stream ()
438 if (!Receiver::writer_
)
441 // how many chunks to allocate?
// (one fewer when a partial chunk will be prepended, so the chain
// never exceeds ACE_IOV_MAX / RECEIVERS blocks)
442 size_t number_of_new_chunks
= (this->partial_chunk_
?
443 (ACE_IOV_MAX
/ RECEIVERS
) - 1
444 : ACE_IOV_MAX
/ RECEIVERS
);
446 // allocate chunks chain
447 ACE_Message_Block
*head_mb
= 0;
448 if (-1 == allocate_chunks_chain (head_mb
, number_of_new_chunks
))
454 // calculate how many bytes to read
456 // head_mb could be 0 (no new chunks allocated)
457 size_t bytes_to_read
= head_mb
? head_mb
->total_size () : 0;
459 // add the partial chunk at the front if appropriate, and update
460 // the number of bytes to read
461 if (this->partial_chunk_
)
// only the unused space of the partial chunk can still be filled
463 bytes_to_read
+= this->partial_chunk_
->space ();
464 this->partial_chunk_
->cont (head_mb
);
465 head_mb
= this->partial_chunk_
;
// the chain now holds the partial chunk, so clear the member
466 this->partial_chunk_
= 0;
469 ACE_DEBUG ((LM_DEBUG
,
470 ACE_TEXT ("Receiver::initiate_read_stream - (%s) readv %d\n"),
471 this->odd_
? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
474 // perform the actual scattered read
475 if (this->rs_
.readv (*head_mb
,
476 bytes_to_read
) == -1)
// the read was never queued, so release the chain here
478 free_chunks_chain (head_mb
);
480 ACE_ERROR_RETURN ((LM_ERROR
,
482 ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")),
491 Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
493 ACE_Message_Block
*mb
= &result
.message_block ();
495 ACE_DEBUG ((LM_DEBUG
,
496 ACE_TEXT ("Receiver::handle_read_stream - (%s) read %d\n"),
497 this->odd_
? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
498 result
.bytes_transferred ()));
500 // Transfer only complete chunks to the writer.
501 // Save last partial chunk for the next call.
502 // On disconnect (error or 0 transferred), transfer whatever we have.
504 // at this stage there should not be anything there
505 ACE_TEST_ASSERT (!this->partial_chunk_
);
507 // first, remove the empty chunks
508 remove_empty_chunks (mb
);
510 if (mb
&& Receiver::writer_
)
511 { // there's something to write, and who to write to
513 // write everything or only complete chunks?
515 // write everything - when no new bytes were transferred
516 int write_everything
= 0;
517 if (!result
.bytes_transferred ())
518 write_everything
= 1;
519 if (write_everything
)
520 Receiver::writer_
->handle_read_chunks_chain (mb
,
521 this->odd_
? ODD
: EVEN
);
523 { // filter out the partial chunk at the end (if present)
524 // and save it for later before writing the full chunks
526 // have this->partial_chunk_ point to the last chunk in the chain
527 size_t last_index
= last_chunk (mb
, this->partial_chunk_
);
528 if (this->partial_chunk_
&&
529 this->partial_chunk_
->length () < chunk_size
)
530 { // found partial chunk at end of chain
531 // detach it from the chain
532 if (last_index
> 1) // chain bigger than 1
534 ACE_Message_Block
*pre_last
= mb
;
535 for (size_t index
= 1; index
< last_index
- 1; ++index
)
536 pre_last
= pre_last
->cont ();
538 // detach partial chunk from chain
542 // chain in length of 1 - so we need to zero mb
545 else // last is a full chunk, so hand it over with the rest
546 this->partial_chunk_
= 0;
548 // transfer (if there's anything left)
549 if (mb
&& mb
->total_length ())
550 Receiver::writer_
->handle_read_chunks_chain (
552 this->odd_
? ODD
: EVEN
);
554 // initiate more reads only if no error
555 if (!result
.error ())
556 this->initiate_read_stream ();
561 else if (mb
&& !Receiver::writer_
)
562 // no one to write to
563 free_chunks_chain (mb
);
567 this->check_destroy ();
570 // *************************************************************
572 // *************************************************************
574 Acceptor::Acceptor ()
577 for (int i
= 0; i
< RECEIVERS
; ++i
)
578 this->list_receivers_
[i
] = 0;
581 Acceptor::~Acceptor ()
590 // This method can be called only after proactor event loop is done
592 for (int i
= 0; i
< RECEIVERS
; ++i
)
594 delete this->list_receivers_
[i
];
595 this->list_receivers_
[i
] = 0;
600 Acceptor::on_new_receiver (Receiver
& rcvr
)
603 this->list_receivers_
[rcvr
.index_
] = &rcvr
;
604 ACE_DEBUG ((LM_DEBUG
,
605 ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"),
610 Acceptor::on_delete_receiver (Receiver
& rcvr
)
614 && rcvr
.index_
< RECEIVERS
615 && this->list_receivers_
[rcvr
.index_
] == &rcvr
)
616 this->list_receivers_
[rcvr
.index_
] = 0;
618 ACE_DEBUG ((LM_DEBUG
,
619 ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"),
624 Acceptor::make_handler ()
626 if (this->sessions_
>= RECEIVERS
)
629 for (int i
= 0; i
< RECEIVERS
; ++i
)
631 if (this->list_receivers_
[i
] == 0)
633 ACE_NEW_RETURN (this->list_receivers_
[i
],
636 return this->list_receivers_
[i
];
643 // *************************************************************
645 // *************************************************************
648 : output_file_handle_ (ACE_INVALID_HANDLE
),
649 writing_file_offset_ (0),
650 reported_file_offset_ (0),
660 ACE_DEBUG ((LM_DEBUG
,
661 ACE_TEXT ("Writer::~Writer\n")));
663 if (this->output_file_handle_
!= ACE_INVALID_HANDLE
)
664 ACE_OS::close (this->output_file_handle_
);
666 Receiver::writer_
= 0;
670 Writer::on_new_receiver ()
672 ACE_DEBUG ((LM_DEBUG
,
673 ACE_TEXT ("Writer::on_new_receiver\n")));
675 ++this->receiver_count_
;
// Called from Receiver::~Receiver. Decrements the count of live
// receivers; once the last receiver is gone and no I/O is pending, it
// calls initiate_write_file () directly.
679 Writer::on_delete_receiver ()
681 ACE_DEBUG ((LM_DEBUG
,
682 ACE_TEXT ("Writer::on_delete_receiver\n")));
684 --this->receiver_count_
;
686 if (0 == this->receiver_count_
)
688 if (this->io_count_
<= 0)
689 // no pending io, so do the work ourselves
690 // (if pending io, they'll see the zero receiver count)
691 this->initiate_write_file ();
698 // Open the file for output
699 if (ACE_INVALID_HANDLE
== (this->output_file_handle_
= ACE_OS::open (output_file
,
700 O_CREAT
| _O_TRUNC
| _O_WRONLY
|\
701 FILE_FLAG_OVERLAPPED
|\
702 FILE_FLAG_NO_BUFFERING
,
703 ACE_DEFAULT_FILE_PERMS
)))
704 ACE_ERROR ((LM_ERROR
,
706 ACE_TEXT ("Writer::open::ACE_OS::open")));
707 // Open the ACE_Asynch_Write_File
708 else if (this->wf_
.open (*this, this->output_file_handle_
) == -1)
709 ACE_ERROR ((LM_ERROR
,
711 ACE_TEXT ("Writer::open::ACE_Asynch_Write_File::open")));
715 Writer::handle_read_chunks_chain (ACE_Message_Block
*mb
,
718 ACE_DEBUG ((LM_DEBUG
,
719 ACE_TEXT ("Writer::handle_read_chunks_chain - (%s) %d bytes\n"),
720 (type
== ODD
) ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
721 mb
->total_length ()));
723 add_to_chunks_chain (ODD
== type
? this->odd_chain_
: this->even_chain_
, mb
);
725 this->initiate_write_file ();
// Decides what can be written to the output file right now and starts
// one gathered asynchronous write for it. It detaches up to
// merge_size chunks from the front of both the odd and the even
// chains, merges them into one chain and hands that to wf_.writev
// at writing_file_offset_. It also ends the proactor event loop when
// there is nothing left to write, no receiver remains and no I/O is
// pending.
731 Writer::initiate_write_file ()
733 // find out how much can we merge
734 ACE_Message_Block
*dummy_last
= 0;
735 size_t odd_count
= last_chunk (this->odd_chain_
, dummy_last
);
736 size_t even_count
= last_chunk (this->even_chain_
, dummy_last
);
// a merge needs a chunk from each chain; cap it at ACE_IOV_MAX
738 size_t merge_size
= ACE_MIN (ACE_MIN (odd_count
, even_count
),
739 (size_t) ACE_IOV_MAX
);
741 // the options here are as follows:
742 // io_count_ can be zero or greater.
743 // merge_size can be zero or not.
744 // if non zero merge, write the merge. ASSERT receiver_count_ is non zero too.
746 // if receiver_count_ is non zero, NOOP.
747 // if zero receiver_count_, we should write whatever is left,
748 // and terminate the writer at completion.
749 // if nothing to write, and io_count_ is zero too, terminate here.
751 if (0 == merge_size
&&
752 0 != this->receiver_count_
)
755 if (0 == merge_size
&&
756 0 == this->receiver_count_
&&
759 0 == this->io_count_
)
761 ACE_DEBUG ((LM_DEBUG
,
762 ACE_TEXT ("Writer::initiate_write_file")
763 ACE_TEXT (" - ending proactor event loop\n")));
765 ACE_Proactor::instance ()->end_event_loop ();
772 // if we reached here and merge_size is zero, we should write whatever is
773 // in the queues (1 to 2 chunks together), so let's force the merge size to 1.
776 ACE_TEST_ASSERT (1 == odd_count
&& 1 >= even_count
);
780 // Now that we found out what we want to do, prepare the chain
781 // that will be written, and update the remainders
782 ACE_Message_Block
*new_odd_chain_head
= this->odd_chain_
;
783 ACE_Message_Block
*new_even_chain_head
= this->even_chain_
;
785 // locate the place for detachment in the chains
// (pre_odd / pre_even end up on the last block that will be written,
// the new_*_chain_head pointers on the first block that stays queued)
786 ACE_Message_Block
*pre_odd
= 0;
787 ACE_Message_Block
*pre_even
= 0;
788 for (size_t index
= 0; index
< merge_size
; ++index
)
790 pre_odd
= new_odd_chain_head
;
791 if (new_odd_chain_head
)
792 new_odd_chain_head
= new_odd_chain_head
->cont ();
793 pre_even
= new_even_chain_head
;
794 if (new_even_chain_head
)
795 new_even_chain_head
= new_even_chain_head
->cont ();
797 // now detach the chain
803 // perform merge between the two chains
804 merge_odd_even_chains (this->odd_chain_
, this->even_chain_
);
806 // and now finally perform the write
807 ACE_Message_Block
*united_mb
= this->odd_chain_
;
808 // update the remainders of the chains
809 this->odd_chain_
= new_odd_chain_head
;
810 this->even_chain_
= new_even_chain_head
;
// the file offset advances by the amount of real data (total_length),
// even though the write below is issued for whole chunks (total_size)
811 size_t increment_writing_file_offset
= united_mb
->total_length ();
813 // Reconstruct the file
814 // Write the size, not the length, because we must write in chunks
816 ACE_DEBUG ((LM_DEBUG
,
817 ACE_TEXT ("Writer::initiate_write_file: write %d bytes at %d\n"),
818 united_mb
->total_size (),
819 this->writing_file_offset_
));
820 if (this->wf_
.writev (*united_mb
,
821 united_mb
->total_size (),
822 this->writing_file_offset_
) == -1)
// the write was never queued, so release the chain here
824 free_chunks_chain (united_mb
);
826 ACE_ERROR_RETURN((LM_ERROR
,
828 ACE_TEXT ("Writer::initiate_write_file::ACE_Asynch_Write_Stream::writev")),
832 // we update now because otherwise, we'd have error when performing
833 // pipelined writing (that is, multiple calls to write before the callbacks
835 this->writing_file_offset_
+=
836 static_cast<u_long
> (increment_writing_file_offset
);
// Completion callback for the gathered file write started by
// initiate_write_file. It advances reported_file_offset_ by the bytes
// written, truncates the file when the last chunk was only partly
// filled, frees the written chain, and ends the proactor event loop
// once no receiver remains and no I/O is pending.
842 Writer::handle_write_file (const ACE_Asynch_Write_File::Result
&result
)
844 ACE_Message_Block
*mb
= &result
.message_block ();
846 ACE_DEBUG ((LM_DEBUG
,
847 ACE_TEXT ("Writer::handle_write_file at offset %d wrote %d\n"),
848 this->reported_file_offset_
,
849 result
.bytes_transferred ()));
851 this->reported_file_offset_
+=
852 static_cast<u_long
> (result
.bytes_transferred ());
854 // Always truncate as required,
855 // because partial will always be the last write to a file
856 ACE_Message_Block
*last_mb
= mb
;
857 last_chunk (mb
, last_mb
);
// whole chunks were written, so the unused space () of the last
// chunk is padding that has to be cut off the end of the file
859 if (last_mb
->space ())
860 ACE_OS::truncate (output_file
,
861 this->reported_file_offset_
-
862 static_cast<u_long
> (last_mb
->space ()));
864 free_chunks_chain (mb
);
// last receiver gone and nothing in flight: the test is finished
869 if (0 == this->receiver_count_
&&
870 0 == this->io_count_
)
872 ACE_TEST_ASSERT (0 == this->odd_chain_
&& 0 == this->even_chain_
);
874 ACE_DEBUG ((LM_DEBUG
,
875 ACE_TEXT ("Writer::handle_write_file")
876 ACE_TEXT (" - ending proactor event loop\n")));
878 ACE_Proactor::instance ()->end_event_loop ();
884 // *************************************************************
885 // Connector and Sender
886 // *************************************************************
889 class Connector
: public ACE_Asynch_Connector
<Sender
>
895 virtual ~Connector ();
897 // Address to pass to Sender for secondary connect.
898 void set_address (const ACE_INET_Addr
&addr
);
899 const ACE_INET_Addr
&get_address ();
903 // Virtual from ACE_Asynch_Connector
904 virtual Sender
*make_handler ();
907 void on_new_sender (Sender
&rcvr
);
908 void on_delete_sender (Sender
&rcvr
);
912 Sender
*list_senders_
[SENDERS
];
915 class Sender
: public ACE_Service_Handler
917 friend class Connector
;
919 Sender (Connector
*connector
= 0, int index
= -1);
923 //FUZZ: disable check_for_lack_ACE_OS
924 /// This is called after the new connection has been established.
925 virtual void open (ACE_HANDLE handle
,
926 ACE_Message_Block
&message_block
);
927 //FUZZ: enable check_for_lack_ACE_OS
929 // This is called by the framework when asynchronous reads from the
931 virtual void handle_read_file (const ACE_Asynch_Read_File::Result
&result
);
933 // This is called by the framework when asynchronous writes from the
935 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
);
938 void check_destroy ();
940 int initiate_read_file ();
942 int initiate_write_stream (ACE_Message_Block
&mb
);
945 Connector
* connector_
;
948 ACE_Asynch_Read_File rf_
;
949 ACE_HANDLE input_file_handle_
;
952 // Sockets to send to
953 // odd and even socket output streams
954 ACE_Asynch_Write_Stream ws_
[RECEIVERS
];
955 ACE_HANDLE socket_handle_
[RECEIVERS
];
960 // *************************************************************
962 // *************************************************************
964 Connector::Connector ()
967 for (int i
= 0; i
< SENDERS
; ++i
)
968 this->list_senders_
[i
] = 0;
971 Connector::~Connector ()
976 // Address to pass to Sender for secondary connect.
978 Connector::set_address (const ACE_INET_Addr
&addr
)
983 const ACE_INET_Addr
&
984 Connector::get_address ()
992 // This method can be called only after proactor event loop is done
995 for (int i
= 0; i
< SENDERS
; ++i
)
997 delete this->list_senders_
[i
];
998 this->list_senders_
[i
] = 0;
1003 Connector::on_new_sender (Sender
&sndr
)
1006 this->list_senders_
[sndr
.index_
] = &sndr
;
1007 ACE_DEBUG ((LM_DEBUG
,
1008 ACE_TEXT ("Sender::CTOR sessions_ = %d\n"),
1013 Connector::on_delete_sender (Sender
&sndr
)
1016 if (sndr
.index_
>= 0
1017 && sndr
.index_
< SENDERS
1018 && this->list_senders_
[sndr
.index_
] == &sndr
)
1019 this->list_senders_
[sndr
.index_
] = 0;
1021 ACE_DEBUG ((LM_DEBUG
,
1022 ACE_TEXT ("Sender::~DTOR sessions_ = %d\n"),
1027 Connector::make_handler ()
1029 if (this->sessions_
>= SENDERS
)
1032 for (int i
= 0; i
< SENDERS
; ++i
)
1034 if (this->list_senders_
[i
] == 0)
1036 ACE_NEW_RETURN (this->list_senders_
[i
],
1039 return this->list_senders_
[i
];
1046 // *************************************************************
1048 // *************************************************************
1050 Sender::Sender (Connector
* connector
, int index
)
1052 connector_ (connector
),
1053 input_file_handle_ (ACE_INVALID_HANDLE
),
1057 socket_handle_
[ODD
] = socket_handle_
[EVEN
] = ACE_INVALID_HANDLE
;
1059 if (this->connector_
!= 0)
1060 this->connector_
->on_new_sender (*this);
1065 ACE_DEBUG ((LM_DEBUG
,
1066 ACE_TEXT ("Sender::~Sender\n")));
1068 if (this->connector_
!= 0)
1069 this->connector_
->on_delete_sender (*this);
1071 if (this->socket_handle_
[ODD
] != ACE_INVALID_HANDLE
)
1072 ACE_OS::closesocket (this->socket_handle_
[ODD
]);
1074 if (this->socket_handle_
[EVEN
] != ACE_INVALID_HANDLE
)
1075 ACE_OS::closesocket (this->socket_handle_
[EVEN
]);
1077 if (this->input_file_handle_
!= ACE_INVALID_HANDLE
)
1078 ACE_OS::close (this->input_file_handle_
);
1081 ACE_Proactor::instance ()->end_event_loop ();
1084 // return true if we are alive, false if we committed suicide
1086 Sender::check_destroy ()
1088 if (this->io_count_
<= 0)
1093 Sender::open (ACE_HANDLE handle
, ACE_Message_Block
&)
1095 this->socket_handle_
[ODD
] = handle
;
1097 // Open the input file
1098 if (ACE_INVALID_HANDLE
== (this->input_file_handle_
=
1099 ACE_OS::open (input_file
,
1101 FILE_FLAG_OVERLAPPED
|\
1102 FILE_FLAG_NO_BUFFERING
,
1103 ACE_DEFAULT_FILE_PERMS
)))
1105 ACE_ERROR ((LM_ERROR
,
1107 ACE_TEXT ("Sender::open::ACE_OS::open")));
1111 // Now connect (w/o the connector factory) to the even (=second)
1112 // receiver. We don't connect thru the factory in order not to
1113 // instantiate another Sender.
1114 ACE_SOCK_Connector sock_connector
;
1115 ACE_SOCK_Stream sock_stream
;
1116 if (-1 == sock_connector
.connect (sock_stream
,
1117 this->connector_
->get_address ()))
1118 ACE_ERROR ((LM_ERROR
,
1120 ACE_TEXT ("Sender::open::ACE_SOCK_Connector::connect")));
1124 this->socket_handle_
[EVEN
] = sock_stream
.get_handle ();
1126 // Open odd ACE_Asynch_Write_Stream
1127 if (this->ws_
[ODD
].open (*this, this->socket_handle_
[ODD
]) == -1)
1128 ACE_ERROR ((LM_ERROR
,
1130 ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
1132 // Open even ACE_Asynch_Write_Stream
1133 else if (this->ws_
[EVEN
].open (*this, this->socket_handle_
[EVEN
]) == -1)
1134 ACE_ERROR ((LM_ERROR
,
1136 ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
1138 // Open ACE_Asynch_Read_File
1139 else if (this->rf_
.open (*this, this->input_file_handle_
) == -1)
1140 ACE_ERROR ((LM_ERROR
,
1142 ACE_TEXT ("Sender::open::ACE_Asynch_Read_File::open")));
1144 // Start an asynchronous read
1145 this->initiate_read_file ();
1149 this->check_destroy ();
// Starts one scattered asynchronous read of the input file at
// file_offset_. It allocates as many chunks as the rest of the file
// needs (at most ACE_IOV_MAX) and hands the chain to rf_.readv.
// If readv fails to start, the chain is freed and an error is reported
// through ACE_ERROR_RETURN.
1153 Sender::initiate_read_file ()
// reads must start on a chunk boundary
1155 ACE_TEST_ASSERT (0 == this->file_offset_
% chunk_size
);
// function-local statics: the file size and the chunk count are
// computed on the first call only, from input_file and chunk_size
// as they are at that moment
1157 static const size_t file_size
= ACE_OS::filesize (input_file
);
1159 static const size_t number_of_chunks_needed_for_file
=
1160 static_cast<size_t> (ACE_OS::ceil ((double) file_size
/ chunk_size
));
// chunks still to be read, capped at ACE_IOV_MAX
1162 size_t relevant_number_of_chunks
=
1163 ACE_MIN ((size_t)ACE_IOV_MAX
,
1164 number_of_chunks_needed_for_file
1165 - (size_t)(this->file_offset_
/ chunk_size
));
1167 if (!relevant_number_of_chunks
)
1169 ACE_TEST_ASSERT (0); // Just 2 C it coming
1173 ACE_Message_Block
*head_mb
= 0;
1174 if (-1 == allocate_chunks_chain (head_mb
, relevant_number_of_chunks
))
1176 ACE_TEST_ASSERT (0);
1181 if (this->rf_
.readv (*head_mb
,
1182 head_mb
->total_size (),
1183 this->file_offset_
) == -1)
// the read was never queued, so release the chain here
1185 free_chunks_chain (head_mb
);
1187 ACE_ERROR_RETURN ((LM_ERROR
,
1189 ACE_TEXT ("Sender::initiate_read_file::")
1190 ACE_TEXT ("ACE_Asynch_Read_Stream::readv")),
1199 Sender::initiate_write_stream (ACE_Message_Block
&mb
)
1201 // send the odd to the first connection, and the even to the second
1204 ACE_Message_Block
*odd_mb
= &mb
;
1205 ACE_Message_Block
*even_mb
= mb
.cont ();
1207 split_odd_even_chains (odd_mb
, even_mb
);
1209 ACE_DEBUG ((LM_DEBUG
,
1210 ACE_TEXT ("Sender::initiate_write_stream - (ODD ) writev %d\n"),
1211 odd_mb
->total_length ()));
1213 if (this->ws_
[ODD
].writev (*odd_mb
, odd_mb
->total_length ()) == -1)
1215 free_chunks_chain (odd_mb
);
1218 free_chunks_chain (even_mb
);
1220 ACE_ERROR_RETURN((LM_ERROR
,
1222 ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
1230 ACE_DEBUG ((LM_DEBUG
,
1231 ACE_TEXT ("Sender::initiate_write_stream - (EVEN) writev %d\n"),
1232 even_mb
->total_length ()));
1234 if (this->ws_
[EVEN
].writev (*even_mb
, even_mb
->total_length ()) == -1)
1236 free_chunks_chain (even_mb
);
1238 ACE_ERROR_RETURN((LM_ERROR
,
1240 ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
1251 Sender::handle_read_file (const ACE_Asynch_Read_File::Result
&result
)
1253 ACE_Message_Block
*mb
= &result
.message_block ();
1255 if (result
.error () == 0 && result
.bytes_transferred () != 0)
1257 size_t bytes_transferred
= result
.bytes_transferred ();
1258 size_t chunks_chain_size
= mb
->total_size ();
1259 ACE_DEBUG ((LM_DEBUG
,
1260 ACE_TEXT ("Sender::handle_read_file, read %d, ")
1261 ACE_TEXT ("chain total %d\n"),
1263 chunks_chain_size
));
1265 this->file_offset_
+= static_cast<u_long
> (bytes_transferred
);
1267 this->initiate_write_stream (*mb
);
1269 // and read more if required
1270 if (bytes_transferred
== chunks_chain_size
)
1271 this->initiate_read_file ();
1274 free_chunks_chain (mb
);
1278 this->check_destroy ();
1282 Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
1284 ACE_Message_Block
*mb
= &result
.message_block ();
1286 ACE_DEBUG ((LM_DEBUG
,
1287 ACE_TEXT ("Sender::handle_write_stream - wrote %d bytes\n"),
1288 result
.bytes_transferred ()));
1290 if (result
.error () == 0 && result
.bytes_transferred () != 0)
1292 ACE_TEST_ASSERT (0 == mb
->total_length ());
1294 ACE_TEST_ASSERT (0);
1296 free_chunks_chain (mb
);
1300 this->check_destroy ();
1303 // *************************************************************
1304 // Configuration helpers
1305 // *************************************************************
1307 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1311 ACE_TEXT ("\nusage: %s")
1312 ACE_TEXT ("\n-f <input file>\n")
1313 ACE_TEXT ("\n-c client only (reader-sender)")
1314 ACE_TEXT ("\n-s server only (receiver-writer)")
1315 ACE_TEXT ("\n-h host to connect to")
1316 ACE_TEXT ("\n-p port")
1317 ACE_TEXT ("\n-u show this message")
1325 parse_args (int argc
, ACE_TCHAR
*argv
[])
1327 if (argc
== 1) // no arguments , so one button test
1330 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("f:csh:p:u"));
1333 while ((c
= get_opt ()) != EOF
)
1338 input_file
= get_opt
.opt_arg ();
1349 host
= get_opt
.opt_arg ();
1352 port
= ACE_OS::atoi (get_opt
.opt_arg ());
1356 return print_usage (argc
, argv
);
1364 run_main (int argc
, ACE_TCHAR
*argv
[])
1366 ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
1368 if (::parse_args (argc
, argv
) == -1)
1371 chunk_size
= ACE_OS::getpagesize ();
1374 ACE_DEBUG ((LM_INFO
,
1375 ACE_TEXT ("Running as client only, page size %d\n"),
1377 else if (server_only
)
1378 ACE_DEBUG ((LM_INFO
,
1379 ACE_TEXT ("Running as server only, page size %d\n"),
1382 ACE_DEBUG ((LM_INFO
,
1383 ACE_TEXT ("Running as server and client, page size %d\n"),
1387 Connector connector
;
1388 ACE_INET_Addr
addr (port
);
1392 // Simplify, initial read with zero size
1393 if (-1 == acceptor
.open (addr
, 0, 1))
1395 ACE_TEST_ASSERT (0);
1402 if (-1 == connector
.open (1, ACE_Proactor::instance ()))
1404 ACE_TEST_ASSERT (0);
1408 // connect to first destination
1409 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
1410 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), host
), -1);
1411 connector
.set_address (addr
);
1412 if (-1 == connector
.connect (addr
))
1414 ACE_TEST_ASSERT (0);
1419 ACE_Proactor::instance ()->run_event_loop ();
1421 // As Proactor event loop now is inactive it is safe to destroy all
1427 ACE_Proactor::instance()->close_singleton ();
1429 // now compare the files - available only when on same machine
1432 if (!client_only
&& !server_only
)
1434 ACE_DEBUG ((LM_INFO
,
1435 ACE_TEXT ("Comparing the input file and the output file...\n")));
1438 // map the two files, then perform memcmp
1440 ACE_Mem_Map
original_file (input_file
);
1441 ACE_Mem_Map
reconstructed_file (output_file
);
1443 if (original_file
.addr () &&
1444 original_file
.addr () != MAP_FAILED
&&
1445 reconstructed_file
.addr () &&
1446 reconstructed_file
.addr () != MAP_FAILED
)
1449 if ((original_file
.size () == reconstructed_file
.size ()) &&
1450 // and if same size, compare file data
1451 (0 == ACE_OS::memcmp (original_file
.addr (),
1452 reconstructed_file
.addr (),
1453 original_file
.size ())))
1459 ACE_DEBUG ((LM_DEBUG
,
1460 ACE_TEXT ("input file and the output file identical!\n")));
1462 ACE_ERROR ((LM_ERROR
,
1463 ACE_TEXT ("input file and the output file are different!\n")));
1467 ACE_OS::unlink (output_file
);
1476 run_main (int, ACE_TCHAR
*[])
1478 ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
1480 ACE_DEBUG ((LM_INFO
,
1481 ACE_TEXT ("Asynchronous Scatter/Gather IO is unsupported.\n")
1482 ACE_TEXT ("Proactor_Scatter_Gather_Test will not be run.\n")));
1489 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */