Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / ACE / tests / Proactor_Scatter_Gather_Test.cpp
blob0210ddb43ef6fb2fba7be9221ccd166323d9e94e
1 // ============================================================================
2 /**
3 * @file Proactor_Scatter_Gather_Test.cpp
5 * The test runs on a single thread, and involves a single Sender,
6 * two Receivers and a single Writer. The Sender async-reads
7 * (scattered) from a file into chunks of <page size>. It
8 * async-sends (gathered) the odd chunks to the first receiver over a
9 * stream, and the even chunks to the second receiver over a
10 * different stream. The receivers async-read (scattered) from the
11 * socket streams into chunks in size of <page size>, and convey the
12 * data to the Writer. The Writer reconstructs the file using
13 * async-write (gathered). Then, the reconstructed file is compared
14 * to the original file to determine test success. So, it covers both
15 * async scatter/gather stream I/O and async scatter/gather file I/O.
16 * The wire transfer protocol is very naive (and totally
17 * unreliable...) - when both connections are closed, EOF is assumed.
18 * The test can also be run in separate sender and receiver modes,
19 * to test real network influences.
21 * This test is based upon some building blocks taken from the
22 * Proactor_Test.cpp.
24 * @author Edan Ayal <edanayal@yahoo.com> */
25 // ============================================================================
27 #include "test_config.h"
29 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
30 // This currently only works on Win32 platforms (NT SP2 and above).
31 // Support for Unix platforms supporting POSIX aio calls should be added in future.
33 #include "ace/Get_Opt.h"
35 #include "ace/Proactor.h"
36 #include "ace/Asynch_Acceptor.h"
37 #include "ace/Asynch_Connector.h"
38 #include "ace/Mem_Map.h"
39 #include "ace/Min_Max.h"
40 #include "ace/OS_NS_math.h"
41 #include "ace/OS_NS_sys_stat.h"
42 #include "ace/OS_NS_fcntl.h"
43 #include "ace/OS_NS_unistd.h"
45 #include "ace/SOCK_Connector.h"
47 // Capacities of the handler bookkeeping arrays: Connector::list_senders_
// has SENDERS slots; Acceptor::list_receivers_ and the Sender's per-stream
// arrays (ws_, socket_handle_) have RECEIVERS slots.
48 static const int SENDERS = 1;
49 static const int RECEIVERS = 2;
51 // Port that we're receiving connections on (overridden by -p).
52 static u_short port = ACE_DEFAULT_SERVER_PORT;
// Host to connect to (overridden by -h).
54 static const ACE_TCHAR *host = ACE_LOCALHOST;
56 // File that we're sending (overridden by -f).
57 static const ACE_TCHAR *input_file = ACE_TEXT("Proactor_Scatter_Gather_Test.cpp");
59 // Name of the output file the Writer reconstructs.
60 static const ACE_TCHAR *output_file = ACE_TEXT("output");
// Mode flags set by -c / -s; each option clears the other.
62 static int client_only = 0;
63 static int server_only = 0;
// Size in bytes of every chunk buffer; assigned from ACE_OS::getpagesize ()
// in run_main before any chunk is allocated.
64 static size_t chunk_size = 0;
// Identifies the two data streams; also used as the index into the
// Sender's ws_ / socket_handle_ arrays.
66 enum
68 ODD = 0,
69 EVEN
72 // *************************************************************
73 // Some chunks chain helper routines
74 // *************************************************************
75 static int allocate_chunks_chain (ACE_Message_Block *&head_mb,
76 size_t number_of_chunks)
78 ACE_Message_Block *pre_mb = 0;
80 for (size_t index = 0; index < number_of_chunks; ++index)
82 #if defined (ACE_WIN32)
83 void *addr = ::VirtualAlloc (0,
84 chunk_size,
85 MEM_COMMIT,
86 PAGE_READWRITE);
87 #else
88 void *addr = new char[chunk_size];
89 #endif /* ACE_WIN32 */
90 if (addr)
92 ACE_Message_Block *mb = new ACE_Message_Block (static_cast<char *> (addr),
93 chunk_size);
94 if (!head_mb)
95 head_mb = mb;
97 // chain them together
98 if (pre_mb)
99 pre_mb->cont (mb);
100 pre_mb = mb;
102 else
104 ACE_TEST_ASSERT (0);
105 return -1;
109 return 0;
112 static void
113 free_chunks_chain (ACE_Message_Block *&mb)
115 for (const ACE_Message_Block* msg = mb;
116 msg != 0;
117 msg = msg->cont ())
119 #if defined (ACE_WIN32)
120 ::VirtualFree (msg->base (),
121 msg->size (),
122 MEM_DECOMMIT);
123 #else
124 delete [] msg->base ();
125 #endif /* ACE_WIN32 */
128 mb->release ();
129 mb = 0;
132 static int
133 last_chunk (ACE_Message_Block *chain,
134 ACE_Message_Block *&last)
136 if (!chain)
137 return 0;
139 int index = 1;
140 last = chain;
141 while (0 != last->cont ())
143 last = last->cont ();
144 ++index;
147 return index;
150 static void
151 merge_odd_even_chains (ACE_Message_Block *odd_mb,
152 ACE_Message_Block *even_mb)
154 ACE_Message_Block *pre_pre_mb = odd_mb;
155 ACE_Message_Block *pre_mb = even_mb;
156 ACE_Message_Block *curr_mb = odd_mb->cont ();
158 if (even_mb)
160 for (; curr_mb != 0; curr_mb = pre_pre_mb->cont ())
162 pre_pre_mb->cont (pre_mb);
164 // increment history pointers
165 pre_pre_mb = pre_mb;
166 pre_mb = curr_mb;
169 pre_pre_mb->cont (pre_mb);
170 pre_mb->cont (0);
174 static void
175 split_odd_even_chains (ACE_Message_Block *odd_mb,
176 ACE_Message_Block *even_mb)
178 ACE_Message_Block *pre_pre_mb = odd_mb;
179 ACE_Message_Block *pre_mb = even_mb;
180 ACE_Message_Block *curr_mb = (even_mb ? even_mb->cont () : 0);
182 for (; curr_mb != 0; curr_mb = curr_mb->cont ())
184 pre_pre_mb->cont (curr_mb);
186 // increment history pointers
187 pre_pre_mb = pre_mb;
188 pre_mb = curr_mb;
191 pre_pre_mb->cont (0);
192 if (pre_mb)
193 pre_mb->cont (0);
196 static void
197 add_to_chunks_chain (ACE_Message_Block *&chunks_chain,
198 ACE_Message_Block *additional_chunks_chain)
200 if (0 == chunks_chain)
201 chunks_chain = additional_chunks_chain;
202 else
204 ACE_Message_Block *last = 0;
205 last_chunk (chunks_chain, last);
206 if (last)
207 last->cont (additional_chunks_chain);
211 static void
212 remove_empty_chunks (ACE_Message_Block *&chunks_chain)
214 if (0 == chunks_chain)
215 return;
217 ACE_Message_Block *first_empty = chunks_chain;
218 ACE_Message_Block *pre_mb = 0;
220 while (first_empty->length () > 0 &&
221 0 != first_empty->cont ())
223 pre_mb = first_empty;
224 first_empty = first_empty->cont ();
227 // break the chain there, and release the empty end (might be everything)
228 if (0 == first_empty->length ())
230 if (pre_mb) // might be 0, in case it's the entire chain
231 pre_mb->cont (0);
233 if (first_empty == chunks_chain)
234 chunks_chain = 0;
236 free_chunks_chain (first_empty);
240 // *************************************************************
241 // Acceptor, Receiver and Writer
242 // *************************************************************
243 class Receiver;
245 class Acceptor : public ACE_Asynch_Acceptor<Receiver>
247 friend class Receiver;
249 public:
250 Acceptor ();
251 virtual ~Acceptor ();
253 void stop ();
255 // Virtual from ACE_Asynch_Acceptor
256 virtual Receiver *make_handler ();
258 int get_number_sessions () { return this->sessions_; }
260 private:
261 void on_new_receiver (Receiver &rcvr);
262 void on_delete_receiver (Receiver &rcvr);
264 int sessions_;
265 Receiver *list_receivers_[RECEIVERS];
268 class Writer;
270 // The first instantiated take the role of the odd receiver
271 class Receiver : public ACE_Service_Handler
273 friend class Acceptor;
274 friend class Writer;
276 public:
277 Receiver (Acceptor *acceptor = 0, int index = -1);
278 virtual ~Receiver ();
280 //FUZZ: disable check_for_lack_ACE_OS
281 /// This is called after the new connection has been accepted.
282 virtual void open (ACE_HANDLE handle,
283 ACE_Message_Block &message_block);
284 //FUZZ: enable check_for_lack_ACE_OS
286 protected:
287 /// This is called by the framework when asynchronous <read> operation from the
288 /// socket completes.
289 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
291 private:
292 int initiate_read_stream ();
294 void check_destroy ();
296 Acceptor *acceptor_;
297 int index_;
299 // Socket input
300 ACE_Asynch_Read_Stream rs_;
301 ACE_HANDLE socket_handle_;
303 // Writer
304 static Writer* writer_;
306 long io_count_;
308 char odd_;
310 // if we get non-page-size reminder, we will not send it to the writer
311 // until it is full (unless at end)
312 ACE_Message_Block *partial_chunk_;
315 class Writer : public ACE_Handler
317 friend class Receiver;
319 public:
320 Writer ();
321 virtual ~Writer ();
323 //FUZZ: disable check_for_lack_ACE_OS
324 void open ();
325 //FUZZ: enable check_for_lack_ACE_OS
327 // this is *not* a callback from the framework
328 int handle_read_chunks_chain (ACE_Message_Block *mb,
329 int type);
331 // for determining when last receiver dies
332 void on_new_receiver ();
333 void on_delete_receiver ();
335 protected:
336 /// This is called by the framework when an asynchronous <write> to the file
337 /// completes.
338 virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
340 private:
341 int initiate_write_file ();
343 private:
344 // Output file
345 ACE_Asynch_Write_File wf_;
346 ACE_HANDLE output_file_handle_;
347 u_long writing_file_offset_;
348 u_long reported_file_offset_;
349 ACE_Message_Block *odd_chain_;
350 ACE_Message_Block *even_chain_;
351 long io_count_;
352 char receiver_count_;
355 // *************************************************************
356 // Receiver Impl
357 // *************************************************************
359 Writer *Receiver::writer_ = 0;
361 Receiver::Receiver (Acceptor * acceptor, int index)
362 : acceptor_ (acceptor),
363 index_ (index),
364 socket_handle_ (ACE_INVALID_HANDLE),
365 io_count_ (0),
366 partial_chunk_ (0)
368 // the first one is the odd one
369 this->odd_ = ((0 == index) ? 1 : 0);
371 if (this->odd_)
373 Receiver::writer_ = new Writer;
374 if (!Receiver::writer_)
376 ACE_TEST_ASSERT (0);
377 return;
381 Receiver::writer_->on_new_receiver ();
383 if (this->acceptor_ != 0)
384 this->acceptor_->on_new_receiver (*this);
387 Receiver::~Receiver ()
389 ACE_DEBUG ((LM_DEBUG,
390 ACE_TEXT ("Receiver::~Receiver\n")));
392 if (this->acceptor_ != 0)
393 this->acceptor_->on_delete_receiver (*this);
395 if (this->socket_handle_ != ACE_INVALID_HANDLE)
396 ACE_OS::closesocket (this->socket_handle_);
398 Receiver::writer_->on_delete_receiver ();
400 if (this->partial_chunk_)
402 ACE_TEST_ASSERT (0); // should not be getting here
403 this->partial_chunk_->release ();
407 void
408 Receiver::check_destroy ()
410 if (this->io_count_ <= 0)
411 delete this;
414 void
415 Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
417 this->socket_handle_ = handle;
419 // Open the ACE_Asynch_Read_Stream
420 if (this->rs_.open (*this, this->socket_handle_) == -1)
421 ACE_ERROR ((LM_ERROR,
422 ACE_TEXT ("%p\n"),
423 ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
424 else
426 if (this->odd_)
427 Receiver::writer_->open ();
429 this->initiate_read_stream ();
432 this->check_destroy ();
436 Receiver::initiate_read_stream ()
438 if (!Receiver::writer_)
439 return -1;
441 // how many chunks to allocate?
442 size_t number_of_new_chunks = (this->partial_chunk_ ?
443 (ACE_IOV_MAX / RECEIVERS) - 1
444 : ACE_IOV_MAX / RECEIVERS);
446 // allocate chunks chain
447 ACE_Message_Block *head_mb = 0;
448 if (-1 == allocate_chunks_chain (head_mb, number_of_new_chunks))
450 ACE_TEST_ASSERT (0);
451 return -1;
454 // calculate how many bytes to read
456 // head_mb could be 0 (no new chunks allocated)
457 size_t bytes_to_read = head_mb ? head_mb->total_size () : 0;
459 // add the partial chunk at the front if appropriate, and update
460 // the number of bytes to read
461 if (this->partial_chunk_)
463 bytes_to_read += this->partial_chunk_->space ();
464 this->partial_chunk_->cont (head_mb);
465 head_mb = this->partial_chunk_;
466 this->partial_chunk_ = 0;
469 ACE_DEBUG ((LM_DEBUG,
470 ACE_TEXT ("Receiver::initiate_read_stream - (%s) readv %d\n"),
471 this->odd_ ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
472 bytes_to_read));
474 // perform the actual scattered read
475 if (this->rs_.readv (*head_mb,
476 bytes_to_read) == -1)
478 free_chunks_chain (head_mb);
480 ACE_ERROR_RETURN ((LM_ERROR,
481 ACE_TEXT ("%p\n"),
482 ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")),
483 -1);
486 ++this->io_count_;
487 return 0;
490 void
491 Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
493 ACE_Message_Block *mb = &result.message_block ();
495 ACE_DEBUG ((LM_DEBUG,
496 ACE_TEXT ("Receiver::handle_read_stream - (%s) read %d\n"),
497 this->odd_ ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
498 result.bytes_transferred ()));
500 // Transfer only complete chunks to the writer.
501 // Save last partial chunk for the next call.
502 // On disconnect (error or 0 transferred), transfer whatever we have.
504 // at this stage there should not be anything there
505 ACE_TEST_ASSERT (!this->partial_chunk_);
507 // first, remove the empty chunks
508 remove_empty_chunks (mb);
510 if (mb && Receiver::writer_)
511 { // there's something to write, and who to write to
513 // write everything or only complete chunks?
515 // write everything - when no new bytes were transferred
516 int write_everything = 0;
517 if (!result.bytes_transferred ())
518 write_everything = 1;
519 if (write_everything)
520 Receiver::writer_->handle_read_chunks_chain (mb,
521 this->odd_ ? ODD : EVEN);
522 else
523 { // filter out the partial chunk at the end (if present)
524 // and save it for later before writing the full chunks
526 // have this->partial_chunk_ point to the last chunk in the chain
527 size_t last_index = last_chunk (mb, this->partial_chunk_);
528 if (this->partial_chunk_ &&
529 this->partial_chunk_->length () < chunk_size)
530 { // found partial chunk at end of chain
531 // detach it from the chain
532 if (last_index > 1) // chain bigger than 1
534 ACE_Message_Block *pre_last = mb;
535 for (size_t index = 1; index < last_index - 1; ++index)
536 pre_last = pre_last->cont ();
538 // detach partial chunk from chain
539 pre_last->cont (0);
541 else
542 // chain in length of 1 - so we need to zero mb
543 mb = 0;
545 else // last is a full chunk, so hand it over with the rest
546 this->partial_chunk_ = 0;
548 // transfer (if there's anything left)
549 if (mb && mb->total_length ())
550 Receiver::writer_->handle_read_chunks_chain (
552 this->odd_ ? ODD : EVEN);
554 // initiate more reads only if no error
555 if (!result.error ())
556 this->initiate_read_stream ();
557 else
558 ACE_TEST_ASSERT (0);
561 else if (mb && !Receiver::writer_)
562 // no one to write to
563 free_chunks_chain (mb);
565 --this->io_count_;
567 this->check_destroy ();
570 // *************************************************************
571 // Acceptor Impl
572 // *************************************************************
574 Acceptor::Acceptor ()
575 : sessions_ (0)
577 for (int i = 0; i < RECEIVERS; ++i)
578 this->list_receivers_[i] = 0;
581 Acceptor::~Acceptor ()
583 this->stop ();
587 void
588 Acceptor::stop ()
590 // This method can be called only after proactor event loop is done
591 // in all threads.
592 for (int i = 0; i < RECEIVERS; ++i)
594 delete this->list_receivers_[i];
595 this->list_receivers_[i] = 0;
599 void
600 Acceptor::on_new_receiver (Receiver & rcvr)
602 ++this->sessions_;
603 this->list_receivers_[rcvr.index_] = &rcvr;
604 ACE_DEBUG ((LM_DEBUG,
605 ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"),
606 this->sessions_));
609 void
610 Acceptor::on_delete_receiver (Receiver & rcvr)
612 --this->sessions_;
613 if (rcvr.index_ >= 0
614 && rcvr.index_ < RECEIVERS
615 && this->list_receivers_[rcvr.index_] == &rcvr)
616 this->list_receivers_[rcvr.index_] = 0;
618 ACE_DEBUG ((LM_DEBUG,
619 ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"),
620 this->sessions_));
623 Receiver *
624 Acceptor::make_handler ()
626 if (this->sessions_ >= RECEIVERS)
627 return 0;
629 for (int i = 0; i < RECEIVERS; ++i)
631 if (this->list_receivers_[i] == 0)
633 ACE_NEW_RETURN (this->list_receivers_[i],
634 Receiver (this, i),
636 return this->list_receivers_[i];
640 return 0;
643 // *************************************************************
644 // Writer Impl
645 // *************************************************************
647 Writer::Writer ()
648 : output_file_handle_ (ACE_INVALID_HANDLE),
649 writing_file_offset_ (0),
650 reported_file_offset_ (0),
651 odd_chain_ (0),
652 even_chain_ (0),
653 io_count_ (0),
654 receiver_count_ (0)
658 Writer::~Writer ()
660 ACE_DEBUG ((LM_DEBUG,
661 ACE_TEXT ("Writer::~Writer\n")));
663 if (this->output_file_handle_ != ACE_INVALID_HANDLE)
664 ACE_OS::close (this->output_file_handle_);
666 Receiver::writer_ = 0;
669 void
670 Writer::on_new_receiver ()
672 ACE_DEBUG ((LM_DEBUG,
673 ACE_TEXT ("Writer::on_new_receiver\n")));
675 ++this->receiver_count_;
678 void
679 Writer::on_delete_receiver ()
681 ACE_DEBUG ((LM_DEBUG,
682 ACE_TEXT ("Writer::on_delete_receiver\n")));
684 --this->receiver_count_;
686 if (0 == this->receiver_count_)
688 if (this->io_count_ <= 0)
689 // no pending io, so do the work oursleves
690 // (if pending io, they'll see the zero receiver count)
691 this->initiate_write_file ();
695 void
696 Writer::open ()
698 // Open the file for output
699 if (ACE_INVALID_HANDLE == (this->output_file_handle_ = ACE_OS::open (output_file,
700 O_CREAT | _O_TRUNC | _O_WRONLY |\
701 FILE_FLAG_OVERLAPPED |\
702 FILE_FLAG_NO_BUFFERING,
703 ACE_DEFAULT_FILE_PERMS)))
704 ACE_ERROR ((LM_ERROR,
705 ACE_TEXT ("%p\n"),
706 ACE_TEXT ("Writer::open::ACE_OS::open")));
707 // Open the ACE_Asynch_Write_File
708 else if (this->wf_.open (*this, this->output_file_handle_) == -1)
709 ACE_ERROR ((LM_ERROR,
710 ACE_TEXT ("%p\n"),
711 ACE_TEXT ("Writer::open::ACE_Asynch_Write_File::open")));
715 Writer::handle_read_chunks_chain (ACE_Message_Block *mb,
716 int type)
718 ACE_DEBUG ((LM_DEBUG,
719 ACE_TEXT ("Writer::handle_read_chunks_chain - (%s) %d bytes\n"),
720 (type == ODD) ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
721 mb->total_length ()));
723 add_to_chunks_chain (ODD == type ? this->odd_chain_ : this->even_chain_, mb);
725 this->initiate_write_file ();
727 return 0;
731 Writer::initiate_write_file ()
733 // find out how much can we merge
734 ACE_Message_Block *dummy_last = 0;
735 size_t odd_count = last_chunk (this->odd_chain_, dummy_last);
736 size_t even_count = last_chunk (this->even_chain_, dummy_last);
738 size_t merge_size = ACE_MIN (ACE_MIN (odd_count, even_count),
739 (size_t) ACE_IOV_MAX);
741 // the options here are as follows:
742 // io_count_ can be zero or greater.
743 // merge_size can be zero or not.
744 // if non zero merge, write the merge. ASSERT receiver_count_ is non zero too.
745 // if zero merge:
746 // if receiver_count_ is non zero, NOOP.
747 // if zero receiver_count_, we should write whatever is left,
748 // and terminate the writer at completion.
749 // if nothing to write, and io_count_ is zero too, terminate here.
751 if (0 == merge_size &&
752 0 != this->receiver_count_)
753 return 0;
755 if (0 == merge_size &&
756 0 == this->receiver_count_ &&
757 0 == odd_count &&
758 0 == even_count &&
759 0 == this->io_count_)
761 ACE_DEBUG ((LM_DEBUG,
762 ACE_TEXT ("Writer::initiate_write_file")
763 ACE_TEXT (" - ending proactor event loop\n")));
765 ACE_Proactor::instance ()->end_event_loop ();
767 delete this;
769 return 0;
772 // if we reached nere and merge_size is zero, we should write whatever is
773 // in the queues (1 to 2 chunks together), so let's force the merge size to 1.
774 if (0 == merge_size)
776 ACE_TEST_ASSERT (1 == odd_count && 1 >= even_count);
777 merge_size = 1;
780 // Now that we found out what we want to do, prepare the chain
781 // that will be written, and update the remainders
782 ACE_Message_Block *new_odd_chain_head = this->odd_chain_;
783 ACE_Message_Block *new_even_chain_head = this->even_chain_;
785 // locate the place for detachment in the chains
786 ACE_Message_Block *pre_odd = 0;
787 ACE_Message_Block *pre_even = 0;
788 for (size_t index = 0; index < merge_size; ++index)
790 pre_odd = new_odd_chain_head;
791 if (new_odd_chain_head)
792 new_odd_chain_head = new_odd_chain_head->cont ();
793 pre_even = new_even_chain_head;
794 if (new_even_chain_head)
795 new_even_chain_head = new_even_chain_head->cont ();
797 // now detach the chain
798 if (pre_odd)
799 pre_odd->cont (0);
800 if (pre_even)
801 pre_even->cont (0);
803 // perform merge between the two chains
804 merge_odd_even_chains (this->odd_chain_, this->even_chain_);
806 // and now finally perform the write
807 ACE_Message_Block *united_mb = this->odd_chain_;
808 // update the remainders of the chains
809 this->odd_chain_ = new_odd_chain_head;
810 this->even_chain_ = new_even_chain_head;
811 size_t increment_writing_file_offset = united_mb->total_length ();
813 // Reconstruct the file
814 // Write the size, not the length, because we must write in chunks
815 // of <page size>
816 ACE_DEBUG ((LM_DEBUG,
817 ACE_TEXT ("Writer::initiate_write_file: write %d bytes at %d\n"),
818 united_mb->total_size (),
819 this->writing_file_offset_));
820 if (this->wf_.writev (*united_mb,
821 united_mb->total_size (),
822 this->writing_file_offset_) == -1)
824 free_chunks_chain (united_mb);
826 ACE_ERROR_RETURN((LM_ERROR,
827 ACE_TEXT ("%p\n"),
828 ACE_TEXT ("Writer::initiate_write_file::ACE_Asynch_Write_Stream::writev")),
829 -1);
832 // we update now because otherwise, we'd have error when performing
833 // pipelined writing (that is, mulitple calls to write before the callbacks
834 // to handle_x)
835 this->writing_file_offset_ +=
836 static_cast<u_long> (increment_writing_file_offset);
837 ++this->io_count_;
838 return 0;
841 void
842 Writer::handle_write_file (const ACE_Asynch_Write_File::Result &result)
844 ACE_Message_Block *mb = &result.message_block ();
846 ACE_DEBUG ((LM_DEBUG,
847 ACE_TEXT ("Writer::handle_write_file at offset %d wrote %d\n"),
848 this->reported_file_offset_,
849 result.bytes_transferred ()));
851 this->reported_file_offset_ +=
852 static_cast<u_long> (result.bytes_transferred ());
854 // Always truncate as required,
855 // because partial will always be the last write to a file
856 ACE_Message_Block *last_mb = mb;
857 last_chunk (mb, last_mb);
859 if (last_mb->space ())
860 ACE_OS::truncate (output_file,
861 this->reported_file_offset_ -
862 static_cast<u_long> (last_mb->space ()));
864 free_chunks_chain (mb);
866 --this->io_count_;
868 // end of process?
869 if (0 == this->receiver_count_ &&
870 0 == this->io_count_)
872 ACE_TEST_ASSERT (0 == this->odd_chain_ && 0 == this->even_chain_);
874 ACE_DEBUG ((LM_DEBUG,
875 ACE_TEXT ("Writer::handle_write_file")
876 ACE_TEXT (" - ending proactor event loop\n")));
878 ACE_Proactor::instance ()->end_event_loop ();
880 delete this;
884 // *************************************************************
885 // Connector and Sender
886 // *************************************************************
887 class Sender;
889 class Connector : public ACE_Asynch_Connector<Sender>
891 friend class Sender;
893 public:
894 Connector ();
895 virtual ~Connector ();
897 // Address to pass to Sender for secondary connect.
898 void set_address (const ACE_INET_Addr &addr);
899 const ACE_INET_Addr &get_address ();
901 void stop ();
903 // Virtual from ACE_Asynch_Connector
904 virtual Sender *make_handler ();
906 private:
907 void on_new_sender (Sender &rcvr);
908 void on_delete_sender (Sender &rcvr);
910 int sessions_;
911 ACE_INET_Addr addr_;
912 Sender *list_senders_[SENDERS];
915 class Sender : public ACE_Service_Handler
917 friend class Connector;
918 public:
919 Sender (Connector *connector = 0, int index = -1);
921 virtual ~Sender ();
923 //FUZZ: disable check_for_lack_ACE_OS
924 /// This is called after the new connection has been established.
925 virtual void open (ACE_HANDLE handle,
926 ACE_Message_Block &message_block);
927 //FUZZ: enable check_for_lack_ACE_OS
929 // This is called by the framework when asynchronous reads from the
930 // file complete.
931 virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
933 // This is called by the framework when asynchronous writes from the
934 // socket complete.
935 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
937 private:
938 void check_destroy ();
940 int initiate_read_file ();
942 int initiate_write_stream (ACE_Message_Block &mb);
944 int index_;
945 Connector * connector_;
947 // File to read from
948 ACE_Asynch_Read_File rf_;
949 ACE_HANDLE input_file_handle_;
950 u_long file_offset_;
952 // Sockets to send to
953 // odd and even socket output streams
954 ACE_Asynch_Write_Stream ws_[RECEIVERS];
955 ACE_HANDLE socket_handle_[RECEIVERS];
957 long io_count_;
960 // *************************************************************
961 // Connector Impl
962 // *************************************************************
964 Connector::Connector ()
965 : sessions_ (0)
967 for (int i = 0; i < SENDERS; ++i)
968 this->list_senders_[i] = 0;
971 Connector::~Connector ()
973 this->stop ();
976 // Address to pass to Sender for secondary connect.
977 void
978 Connector::set_address (const ACE_INET_Addr &addr)
980 this->addr_ = addr;
983 const ACE_INET_Addr &
984 Connector::get_address ()
986 return this->addr_;
989 void
990 Connector::stop ()
992 // This method can be called only after proactor event loop is done
993 // in all threads.
995 for (int i = 0; i < SENDERS; ++i)
997 delete this->list_senders_[i];
998 this->list_senders_[i] = 0;
1002 void
1003 Connector::on_new_sender (Sender &sndr)
1005 ++this->sessions_;
1006 this->list_senders_[sndr.index_] = &sndr;
1007 ACE_DEBUG ((LM_DEBUG,
1008 ACE_TEXT ("Sender::CTOR sessions_ = %d\n"),
1009 this->sessions_));
1012 void
1013 Connector::on_delete_sender (Sender &sndr)
1015 --this->sessions_;
1016 if (sndr.index_ >= 0
1017 && sndr.index_ < SENDERS
1018 && this->list_senders_[sndr.index_] == &sndr)
1019 this->list_senders_[sndr.index_] = 0;
1021 ACE_DEBUG ((LM_DEBUG,
1022 ACE_TEXT ("Sender::~DTOR sessions_ = %d\n"),
1023 this->sessions_));
1026 Sender *
1027 Connector::make_handler ()
1029 if (this->sessions_ >= SENDERS)
1030 return 0;
1032 for (int i = 0; i < SENDERS; ++i)
1034 if (this->list_senders_ [i] == 0)
1036 ACE_NEW_RETURN (this->list_senders_[i],
1037 Sender (this, i),
1039 return this->list_senders_[i];
1043 return 0;
1046 // *************************************************************
1047 // Sender Impl
1048 // *************************************************************
1050 Sender::Sender (Connector * connector, int index)
1051 : index_ (index),
1052 connector_ (connector),
1053 input_file_handle_ (ACE_INVALID_HANDLE),
1054 file_offset_ (0),
1055 io_count_ (0)
1057 socket_handle_[ODD] = socket_handle_[EVEN] = ACE_INVALID_HANDLE;
1059 if (this->connector_ != 0)
1060 this->connector_->on_new_sender (*this);
1063 Sender::~Sender ()
1065 ACE_DEBUG ((LM_DEBUG,
1066 ACE_TEXT ("Sender::~Sender\n")));
1068 if (this->connector_ != 0)
1069 this->connector_->on_delete_sender (*this);
1071 if (this->socket_handle_[ODD] != ACE_INVALID_HANDLE)
1072 ACE_OS::closesocket (this->socket_handle_[ODD]);
1074 if (this->socket_handle_[EVEN] != ACE_INVALID_HANDLE)
1075 ACE_OS::closesocket (this->socket_handle_[EVEN]);
1077 if (this->input_file_handle_ != ACE_INVALID_HANDLE)
1078 ACE_OS::close (this->input_file_handle_);
1080 if (client_only)
1081 ACE_Proactor::instance ()->end_event_loop ();
1084 // return true if we alive, false we commited suicide
1085 void
1086 Sender::check_destroy ()
1088 if (this->io_count_ <= 0)
1089 delete this;
1092 void
1093 Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
1095 this->socket_handle_[ODD] = handle;
1097 // Open the input file
1098 if (ACE_INVALID_HANDLE == (this->input_file_handle_ =
1099 ACE_OS::open (input_file,
1100 _O_RDONLY |\
1101 FILE_FLAG_OVERLAPPED |\
1102 FILE_FLAG_NO_BUFFERING,
1103 ACE_DEFAULT_FILE_PERMS)))
1105 ACE_ERROR ((LM_ERROR,
1106 ACE_TEXT ("%p\n"),
1107 ACE_TEXT ("Sender::open::ACE_OS::open")));
1109 else
1111 // Now connect (w/o the connector factory) to the even (=second)
1112 // receiver. We don't connect thru the factory in order not to
1113 // instantiate another Sender.
1114 ACE_SOCK_Connector sock_connector;
1115 ACE_SOCK_Stream sock_stream;
1116 if (-1 == sock_connector.connect (sock_stream,
1117 this->connector_->get_address ()))
1118 ACE_ERROR ((LM_ERROR,
1119 ACE_TEXT ("%p\n"),
1120 ACE_TEXT ("Sender::open::ACE_SOCK_Connector::connect")));
1122 else
1124 this->socket_handle_[EVEN] = sock_stream.get_handle ();
1126 // Open odd ACE_Asynch_Write_Stream
1127 if (this->ws_[ODD].open (*this, this->socket_handle_[ODD]) == -1)
1128 ACE_ERROR ((LM_ERROR,
1129 ACE_TEXT ("%p\n"),
1130 ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
1132 // Open even ACE_Asynch_Write_Stream
1133 else if (this->ws_[EVEN].open (*this, this->socket_handle_[EVEN]) == -1)
1134 ACE_ERROR ((LM_ERROR,
1135 ACE_TEXT ("%p\n"),
1136 ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
1138 // Open ACE_Asynch_Read_File
1139 else if (this->rf_.open (*this, this->input_file_handle_) == -1)
1140 ACE_ERROR ((LM_ERROR,
1141 ACE_TEXT ("%p\n"),
1142 ACE_TEXT ("Sender::open::ACE_Asynch_Read_File::open")));
1143 else
1144 // Start an asynchronous read
1145 this->initiate_read_file ();
1149 this->check_destroy ();
1153 Sender::initiate_read_file ()
1155 ACE_TEST_ASSERT (0 == this->file_offset_ % chunk_size);
1157 static const size_t file_size = ACE_OS::filesize (input_file);
1159 static const size_t number_of_chunks_needed_for_file =
1160 static_cast<size_t> (ACE_OS::ceil ((double) file_size / chunk_size));
1162 size_t relevant_number_of_chunks =
1163 ACE_MIN ((size_t)ACE_IOV_MAX,
1164 number_of_chunks_needed_for_file
1165 - (size_t)(this->file_offset_ / chunk_size));
1167 if (!relevant_number_of_chunks)
1169 ACE_TEST_ASSERT (0); // Just 2 C it coming
1170 return 0;
1173 ACE_Message_Block *head_mb = 0;
1174 if (-1 == allocate_chunks_chain (head_mb, relevant_number_of_chunks))
1176 ACE_TEST_ASSERT (0);
1177 return -1;
1180 // Inititiate read
1181 if (this->rf_.readv (*head_mb,
1182 head_mb->total_size (),
1183 this->file_offset_) == -1)
1185 free_chunks_chain (head_mb);
1187 ACE_ERROR_RETURN ((LM_ERROR,
1188 ACE_TEXT ("%p\n"),
1189 ACE_TEXT ("Sender::initiate_read_file::")
1190 ACE_TEXT ("ACE_Asynch_Read_Stream::readv")),
1191 -1);
1194 ++this->io_count_;
1195 return 0;
1199 Sender::initiate_write_stream (ACE_Message_Block &mb)
1201 // send the odd to the first connection, and the even to the second
1202 // connection.
1204 ACE_Message_Block *odd_mb = &mb;
1205 ACE_Message_Block *even_mb = mb.cont ();
1207 split_odd_even_chains (odd_mb, even_mb);
1209 ACE_DEBUG ((LM_DEBUG,
1210 ACE_TEXT ("Sender::initiate_write_stream - (ODD ) writev %d\n"),
1211 odd_mb->total_length ()));
1213 if (this->ws_[ODD].writev (*odd_mb, odd_mb->total_length ()) == -1)
1215 free_chunks_chain (odd_mb);
1217 if (even_mb)
1218 free_chunks_chain (even_mb);
1220 ACE_ERROR_RETURN((LM_ERROR,
1221 ACE_TEXT ("%p\n"),
1222 ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
1223 -1);
1226 ++this->io_count_;
1228 if (even_mb)
1230 ACE_DEBUG ((LM_DEBUG,
1231 ACE_TEXT ("Sender::initiate_write_stream - (EVEN) writev %d\n"),
1232 even_mb->total_length ()));
1234 if (this->ws_[EVEN].writev (*even_mb, even_mb->total_length ()) == -1)
1236 free_chunks_chain (even_mb);
1238 ACE_ERROR_RETURN((LM_ERROR,
1239 ACE_TEXT ("%p\n"),
1240 ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
1241 -1);
1244 ++this->io_count_;
1247 return 0;
1250 void
1251 Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result)
1253 ACE_Message_Block *mb = &result.message_block ();
1255 if (result.error () == 0 && result.bytes_transferred () != 0)
1257 size_t bytes_transferred = result.bytes_transferred ();
1258 size_t chunks_chain_size = mb->total_size ();
1259 ACE_DEBUG ((LM_DEBUG,
1260 ACE_TEXT ("Sender::handle_read_file, read %d, ")
1261 ACE_TEXT ("chain total %d\n"),
1262 bytes_transferred,
1263 chunks_chain_size));
1265 this->file_offset_ += static_cast<u_long> (bytes_transferred);
1267 this->initiate_write_stream (*mb);
1269 // and read more if required
1270 if (bytes_transferred == chunks_chain_size)
1271 this->initiate_read_file ();
1273 else
1274 free_chunks_chain (mb);
1276 --this->io_count_;
1278 this->check_destroy ();
1281 void
1282 Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
1284 ACE_Message_Block *mb = &result.message_block ();
1286 ACE_DEBUG ((LM_DEBUG,
1287 ACE_TEXT ("Sender::handle_write_stream - wrote %d bytes\n"),
1288 result.bytes_transferred ()));
1290 if (result.error () == 0 && result.bytes_transferred () != 0)
1291 // verify sent all
1292 ACE_TEST_ASSERT (0 == mb->total_length ());
1293 else
1294 ACE_TEST_ASSERT (0);
1296 free_chunks_chain (mb);
1298 --this->io_count_;
1300 this->check_destroy ();
1303 // *************************************************************
1304 // Configuration helpers
1305 // *************************************************************
1307 print_usage (int /* argc */, ACE_TCHAR *argv[])
1309 ACE_ERROR
1310 ((LM_ERROR,
1311 ACE_TEXT ("\nusage: %s")
1312 ACE_TEXT ("\n-f <input file>\n")
1313 ACE_TEXT ("\n-c client only (reader-sender)")
1314 ACE_TEXT ("\n-s server only (receiver-writer)")
1315 ACE_TEXT ("\n-h host to connect to")
1316 ACE_TEXT ("\n-p port")
1317 ACE_TEXT ("\n-u show this message")
1318 ACE_TEXT ("\n"),
1319 argv[0]
1321 return -1;
1324 static int
1325 parse_args (int argc, ACE_TCHAR *argv[])
1327 if (argc == 1) // no arguments , so one button test
1328 return 0;
1330 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("f:csh:p:u"));
1331 int c;
1333 while ((c = get_opt ()) != EOF)
1335 switch (c)
1337 case 'f':
1338 input_file = get_opt.opt_arg ();
1339 break;
1340 case 'c':
1341 client_only = 1;
1342 server_only = 0;
1343 break;
1344 case 's':
1345 server_only = 1;
1346 client_only = 0;
1347 break;
1348 case 'h':
1349 host = get_opt.opt_arg ();
1350 break;
1351 case 'p':
1352 port = ACE_OS::atoi (get_opt.opt_arg ());
1353 break;
1354 case 'u':
1355 default:
1356 return print_usage (argc, argv);
1357 } // switch
1358 } // while
1360 return 0;
1364 run_main (int argc, ACE_TCHAR *argv[])
1366 ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
1368 if (::parse_args (argc, argv) == -1)
1369 return -1;
1371 chunk_size = ACE_OS::getpagesize ();
1373 if (client_only)
1374 ACE_DEBUG ((LM_INFO,
1375 ACE_TEXT ("Running as client only, page size %d\n"),
1376 chunk_size));
1377 else if (server_only)
1378 ACE_DEBUG ((LM_INFO,
1379 ACE_TEXT ("Running as server only, page size %d\n"),
1380 chunk_size));
1381 else
1382 ACE_DEBUG ((LM_INFO,
1383 ACE_TEXT ("Running as server and client, page size %d\n"),
1384 chunk_size));
1386 Acceptor acceptor;
1387 Connector connector;
1388 ACE_INET_Addr addr (port);
1390 if (!client_only)
1392 // Simplify, initial read with zero size
1393 if (-1 == acceptor.open (addr, 0, 1))
1395 ACE_TEST_ASSERT (0);
1396 return -1;
1400 if (!server_only)
1402 if (-1 == connector.open (1, ACE_Proactor::instance ()))
1404 ACE_TEST_ASSERT (0);
1405 return -1;
1408 // connect to first destination
1409 if (addr.set (port, host, 1, addr.get_type ()) == -1)
1410 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), host), -1);
1411 connector.set_address (addr);
1412 if (-1 == connector.connect (addr))
1414 ACE_TEST_ASSERT (0);
1415 return -1;
1419 ACE_Proactor::instance ()->run_event_loop ();
1421 // As Proactor event loop now is inactive it is safe to destroy all
1422 // senders
1424 connector.stop ();
1425 acceptor.stop ();
1427 ACE_Proactor::instance()->close_singleton ();
1429 // now compare the files - available only when on same machine
1431 int success = 0;
1432 if (!client_only && !server_only)
1434 ACE_DEBUG ((LM_INFO,
1435 ACE_TEXT ("Comparing the input file and the output file...\n")));
1437 success = -1;
1438 // map the two files, then perform memcmp
1440 ACE_Mem_Map original_file (input_file);
1441 ACE_Mem_Map reconstructed_file (output_file);
1443 if (original_file.addr () &&
1444 original_file.addr () != MAP_FAILED &&
1445 reconstructed_file.addr () &&
1446 reconstructed_file.addr () != MAP_FAILED)
1448 // compare lengths
1449 if ((original_file.size () == reconstructed_file.size ()) &&
1450 // and if same size, compare file data
1451 (0 == ACE_OS::memcmp (original_file.addr (),
1452 reconstructed_file.addr (),
1453 original_file.size ())))
1454 success = 0;
1458 if (0 == success)
1459 ACE_DEBUG ((LM_DEBUG,
1460 ACE_TEXT ("input file and the output file identical!\n")));
1461 else
1462 ACE_ERROR ((LM_ERROR,
1463 ACE_TEXT ("input file and the output file are different!\n")));
1466 if (!client_only)
1467 ACE_OS::unlink (output_file);
1469 ACE_END_TEST;
1471 return success;
1474 #else
1476 run_main (int, ACE_TCHAR *[])
1478 ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
1480 ACE_DEBUG ((LM_INFO,
1481 ACE_TEXT ("Asynchronous Scatter/Gather IO is unsupported.\n")
1482 ACE_TEXT ("Proactor_Scatter_Gather_Test will not be run.\n")));
1484 ACE_END_TEST;
1486 return 0;
1489 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */