1 // ============================================================================
3 * @file Proactor_Scatter_Gather_Test.cpp
5 * The test runs on a single thread, and involves a single Sender,
6 * two Receivers and a single Writer. The Sender async-reads
7 * (scattered) from a file into chunks of <page size>. It
8 * async-sends (gathered) the odd chunks to the first receiver over a
9 * stream, and the even chunks to the second receiver over a
10 * different stream. The receivers async-read (scattered) from the
11 * socket streams into chunks in size of <page size>, and convey the
12 * data to the Writer. The Writer reconstructs the file using
13 * async-write (gathered). Then, the reconstructed file is compared
14 * to the original file to determine test success. So, It covers both
15 * async scatter/gather stream I/O and async scatter/gather file I/O.
16 * The wire transfer protocol is very naive (and totally non
17 * reliable...) - when both connections are closed, EOF is assumed.
18 * The test can be also run in a separated sender and receiver mode,
19 * to test real network influences.
21 * This test is based upon some building blocks taken from the
24 * @author Edan Ayal <edanayal@yahoo.com> */
25 // ============================================================================
27 #include "test_config.h"
29 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
30 // This currently only works on Win32 platforms (NT SP2 and above).
31 // Support for Unix platforms supporting POSIX aio calls should be added in future.
33 #include "ace/Get_Opt.h"
35 #include "ace/Proactor.h"
36 #include "ace/Asynch_Acceptor.h"
37 #include "ace/Asynch_Connector.h"
38 #include "ace/Mem_Map.h"
39 #include "ace/Min_Max.h"
40 #include "ace/OS_NS_math.h"
41 #include "ace/OS_NS_sys_stat.h"
42 #include "ace/OS_NS_fcntl.h"
43 #include "ace/OS_NS_unistd.h"
45 #include "ace/SOCK_Connector.h"
47 // For the Acceptor/Connector handlers maintenance lists
48 static const int SENDERS
= 1;
49 static const int RECEIVERS
= 2;
51 // Port that we're receiving connections on.
52 static u_short port
= ACE_DEFAULT_SERVER_PORT
;
54 static const ACE_TCHAR
*host
= ACE_LOCALHOST
;
56 // File that we're sending.
57 static const ACE_TCHAR
*input_file
= ACE_TEXT("Proactor_Scatter_Gather_Test.cpp");
59 // Name of the output file.
60 static const ACE_TCHAR
*output_file
= ACE_TEXT("output");
62 static int client_only
= 0;
63 static int server_only
= 0;
64 static size_t chunk_size
= 0;
72 // *************************************************************
73 // Some chunks chain helper routines
74 // *************************************************************
75 static int allocate_chunks_chain (ACE_Message_Block
*&head_mb
,
76 size_t number_of_chunks
)
78 ACE_Message_Block
*pre_mb
= 0;
80 for (size_t index
= 0; index
< number_of_chunks
; ++index
)
82 #if defined (ACE_WIN32)
83 void *addr
= ::VirtualAlloc (0,
88 void *addr
= new char[chunk_size
];
89 #endif /* ACE_WIN32 */
92 ACE_Message_Block
*mb
= new ACE_Message_Block (static_cast<char *> (addr
),
97 // chain them together
113 free_chunks_chain (ACE_Message_Block
*&mb
)
115 for (const ACE_Message_Block
* msg
= mb
;
119 #if defined (ACE_WIN32)
120 ::VirtualFree (msg
->base (),
124 delete [] msg
->base ();
125 #endif /* ACE_WIN32 */
133 last_chunk (ACE_Message_Block
*chain
,
134 ACE_Message_Block
*&last
)
141 while (0 != last
->cont ())
143 last
= last
->cont ();
151 merge_odd_even_chains (ACE_Message_Block
*odd_mb
,
152 ACE_Message_Block
*even_mb
)
154 ACE_Message_Block
*pre_pre_mb
= odd_mb
;
155 ACE_Message_Block
*pre_mb
= even_mb
;
156 ACE_Message_Block
*curr_mb
= odd_mb
->cont ();
160 for (; curr_mb
!= 0; curr_mb
= pre_pre_mb
->cont ())
162 pre_pre_mb
->cont (pre_mb
);
164 // increment history pointers
169 pre_pre_mb
->cont (pre_mb
);
175 split_odd_even_chains (ACE_Message_Block
*odd_mb
,
176 ACE_Message_Block
*even_mb
)
178 ACE_Message_Block
*pre_pre_mb
= odd_mb
;
179 ACE_Message_Block
*pre_mb
= even_mb
;
180 ACE_Message_Block
*curr_mb
= (even_mb
? even_mb
->cont () : 0);
182 for (; curr_mb
!= 0; curr_mb
= curr_mb
->cont ())
184 pre_pre_mb
->cont (curr_mb
);
186 // increment history pointers
191 pre_pre_mb
->cont (0);
197 add_to_chunks_chain (ACE_Message_Block
*&chunks_chain
,
198 ACE_Message_Block
*additional_chunks_chain
)
200 if (0 == chunks_chain
)
201 chunks_chain
= additional_chunks_chain
;
204 ACE_Message_Block
*last
= 0;
205 last_chunk (chunks_chain
, last
);
207 last
->cont (additional_chunks_chain
);
212 remove_empty_chunks (ACE_Message_Block
*&chunks_chain
)
214 if (0 == chunks_chain
)
217 ACE_Message_Block
*first_empty
= chunks_chain
;
218 ACE_Message_Block
*pre_mb
= 0;
220 while (first_empty
->length () > 0 &&
221 0 != first_empty
->cont ())
223 pre_mb
= first_empty
;
224 first_empty
= first_empty
->cont ();
227 // break the chain there, and release the empty end (might be everything)
228 if (0 == first_empty
->length ())
230 if (pre_mb
) // might be 0, in case it's the entire chain
233 if (first_empty
== chunks_chain
)
236 free_chunks_chain (first_empty
);
240 // *************************************************************
241 // Acceptor, Receiver and Writer
242 // *************************************************************
245 class Acceptor
: public ACE_Asynch_Acceptor
<Receiver
>
247 friend class Receiver
;
251 virtual ~Acceptor (void);
255 // Virtual from ACE_Asynch_Acceptor
256 virtual Receiver
*make_handler (void);
258 int get_number_sessions (void) { return this->sessions_
; }
261 void on_new_receiver (Receiver
&rcvr
);
262 void on_delete_receiver (Receiver
&rcvr
);
265 Receiver
*list_receivers_
[RECEIVERS
];
270 // The first instantiated take the role of the odd receiver
271 class Receiver
: public ACE_Service_Handler
273 friend class Acceptor
;
277 Receiver (Acceptor
*acceptor
= 0, int index
= -1);
278 virtual ~Receiver (void);
280 //FUZZ: disable check_for_lack_ACE_OS
281 /// This is called after the new connection has been accepted.
282 virtual void open (ACE_HANDLE handle
,
283 ACE_Message_Block
&message_block
);
284 //FUZZ: enable check_for_lack_ACE_OS
287 /// This is called by the framework when asynchronous <read> operation from the
288 /// socket completes.
289 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
);
292 int initiate_read_stream (void);
294 void check_destroy (void);
300 ACE_Asynch_Read_Stream rs_
;
301 ACE_HANDLE socket_handle_
;
304 static Writer
* writer_
;
310 // if we get non-page-size reminder, we will not send it to the writer
311 // until it is full (unless at end)
312 ACE_Message_Block
*partial_chunk_
;
315 class Writer
: public ACE_Handler
317 friend class Receiver
;
321 virtual ~Writer (void);
323 //FUZZ: disable check_for_lack_ACE_OS
325 //FUZZ: enable check_for_lack_ACE_OS
327 // this is *not* a callback from the framework
328 int handle_read_chunks_chain (ACE_Message_Block
*mb
,
331 // for determining when last receiver dies
332 void on_new_receiver ();
333 void on_delete_receiver ();
336 /// This is called by the framework when an asynchronous <write> to the file
338 virtual void handle_write_file (const ACE_Asynch_Write_File::Result
&result
);
341 int initiate_write_file (void);
345 ACE_Asynch_Write_File wf_
;
346 ACE_HANDLE output_file_handle_
;
347 u_long writing_file_offset_
;
348 u_long reported_file_offset_
;
349 ACE_Message_Block
*odd_chain_
;
350 ACE_Message_Block
*even_chain_
;
352 char receiver_count_
;
355 // *************************************************************
357 // *************************************************************
359 Writer
*Receiver::writer_
= 0;
361 Receiver::Receiver (Acceptor
* acceptor
, int index
)
362 : acceptor_ (acceptor
),
364 socket_handle_ (ACE_INVALID_HANDLE
),
368 // the first one is the odd one
369 this->odd_
= ((0 == index
) ? 1 : 0);
373 Receiver::writer_
= new Writer
;
374 if (!Receiver::writer_
)
381 Receiver::writer_
->on_new_receiver ();
383 if (this->acceptor_
!= 0)
384 this->acceptor_
->on_new_receiver (*this);
387 Receiver::~Receiver (void)
389 ACE_DEBUG ((LM_DEBUG
,
390 ACE_TEXT ("Receiver::~Receiver\n")));
392 if (this->acceptor_
!= 0)
393 this->acceptor_
->on_delete_receiver (*this);
395 if (this->socket_handle_
!= ACE_INVALID_HANDLE
)
396 ACE_OS::closesocket (this->socket_handle_
);
398 Receiver::writer_
->on_delete_receiver ();
400 if (this->partial_chunk_
)
402 ACE_TEST_ASSERT (0); // should not be getting here
403 this->partial_chunk_
->release ();
408 Receiver::check_destroy (void)
410 if (this->io_count_
<= 0)
415 Receiver::open (ACE_HANDLE handle
, ACE_Message_Block
&)
417 this->socket_handle_
= handle
;
419 // Open the ACE_Asynch_Read_Stream
420 if (this->rs_
.open (*this, this->socket_handle_
) == -1)
421 ACE_ERROR ((LM_ERROR
,
423 ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
427 Receiver::writer_
->open ();
429 this->initiate_read_stream ();
432 this->check_destroy ();
436 Receiver::initiate_read_stream (void)
438 if (!Receiver::writer_
)
441 // how many chunks to allocate?
442 size_t number_of_new_chunks
= (this->partial_chunk_
?
443 (ACE_IOV_MAX
/ RECEIVERS
) - 1
444 : ACE_IOV_MAX
/ RECEIVERS
);
446 // allocate chunks chain
447 ACE_Message_Block
*head_mb
= 0;
448 if (-1 == allocate_chunks_chain (head_mb
, number_of_new_chunks
))
454 // calculate how many bytes to read
456 // head_mb could be 0 (no new chunks allocated)
457 size_t bytes_to_read
= head_mb
? head_mb
->total_size () : 0;
459 // add the partial chunk at the front if appropriate, and update
460 // the number of bytes to read
461 if (this->partial_chunk_
)
463 bytes_to_read
+= this->partial_chunk_
->space ();
464 this->partial_chunk_
->cont (head_mb
);
465 head_mb
= this->partial_chunk_
;
466 this->partial_chunk_
= 0;
469 ACE_DEBUG ((LM_DEBUG
,
470 ACE_TEXT ("Receiver::initiate_read_stream - (%s) readv %d\n"),
471 this->odd_
? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
474 // perform the actual scattered read
475 if (this->rs_
.readv (*head_mb
,
476 bytes_to_read
) == -1)
478 free_chunks_chain (head_mb
);
480 ACE_ERROR_RETURN ((LM_ERROR
,
482 ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")),
491 Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
)
493 ACE_Message_Block
*mb
= &result
.message_block ();
495 ACE_DEBUG ((LM_DEBUG
,
496 ACE_TEXT ("Receiver::handle_read_stream - (%s) read %d\n"),
497 this->odd_
? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
498 result
.bytes_transferred ()));
500 // Transfer only complete chunks to the writer.
501 // Save last partial chunk for the next call.
502 // On disconnect (error or 0 transferred), transfer whatever we have.
504 // at this stage there should not be anything there
505 ACE_TEST_ASSERT (!this->partial_chunk_
);
507 // first, remove the empty chunks
508 remove_empty_chunks (mb
);
510 if (mb
&& Receiver::writer_
)
511 { // there's something to write, and who to write to
513 // write everything or only complete chunks?
515 // write everything - when no new bytes were transferred
516 int write_everything
= 0;
517 if (!result
.bytes_transferred ())
518 write_everything
= 1;
519 if (write_everything
)
520 Receiver::writer_
->handle_read_chunks_chain (mb
,
521 this->odd_
? ODD
: EVEN
);
523 { // filter out the partial chunk at the end (if present)
524 // and save it for later before writing the full chunks
526 // have this->partial_chunk_ point to the last chunk in the chain
527 size_t last_index
= last_chunk (mb
, this->partial_chunk_
);
528 if (this->partial_chunk_
&&
529 this->partial_chunk_
->length () < chunk_size
)
530 { // found partial chunk at end of chain
531 // detach it from the chain
532 if (last_index
> 1) // chain bigger than 1
534 ACE_Message_Block
*pre_last
= mb
;
535 for (size_t index
= 1; index
< last_index
- 1; ++index
)
536 pre_last
= pre_last
->cont ();
538 // detach partial chunk from chain
542 // chain in length of 1 - so we need to zero mb
545 else // last is a full chunk, so hand it over with the rest
546 this->partial_chunk_
= 0;
548 // transfer (if there's anything left)
549 if (mb
&& mb
->total_length ())
550 Receiver::writer_
->handle_read_chunks_chain (
552 this->odd_
? ODD
: EVEN
);
554 // initiate more reads only if no error
555 if (!result
.error ())
556 this->initiate_read_stream ();
561 else if (mb
&& !Receiver::writer_
)
562 // no one to write to
563 free_chunks_chain (mb
);
567 this->check_destroy ();
570 // *************************************************************
572 // *************************************************************
574 Acceptor::Acceptor (void)
577 for (int i
= 0; i
< RECEIVERS
; ++i
)
578 this->list_receivers_
[i
] = 0;
581 Acceptor::~Acceptor (void)
588 Acceptor::stop (void)
590 // This method can be called only after proactor event loop is done
592 for (int i
= 0; i
< RECEIVERS
; ++i
)
594 delete this->list_receivers_
[i
];
595 this->list_receivers_
[i
] = 0;
600 Acceptor::on_new_receiver (Receiver
& rcvr
)
603 this->list_receivers_
[rcvr
.index_
] = &rcvr
;
604 ACE_DEBUG ((LM_DEBUG
,
605 ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"),
610 Acceptor::on_delete_receiver (Receiver
& rcvr
)
614 && rcvr
.index_
< RECEIVERS
615 && this->list_receivers_
[rcvr
.index_
] == &rcvr
)
616 this->list_receivers_
[rcvr
.index_
] = 0;
618 ACE_DEBUG ((LM_DEBUG
,
619 ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"),
624 Acceptor::make_handler (void)
626 if (this->sessions_
>= RECEIVERS
)
629 for (int i
= 0; i
< RECEIVERS
; ++i
)
631 if (this->list_receivers_
[i
] == 0)
633 ACE_NEW_RETURN (this->list_receivers_
[i
],
636 return this->list_receivers_
[i
];
643 // *************************************************************
645 // *************************************************************
647 Writer::Writer (void)
648 : output_file_handle_ (ACE_INVALID_HANDLE
),
649 writing_file_offset_ (0),
650 reported_file_offset_ (0),
658 Writer::~Writer (void)
660 ACE_DEBUG ((LM_DEBUG
,
661 ACE_TEXT ("Writer::~Writer\n")));
663 if (this->output_file_handle_
!= ACE_INVALID_HANDLE
)
664 ACE_OS::close (this->output_file_handle_
);
666 Receiver::writer_
= 0;
670 Writer::on_new_receiver ()
672 ACE_DEBUG ((LM_DEBUG
,
673 ACE_TEXT ("Writer::on_new_receiver\n")));
675 ++this->receiver_count_
;
679 Writer::on_delete_receiver ()
681 ACE_DEBUG ((LM_DEBUG
,
682 ACE_TEXT ("Writer::on_delete_receiver\n")));
684 --this->receiver_count_
;
686 if (0 == this->receiver_count_
)
688 if (this->io_count_
<= 0)
689 // no pending io, so do the work oursleves
690 // (if pending io, they'll see the zero receiver count)
691 this->initiate_write_file ();
698 // Open the file for output
699 if (ACE_INVALID_HANDLE
== (this->output_file_handle_
= ACE_OS::open (output_file
,
700 O_CREAT
| _O_TRUNC
| _O_WRONLY
|\
701 FILE_FLAG_OVERLAPPED
|\
702 FILE_FLAG_NO_BUFFERING
,
703 ACE_DEFAULT_FILE_PERMS
)))
704 ACE_ERROR ((LM_ERROR
,
706 ACE_TEXT ("Writer::open::ACE_OS::open")));
707 // Open the ACE_Asynch_Write_File
708 else if (this->wf_
.open (*this, this->output_file_handle_
) == -1)
709 ACE_ERROR ((LM_ERROR
,
711 ACE_TEXT ("Writer::open::ACE_Asynch_Write_File::open")));
715 Writer::handle_read_chunks_chain (ACE_Message_Block
*mb
,
718 ACE_DEBUG ((LM_DEBUG
,
719 ACE_TEXT ("Writer::handle_read_chunks_chain - (%s) %d bytes\n"),
720 (type
== ODD
) ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
721 mb
->total_length ()));
723 add_to_chunks_chain (ODD
== type
? this->odd_chain_
: this->even_chain_
, mb
);
725 this->initiate_write_file ();
731 Writer::initiate_write_file (void)
733 // find out how much can we merge
734 ACE_Message_Block
*dummy_last
= 0;
735 size_t odd_count
= last_chunk (this->odd_chain_
, dummy_last
);
736 size_t even_count
= last_chunk (this->even_chain_
, dummy_last
);
738 size_t merge_size
= ACE_MIN (ACE_MIN (odd_count
, even_count
),
739 (size_t) ACE_IOV_MAX
);
741 // the options here are as follows:
742 // io_count_ can be zero or greater.
743 // merge_size can be zero or not.
744 // if non zero merge, write the merge. ASSERT receiver_count_ is non zero too.
746 // if receiver_count_ is non zero, NOOP.
747 // if zero receiver_count_, we should write whatever is left,
748 // and terminate the writer at completion.
749 // if nothing to write, and io_count_ is zero too, terminate here.
751 if (0 == merge_size
&&
752 0 != this->receiver_count_
)
755 if (0 == merge_size
&&
756 0 == this->receiver_count_
&&
759 0 == this->io_count_
)
761 ACE_DEBUG ((LM_DEBUG
,
762 ACE_TEXT ("Writer::initiate_write_file")
763 ACE_TEXT (" - ending proactor event loop\n")));
765 ACE_Proactor::instance ()->end_event_loop ();
772 // if we reached nere and merge_size is zero, we should write whatever is
773 // in the queues (1 to 2 chunks together), so let's force the merge size to 1.
776 ACE_TEST_ASSERT (1 == odd_count
&& 1 >= even_count
);
780 // Now that we found out what we want to do, prepare the chain
781 // that will be written, and update the remainders
782 ACE_Message_Block
*new_odd_chain_head
= this->odd_chain_
;
783 ACE_Message_Block
*new_even_chain_head
= this->even_chain_
;
785 // locate the place for detachment in the chains
786 ACE_Message_Block
*pre_odd
= 0;
787 ACE_Message_Block
*pre_even
= 0;
788 for (size_t index
= 0; index
< merge_size
; ++index
)
790 pre_odd
= new_odd_chain_head
;
791 if (new_odd_chain_head
)
792 new_odd_chain_head
= new_odd_chain_head
->cont ();
793 pre_even
= new_even_chain_head
;
794 if (new_even_chain_head
)
795 new_even_chain_head
= new_even_chain_head
->cont ();
797 // now detach the chain
803 // perform merge between the two chains
804 merge_odd_even_chains (this->odd_chain_
, this->even_chain_
);
806 // and now finally perform the write
807 ACE_Message_Block
*united_mb
= this->odd_chain_
;
808 // update the remainders of the chains
809 this->odd_chain_
= new_odd_chain_head
;
810 this->even_chain_
= new_even_chain_head
;
811 size_t increment_writing_file_offset
= united_mb
->total_length ();
813 // Reconstruct the file
814 // Write the size, not the length, because we must write in chunks
816 ACE_DEBUG ((LM_DEBUG
,
817 ACE_TEXT ("Writer::initiate_write_file: write %d bytes at %d\n"),
818 united_mb
->total_size (),
819 this->writing_file_offset_
));
820 if (this->wf_
.writev (*united_mb
,
821 united_mb
->total_size (),
822 this->writing_file_offset_
) == -1)
824 free_chunks_chain (united_mb
);
826 ACE_ERROR_RETURN((LM_ERROR
,
828 ACE_TEXT ("Writer::initiate_write_file::ACE_Asynch_Write_Stream::writev")),
832 // we update now because otherwise, we'd have error when performing
833 // pipelined writing (that is, mulitple calls to write before the callbacks
835 this->writing_file_offset_
+=
836 static_cast<u_long
> (increment_writing_file_offset
);
842 Writer::handle_write_file (const ACE_Asynch_Write_File::Result
&result
)
844 ACE_Message_Block
*mb
= &result
.message_block ();
846 ACE_DEBUG ((LM_DEBUG
,
847 ACE_TEXT ("Writer::handle_write_file at offset %d wrote %d\n"),
848 this->reported_file_offset_
,
849 result
.bytes_transferred ()));
851 this->reported_file_offset_
+=
852 static_cast<u_long
> (result
.bytes_transferred ());
854 // Always truncate as required,
855 // because partial will always be the last write to a file
856 ACE_Message_Block
*last_mb
= mb
;
857 last_chunk (mb
, last_mb
);
859 if (last_mb
->space ())
860 ACE_OS::truncate (output_file
,
861 this->reported_file_offset_
-
862 static_cast<u_long
> (last_mb
->space ()));
864 free_chunks_chain (mb
);
869 if (0 == this->receiver_count_
&&
870 0 == this->io_count_
)
872 ACE_TEST_ASSERT (0 == this->odd_chain_
&& 0 == this->even_chain_
);
874 ACE_DEBUG ((LM_DEBUG
,
875 ACE_TEXT ("Writer::handle_write_file")
876 ACE_TEXT (" - ending proactor event loop\n")));
878 ACE_Proactor::instance ()->end_event_loop ();
884 // *************************************************************
885 // Connector and Sender
886 // *************************************************************
889 class Connector
: public ACE_Asynch_Connector
<Sender
>
895 virtual ~Connector (void);
897 // Address to pass to Sender for secondary connect.
898 void set_address (const ACE_INET_Addr
&addr
);
899 const ACE_INET_Addr
&get_address (void);
903 // Virtual from ACE_Asynch_Connector
904 virtual Sender
*make_handler (void);
907 void on_new_sender (Sender
&rcvr
);
908 void on_delete_sender (Sender
&rcvr
);
912 Sender
*list_senders_
[SENDERS
];
915 class Sender
: public ACE_Service_Handler
917 friend class Connector
;
920 Sender (Connector
*connector
= 0, int index
= -1);
922 virtual ~Sender (void);
924 //FUZZ: disable check_for_lack_ACE_OS
925 /// This is called after the new connection has been established.
926 virtual void open (ACE_HANDLE handle
,
927 ACE_Message_Block
&message_block
);
928 //FUZZ: enable check_for_lack_ACE_OS
930 // This is called by the framework when asynchronous reads from the
932 virtual void handle_read_file (const ACE_Asynch_Read_File::Result
&result
);
934 // This is called by the framework when asynchronous writes from the
936 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
);
939 void check_destroy (void);
941 int initiate_read_file (void);
943 int initiate_write_stream (ACE_Message_Block
&mb
);
946 Connector
* connector_
;
949 ACE_Asynch_Read_File rf_
;
950 ACE_HANDLE input_file_handle_
;
953 // Sockets to send to
954 // odd and even socket output streams
955 ACE_Asynch_Write_Stream ws_
[RECEIVERS
];
956 ACE_HANDLE socket_handle_
[RECEIVERS
];
961 // *************************************************************
963 // *************************************************************
965 Connector::Connector (void)
968 for (int i
= 0; i
< SENDERS
; ++i
)
969 this->list_senders_
[i
] = 0;
972 Connector::~Connector (void)
977 // Address to pass to Sender for secondary connect.
979 Connector::set_address (const ACE_INET_Addr
&addr
)
984 const ACE_INET_Addr
&
985 Connector::get_address (void)
991 Connector::stop (void)
993 // This method can be called only after proactor event loop is done
996 for (int i
= 0; i
< SENDERS
; ++i
)
998 delete this->list_senders_
[i
];
999 this->list_senders_
[i
] = 0;
1004 Connector::on_new_sender (Sender
&sndr
)
1007 this->list_senders_
[sndr
.index_
] = &sndr
;
1008 ACE_DEBUG ((LM_DEBUG
,
1009 ACE_TEXT ("Sender::CTOR sessions_ = %d\n"),
1014 Connector::on_delete_sender (Sender
&sndr
)
1017 if (sndr
.index_
>= 0
1018 && sndr
.index_
< SENDERS
1019 && this->list_senders_
[sndr
.index_
] == &sndr
)
1020 this->list_senders_
[sndr
.index_
] = 0;
1022 ACE_DEBUG ((LM_DEBUG
,
1023 ACE_TEXT ("Sender::~DTOR sessions_ = %d\n"),
1028 Connector::make_handler (void)
1030 if (this->sessions_
>= SENDERS
)
1033 for (int i
= 0; i
< SENDERS
; ++i
)
1035 if (this->list_senders_
[i
] == 0)
1037 ACE_NEW_RETURN (this->list_senders_
[i
],
1040 return this->list_senders_
[i
];
1047 // *************************************************************
1049 // *************************************************************
1051 Sender::Sender (Connector
* connector
, int index
)
1053 connector_ (connector
),
1054 input_file_handle_ (ACE_INVALID_HANDLE
),
1058 socket_handle_
[ODD
] = socket_handle_
[EVEN
] = ACE_INVALID_HANDLE
;
1060 if (this->connector_
!= 0)
1061 this->connector_
->on_new_sender (*this);
1064 Sender::~Sender (void)
1066 ACE_DEBUG ((LM_DEBUG
,
1067 ACE_TEXT ("Sender::~Sender\n")));
1069 if (this->connector_
!= 0)
1070 this->connector_
->on_delete_sender (*this);
1072 if (this->socket_handle_
[ODD
] != ACE_INVALID_HANDLE
)
1073 ACE_OS::closesocket (this->socket_handle_
[ODD
]);
1075 if (this->socket_handle_
[EVEN
] != ACE_INVALID_HANDLE
)
1076 ACE_OS::closesocket (this->socket_handle_
[EVEN
]);
1078 if (this->input_file_handle_
!= ACE_INVALID_HANDLE
)
1079 ACE_OS::close (this->input_file_handle_
);
1082 ACE_Proactor::instance ()->end_event_loop ();
1085 // return true if we alive, false we commited suicide
1087 Sender::check_destroy (void)
1089 if (this->io_count_
<= 0)
1094 Sender::open (ACE_HANDLE handle
, ACE_Message_Block
&)
1096 this->socket_handle_
[ODD
] = handle
;
1098 // Open the input file
1099 if (ACE_INVALID_HANDLE
== (this->input_file_handle_
=
1100 ACE_OS::open (input_file
,
1102 FILE_FLAG_OVERLAPPED
|\
1103 FILE_FLAG_NO_BUFFERING
,
1104 ACE_DEFAULT_FILE_PERMS
)))
1106 ACE_ERROR ((LM_ERROR
,
1108 ACE_TEXT ("Sender::open::ACE_OS::open")));
1112 // Now connect (w/o the connector factory) to the even (=second)
1113 // receiver. We don't connect thru the factory in order not to
1114 // instantiate another Sender.
1115 ACE_SOCK_Connector sock_connector
;
1116 ACE_SOCK_Stream sock_stream
;
1117 if (-1 == sock_connector
.connect (sock_stream
,
1118 this->connector_
->get_address ()))
1119 ACE_ERROR ((LM_ERROR
,
1121 ACE_TEXT ("Sender::open::ACE_SOCK_Connector::connect")));
1125 this->socket_handle_
[EVEN
] = sock_stream
.get_handle ();
1127 // Open odd ACE_Asynch_Write_Stream
1128 if (this->ws_
[ODD
].open (*this, this->socket_handle_
[ODD
]) == -1)
1129 ACE_ERROR ((LM_ERROR
,
1131 ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
1133 // Open even ACE_Asynch_Write_Stream
1134 else if (this->ws_
[EVEN
].open (*this, this->socket_handle_
[EVEN
]) == -1)
1135 ACE_ERROR ((LM_ERROR
,
1137 ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
1139 // Open ACE_Asynch_Read_File
1140 else if (this->rf_
.open (*this, this->input_file_handle_
) == -1)
1141 ACE_ERROR ((LM_ERROR
,
1143 ACE_TEXT ("Sender::open::ACE_Asynch_Read_File::open")));
1145 // Start an asynchronous read
1146 this->initiate_read_file ();
1150 this->check_destroy ();
1154 Sender::initiate_read_file (void)
1156 ACE_TEST_ASSERT (0 == this->file_offset_
% chunk_size
);
1158 static const size_t file_size
= ACE_OS::filesize (input_file
);
1160 static const size_t number_of_chunks_needed_for_file
=
1161 static_cast<size_t> (ACE_OS::ceil ((double) file_size
/ chunk_size
));
1163 size_t relevant_number_of_chunks
=
1164 ACE_MIN ((size_t)ACE_IOV_MAX
,
1165 number_of_chunks_needed_for_file
1166 - (size_t)(this->file_offset_
/ chunk_size
));
1168 if (!relevant_number_of_chunks
)
1170 ACE_TEST_ASSERT (0); // Just 2 C it coming
1174 ACE_Message_Block
*head_mb
= 0;
1175 if (-1 == allocate_chunks_chain (head_mb
, relevant_number_of_chunks
))
1177 ACE_TEST_ASSERT (0);
1182 if (this->rf_
.readv (*head_mb
,
1183 head_mb
->total_size (),
1184 this->file_offset_
) == -1)
1186 free_chunks_chain (head_mb
);
1188 ACE_ERROR_RETURN ((LM_ERROR
,
1190 ACE_TEXT ("Sender::initiate_read_file::")
1191 ACE_TEXT ("ACE_Asynch_Read_Stream::readv")),
1200 Sender::initiate_write_stream (ACE_Message_Block
&mb
)
1202 // send the odd to the first connection, and the even to the second
1205 ACE_Message_Block
*odd_mb
= &mb
;
1206 ACE_Message_Block
*even_mb
= mb
.cont ();
1208 split_odd_even_chains (odd_mb
, even_mb
);
1210 ACE_DEBUG ((LM_DEBUG
,
1211 ACE_TEXT ("Sender::initiate_write_stream - (ODD ) writev %d\n"),
1212 odd_mb
->total_length ()));
1214 if (this->ws_
[ODD
].writev (*odd_mb
, odd_mb
->total_length ()) == -1)
1216 free_chunks_chain (odd_mb
);
1219 free_chunks_chain (even_mb
);
1221 ACE_ERROR_RETURN((LM_ERROR
,
1223 ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
1231 ACE_DEBUG ((LM_DEBUG
,
1232 ACE_TEXT ("Sender::initiate_write_stream - (EVEN) writev %d\n"),
1233 even_mb
->total_length ()));
1235 if (this->ws_
[EVEN
].writev (*even_mb
, even_mb
->total_length ()) == -1)
1237 free_chunks_chain (even_mb
);
1239 ACE_ERROR_RETURN((LM_ERROR
,
1241 ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
1252 Sender::handle_read_file (const ACE_Asynch_Read_File::Result
&result
)
1254 ACE_Message_Block
*mb
= &result
.message_block ();
1256 if (result
.error () == 0 && result
.bytes_transferred () != 0)
1258 size_t bytes_transferred
= result
.bytes_transferred ();
1259 size_t chunks_chain_size
= mb
->total_size ();
1260 ACE_DEBUG ((LM_DEBUG
,
1261 ACE_TEXT ("Sender::handle_read_file, read %d, ")
1262 ACE_TEXT ("chain total %d\n"),
1264 chunks_chain_size
));
1266 this->file_offset_
+= static_cast<u_long
> (bytes_transferred
);
1268 this->initiate_write_stream (*mb
);
1270 // and read more if required
1271 if (bytes_transferred
== chunks_chain_size
)
1272 this->initiate_read_file ();
1275 free_chunks_chain (mb
);
1279 this->check_destroy ();
1283 Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
1285 ACE_Message_Block
*mb
= &result
.message_block ();
1287 ACE_DEBUG ((LM_DEBUG
,
1288 ACE_TEXT ("Sender::handle_write_stream - wrote %d bytes\n"),
1289 result
.bytes_transferred ()));
1291 if (result
.error () == 0 && result
.bytes_transferred () != 0)
1293 ACE_TEST_ASSERT (0 == mb
->total_length ());
1295 ACE_TEST_ASSERT (0);
1297 free_chunks_chain (mb
);
1301 this->check_destroy ();
1304 // *************************************************************
1305 // Configuration helpers
1306 // *************************************************************
1308 print_usage (int /* argc */, ACE_TCHAR
*argv
[])
1312 ACE_TEXT ("\nusage: %s")
1313 ACE_TEXT ("\n-f <input file>\n")
1314 ACE_TEXT ("\n-c client only (reader-sender)")
1315 ACE_TEXT ("\n-s server only (receiver-writer)")
1316 ACE_TEXT ("\n-h host to connect to")
1317 ACE_TEXT ("\n-p port")
1318 ACE_TEXT ("\n-u show this message")
1326 parse_args (int argc
, ACE_TCHAR
*argv
[])
1328 if (argc
== 1) // no arguments , so one button test
1331 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("f:csh:p:u"));
1334 while ((c
= get_opt ()) != EOF
)
1339 input_file
= get_opt
.opt_arg ();
1350 host
= get_opt
.opt_arg ();
1353 port
= ACE_OS::atoi (get_opt
.opt_arg ());
1357 return print_usage (argc
, argv
);
1365 run_main (int argc
, ACE_TCHAR
*argv
[])
1367 ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
1369 if (::parse_args (argc
, argv
) == -1)
1372 chunk_size
= ACE_OS::getpagesize ();
1375 ACE_DEBUG ((LM_INFO
,
1376 ACE_TEXT ("Running as client only, page size %d\n"),
1378 else if (server_only
)
1379 ACE_DEBUG ((LM_INFO
,
1380 ACE_TEXT ("Running as server only, page size %d\n"),
1383 ACE_DEBUG ((LM_INFO
,
1384 ACE_TEXT ("Running as server and client, page size %d\n"),
1388 Connector connector
;
1389 ACE_INET_Addr
addr (port
);
1393 // Simplify, initial read with zero size
1394 if (-1 == acceptor
.open (addr
, 0, 1))
1396 ACE_TEST_ASSERT (0);
1403 if (-1 == connector
.open (1, ACE_Proactor::instance ()))
1405 ACE_TEST_ASSERT (0);
1409 // connect to first destination
1410 if (addr
.set (port
, host
, 1, addr
.get_type ()) == -1)
1411 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), host
), -1);
1412 connector
.set_address (addr
);
1413 if (-1 == connector
.connect (addr
))
1415 ACE_TEST_ASSERT (0);
1420 ACE_Proactor::instance ()->run_event_loop ();
1422 // As Proactor event loop now is inactive it is safe to destroy all
1428 ACE_Proactor::instance()->close_singleton ();
1430 // now compare the files - available only when on same machine
1433 if (!client_only
&& !server_only
)
1435 ACE_DEBUG ((LM_INFO
,
1436 ACE_TEXT ("Comparing the input file and the output file...\n")));
1439 // map the two files, then perform memcmp
1441 ACE_Mem_Map
original_file (input_file
);
1442 ACE_Mem_Map
reconstructed_file (output_file
);
1444 if (original_file
.addr () &&
1445 original_file
.addr () != MAP_FAILED
&&
1446 reconstructed_file
.addr () &&
1447 reconstructed_file
.addr () != MAP_FAILED
)
1450 if ((original_file
.size () == reconstructed_file
.size ()) &&
1451 // and if same size, compare file data
1452 (0 == ACE_OS::memcmp (original_file
.addr (),
1453 reconstructed_file
.addr (),
1454 original_file
.size ())))
1460 ACE_DEBUG ((LM_DEBUG
,
1461 ACE_TEXT ("input file and the output file identical!\n")));
1463 ACE_ERROR ((LM_ERROR
,
1464 ACE_TEXT ("input file and the output file are different!\n")));
1468 ACE_OS::unlink (output_file
);
1477 run_main (int, ACE_TCHAR
*[])
1479 ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
1481 ACE_DEBUG ((LM_INFO
,
1482 ACE_TEXT ("Asynchronous Scatter/Gather IO is unsupported.\n")
1483 ACE_TEXT ("Proactor_Scatter_Gather_Test will not be run.\n")));
1490 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */