Doxygen changes
[ACE_TAO.git] / ACE / tests / Proactor_Scatter_Gather_Test.cpp
blobd0e75e84e25e28ea396b2f5293b95aa453c91bed
// ============================================================================
/**
 *  @file Proactor_Scatter_Gather_Test.cpp
 *
 *  The test runs on a single thread, and involves a single Sender,
 *  two Receivers and a single Writer. The Sender async-reads
 *  (scattered) from a file into chunks of <page size>. It
 *  async-sends (gathered) the odd chunks to the first receiver over a
 *  stream, and the even chunks to the second receiver over a
 *  different stream. The receivers async-read (scattered) from the
 *  socket streams into chunks of <page size> each, and convey the
 *  data to the Writer. The Writer reconstructs the file using
 *  async-write (gathered). Then, the reconstructed file is compared
 *  to the original file to determine test success. The test thus
 *  covers both async scatter/gather stream I/O and async
 *  scatter/gather file I/O.
 *  The wire transfer protocol is very naive (and totally
 *  unreliable...) - when both connections are closed, EOF is assumed.
 *  The test can also be run in a separate sender and receiver mode,
 *  to test real network influences.
 *
 *  This test is based upon some building blocks taken from the
 *  Proactor_Test.cpp.
 *
 *  @author Edan Ayal <edanayal@yahoo.com>
 */
// ============================================================================
27 #include "test_config.h"
29 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
30 // This currently only works on Win32 platforms (NT SP2 and above).
31 // Support for Unix platforms supporting POSIX aio calls should be added in future.
33 #include "ace/Get_Opt.h"
35 #include "ace/Proactor.h"
36 #include "ace/Asynch_Acceptor.h"
37 #include "ace/Asynch_Connector.h"
38 #include "ace/Mem_Map.h"
39 #include "ace/Min_Max.h"
40 #include "ace/OS_NS_math.h"
41 #include "ace/OS_NS_sys_stat.h"
42 #include "ace/OS_NS_fcntl.h"
43 #include "ace/OS_NS_unistd.h"
45 #include "ace/SOCK_Connector.h"
47 // For the Acceptor/Connector handlers maintenance lists
48 static const int SENDERS = 1;
49 static const int RECEIVERS = 2;
51 // Port that we're receiving connections on.
52 static u_short port = ACE_DEFAULT_SERVER_PORT;
54 static const ACE_TCHAR *host = ACE_LOCALHOST;
56 // File that we're sending.
57 static const ACE_TCHAR *input_file = ACE_TEXT("Proactor_Scatter_Gather_Test.cpp");
59 // Name of the output file.
60 static const ACE_TCHAR *output_file = ACE_TEXT("output");
62 static int client_only = 0;
63 static int server_only = 0;
64 static size_t chunk_size = 0;
66 enum
68 ODD = 0,
69 EVEN
72 // *************************************************************
73 // Some chunks chain helper routines
74 // *************************************************************
75 static int allocate_chunks_chain (ACE_Message_Block *&head_mb,
76 size_t number_of_chunks)
78 ACE_Message_Block *pre_mb = 0;
80 for (size_t index = 0; index < number_of_chunks; ++index)
82 #if defined (ACE_WIN32)
83 void *addr = ::VirtualAlloc (0,
84 chunk_size,
85 MEM_COMMIT,
86 PAGE_READWRITE);
87 #else
88 void *addr = new char[chunk_size];
89 #endif /* ACE_WIN32 */
90 if (addr)
92 ACE_Message_Block *mb = new ACE_Message_Block (static_cast<char *> (addr),
93 chunk_size);
94 if (!head_mb)
95 head_mb = mb;
97 // chain them together
98 if (pre_mb)
99 pre_mb->cont (mb);
100 pre_mb = mb;
102 else
104 ACE_TEST_ASSERT (0);
105 return -1;
109 return 0;
112 static void
113 free_chunks_chain (ACE_Message_Block *&mb)
115 for (const ACE_Message_Block* msg = mb;
116 msg != 0;
117 msg = msg->cont ())
119 #if defined (ACE_WIN32)
120 ::VirtualFree (msg->base (),
121 msg->size (),
122 MEM_DECOMMIT);
123 #else
124 delete [] msg->base ();
125 #endif /* ACE_WIN32 */
128 mb->release ();
129 mb = 0;
132 static int
133 last_chunk (ACE_Message_Block *chain,
134 ACE_Message_Block *&last)
136 if (!chain)
137 return 0;
139 int index = 1;
140 last = chain;
141 while (0 != last->cont ())
143 last = last->cont ();
144 ++index;
147 return index;
150 static void
151 merge_odd_even_chains (ACE_Message_Block *odd_mb,
152 ACE_Message_Block *even_mb)
154 ACE_Message_Block *pre_pre_mb = odd_mb;
155 ACE_Message_Block *pre_mb = even_mb;
156 ACE_Message_Block *curr_mb = odd_mb->cont ();
158 if (even_mb)
160 for (; curr_mb != 0; curr_mb = pre_pre_mb->cont ())
162 pre_pre_mb->cont (pre_mb);
164 // increment history pointers
165 pre_pre_mb = pre_mb;
166 pre_mb = curr_mb;
169 pre_pre_mb->cont (pre_mb);
170 pre_mb->cont (0);
174 static void
175 split_odd_even_chains (ACE_Message_Block *odd_mb,
176 ACE_Message_Block *even_mb)
178 ACE_Message_Block *pre_pre_mb = odd_mb;
179 ACE_Message_Block *pre_mb = even_mb;
180 ACE_Message_Block *curr_mb = (even_mb ? even_mb->cont () : 0);
182 for (; curr_mb != 0; curr_mb = curr_mb->cont ())
184 pre_pre_mb->cont (curr_mb);
186 // increment history pointers
187 pre_pre_mb = pre_mb;
188 pre_mb = curr_mb;
191 pre_pre_mb->cont (0);
192 if (pre_mb)
193 pre_mb->cont (0);
196 static void
197 add_to_chunks_chain (ACE_Message_Block *&chunks_chain,
198 ACE_Message_Block *additional_chunks_chain)
200 if (0 == chunks_chain)
201 chunks_chain = additional_chunks_chain;
202 else
204 ACE_Message_Block *last = 0;
205 last_chunk (chunks_chain, last);
206 if (last)
207 last->cont (additional_chunks_chain);
211 static void
212 remove_empty_chunks (ACE_Message_Block *&chunks_chain)
214 if (0 == chunks_chain)
215 return;
217 ACE_Message_Block *first_empty = chunks_chain;
218 ACE_Message_Block *pre_mb = 0;
220 while (first_empty->length () > 0 &&
221 0 != first_empty->cont ())
223 pre_mb = first_empty;
224 first_empty = first_empty->cont ();
227 // break the chain there, and release the empty end (might be everything)
228 if (0 == first_empty->length ())
230 if (pre_mb) // might be 0, in case it's the entire chain
231 pre_mb->cont (0);
233 if (first_empty == chunks_chain)
234 chunks_chain = 0;
236 free_chunks_chain (first_empty);
240 // *************************************************************
241 // Acceptor, Receiver and Writer
242 // *************************************************************
243 class Receiver;
245 class Acceptor : public ACE_Asynch_Acceptor<Receiver>
247 friend class Receiver;
249 public:
250 Acceptor (void);
251 virtual ~Acceptor (void);
253 void stop (void);
255 // Virtual from ACE_Asynch_Acceptor
256 virtual Receiver *make_handler (void);
258 int get_number_sessions (void) { return this->sessions_; }
260 private:
261 void on_new_receiver (Receiver &rcvr);
262 void on_delete_receiver (Receiver &rcvr);
264 int sessions_;
265 Receiver *list_receivers_[RECEIVERS];
268 class Writer;
270 // The first instantiated take the role of the odd receiver
271 class Receiver : public ACE_Service_Handler
273 friend class Acceptor;
274 friend class Writer;
276 public:
277 Receiver (Acceptor *acceptor = 0, int index = -1);
278 virtual ~Receiver (void);
280 //FUZZ: disable check_for_lack_ACE_OS
281 /// This is called after the new connection has been accepted.
282 virtual void open (ACE_HANDLE handle,
283 ACE_Message_Block &message_block);
284 //FUZZ: enable check_for_lack_ACE_OS
286 protected:
287 /// This is called by the framework when asynchronous <read> operation from the
288 /// socket completes.
289 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
291 private:
292 int initiate_read_stream (void);
294 void check_destroy (void);
296 Acceptor *acceptor_;
297 int index_;
299 // Socket input
300 ACE_Asynch_Read_Stream rs_;
301 ACE_HANDLE socket_handle_;
303 // Writer
304 static Writer* writer_;
306 long io_count_;
308 char odd_;
310 // if we get non-page-size reminder, we will not send it to the writer
311 // until it is full (unless at end)
312 ACE_Message_Block *partial_chunk_;
315 class Writer : public ACE_Handler
317 friend class Receiver;
319 public:
320 Writer (void);
321 virtual ~Writer (void);
323 //FUZZ: disable check_for_lack_ACE_OS
324 void open (void);
325 //FUZZ: enable check_for_lack_ACE_OS
327 // this is *not* a callback from the framework
328 int handle_read_chunks_chain (ACE_Message_Block *mb,
329 int type);
331 // for determining when last receiver dies
332 void on_new_receiver ();
333 void on_delete_receiver ();
335 protected:
336 /// This is called by the framework when an asynchronous <write> to the file
337 /// completes.
338 virtual void handle_write_file (const ACE_Asynch_Write_File::Result &result);
340 private:
341 int initiate_write_file (void);
343 private:
344 // Output file
345 ACE_Asynch_Write_File wf_;
346 ACE_HANDLE output_file_handle_;
347 u_long writing_file_offset_;
348 u_long reported_file_offset_;
349 ACE_Message_Block *odd_chain_;
350 ACE_Message_Block *even_chain_;
351 long io_count_;
352 char receiver_count_;
355 // *************************************************************
356 // Receiver Impl
357 // *************************************************************
359 Writer *Receiver::writer_ = 0;
361 Receiver::Receiver (Acceptor * acceptor, int index)
362 : acceptor_ (acceptor),
363 index_ (index),
364 socket_handle_ (ACE_INVALID_HANDLE),
365 io_count_ (0),
366 partial_chunk_ (0)
368 // the first one is the odd one
369 this->odd_ = ((0 == index) ? 1 : 0);
371 if (this->odd_)
373 Receiver::writer_ = new Writer;
374 if (!Receiver::writer_)
376 ACE_TEST_ASSERT (0);
377 return;
381 Receiver::writer_->on_new_receiver ();
383 if (this->acceptor_ != 0)
384 this->acceptor_->on_new_receiver (*this);
387 Receiver::~Receiver (void)
389 ACE_DEBUG ((LM_DEBUG,
390 ACE_TEXT ("Receiver::~Receiver\n")));
392 if (this->acceptor_ != 0)
393 this->acceptor_->on_delete_receiver (*this);
395 if (this->socket_handle_ != ACE_INVALID_HANDLE)
396 ACE_OS::closesocket (this->socket_handle_);
398 Receiver::writer_->on_delete_receiver ();
400 if (this->partial_chunk_)
402 ACE_TEST_ASSERT (0); // should not be getting here
403 this->partial_chunk_->release ();
407 void
408 Receiver::check_destroy (void)
410 if (this->io_count_ <= 0)
411 delete this;
414 void
415 Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
417 this->socket_handle_ = handle;
419 // Open the ACE_Asynch_Read_Stream
420 if (this->rs_.open (*this, this->socket_handle_) == -1)
421 ACE_ERROR ((LM_ERROR,
422 ACE_TEXT ("%p\n"),
423 ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
424 else
426 if (this->odd_)
427 Receiver::writer_->open ();
429 this->initiate_read_stream ();
432 this->check_destroy ();
436 Receiver::initiate_read_stream (void)
438 if (!Receiver::writer_)
439 return -1;
441 // how many chunks to allocate?
442 size_t number_of_new_chunks = (this->partial_chunk_ ?
443 (ACE_IOV_MAX / RECEIVERS) - 1
444 : ACE_IOV_MAX / RECEIVERS);
446 // allocate chunks chain
447 ACE_Message_Block *head_mb = 0;
448 if (-1 == allocate_chunks_chain (head_mb, number_of_new_chunks))
450 ACE_TEST_ASSERT (0);
451 return -1;
454 // calculate how many bytes to read
456 // head_mb could be 0 (no new chunks allocated)
457 size_t bytes_to_read = head_mb ? head_mb->total_size () : 0;
459 // add the partial chunk at the front if appropriate, and update
460 // the number of bytes to read
461 if (this->partial_chunk_)
463 bytes_to_read += this->partial_chunk_->space ();
464 this->partial_chunk_->cont (head_mb);
465 head_mb = this->partial_chunk_;
466 this->partial_chunk_ = 0;
469 ACE_DEBUG ((LM_DEBUG,
470 ACE_TEXT ("Receiver::initiate_read_stream - (%s) readv %d\n"),
471 this->odd_ ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
472 bytes_to_read));
474 // perform the actual scattered read
475 if (this->rs_.readv (*head_mb,
476 bytes_to_read) == -1)
478 free_chunks_chain (head_mb);
480 ACE_ERROR_RETURN ((LM_ERROR,
481 ACE_TEXT ("%p\n"),
482 ACE_TEXT ("Receiver::ACE_Asynch_Stream::read")),
483 -1);
486 ++this->io_count_;
487 return 0;
490 void
491 Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
493 ACE_Message_Block *mb = &result.message_block ();
495 ACE_DEBUG ((LM_DEBUG,
496 ACE_TEXT ("Receiver::handle_read_stream - (%s) read %d\n"),
497 this->odd_ ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
498 result.bytes_transferred ()));
500 // Transfer only complete chunks to the writer.
501 // Save last partial chunk for the next call.
502 // On disconnect (error or 0 transferred), transfer whatever we have.
504 // at this stage there should not be anything there
505 ACE_TEST_ASSERT (!this->partial_chunk_);
507 // first, remove the empty chunks
508 remove_empty_chunks (mb);
510 if (mb && Receiver::writer_)
511 { // there's something to write, and who to write to
513 // write everything or only complete chunks?
515 // write everything - when no new bytes were transferred
516 int write_everything = 0;
517 if (!result.bytes_transferred ())
518 write_everything = 1;
519 if (write_everything)
520 Receiver::writer_->handle_read_chunks_chain (mb,
521 this->odd_ ? ODD : EVEN);
522 else
523 { // filter out the partial chunk at the end (if present)
524 // and save it for later before writing the full chunks
526 // have this->partial_chunk_ point to the last chunk in the chain
527 size_t last_index = last_chunk (mb, this->partial_chunk_);
528 if (this->partial_chunk_ &&
529 this->partial_chunk_->length () < chunk_size)
530 { // found partial chunk at end of chain
531 // detach it from the chain
532 if (last_index > 1) // chain bigger than 1
534 ACE_Message_Block *pre_last = mb;
535 for (size_t index = 1; index < last_index - 1; ++index)
536 pre_last = pre_last->cont ();
538 // detach partial chunk from chain
539 pre_last->cont (0);
541 else
542 // chain in length of 1 - so we need to zero mb
543 mb = 0;
545 else // last is a full chunk, so hand it over with the rest
546 this->partial_chunk_ = 0;
548 // transfer (if there's anything left)
549 if (mb && mb->total_length ())
550 Receiver::writer_->handle_read_chunks_chain (
552 this->odd_ ? ODD : EVEN);
554 // initiate more reads only if no error
555 if (!result.error ())
556 this->initiate_read_stream ();
557 else
558 ACE_TEST_ASSERT (0);
561 else if (mb && !Receiver::writer_)
562 // no one to write to
563 free_chunks_chain (mb);
565 --this->io_count_;
567 this->check_destroy ();
570 // *************************************************************
571 // Acceptor Impl
572 // *************************************************************
574 Acceptor::Acceptor (void)
575 : sessions_ (0)
577 for (int i = 0; i < RECEIVERS; ++i)
578 this->list_receivers_[i] = 0;
581 Acceptor::~Acceptor (void)
583 this->stop ();
587 void
588 Acceptor::stop (void)
590 // This method can be called only after proactor event loop is done
591 // in all threads.
592 for (int i = 0; i < RECEIVERS; ++i)
594 delete this->list_receivers_[i];
595 this->list_receivers_[i] = 0;
599 void
600 Acceptor::on_new_receiver (Receiver & rcvr)
602 ++this->sessions_;
603 this->list_receivers_[rcvr.index_] = &rcvr;
604 ACE_DEBUG ((LM_DEBUG,
605 ACE_TEXT ("Receiver::CTOR sessions_ = %d\n"),
606 this->sessions_));
609 void
610 Acceptor::on_delete_receiver (Receiver & rcvr)
612 --this->sessions_;
613 if (rcvr.index_ >= 0
614 && rcvr.index_ < RECEIVERS
615 && this->list_receivers_[rcvr.index_] == &rcvr)
616 this->list_receivers_[rcvr.index_] = 0;
618 ACE_DEBUG ((LM_DEBUG,
619 ACE_TEXT ("Receiver::~DTOR sessions_ = %d\n"),
620 this->sessions_));
623 Receiver *
624 Acceptor::make_handler (void)
626 if (this->sessions_ >= RECEIVERS)
627 return 0;
629 for (int i = 0; i < RECEIVERS; ++i)
631 if (this->list_receivers_[i] == 0)
633 ACE_NEW_RETURN (this->list_receivers_[i],
634 Receiver (this, i),
636 return this->list_receivers_[i];
640 return 0;
643 // *************************************************************
644 // Writer Impl
645 // *************************************************************
647 Writer::Writer (void)
648 : output_file_handle_ (ACE_INVALID_HANDLE),
649 writing_file_offset_ (0),
650 reported_file_offset_ (0),
651 odd_chain_ (0),
652 even_chain_ (0),
653 io_count_ (0),
654 receiver_count_ (0)
658 Writer::~Writer (void)
660 ACE_DEBUG ((LM_DEBUG,
661 ACE_TEXT ("Writer::~Writer\n")));
663 if (this->output_file_handle_ != ACE_INVALID_HANDLE)
664 ACE_OS::close (this->output_file_handle_);
666 Receiver::writer_ = 0;
669 void
670 Writer::on_new_receiver ()
672 ACE_DEBUG ((LM_DEBUG,
673 ACE_TEXT ("Writer::on_new_receiver\n")));
675 ++this->receiver_count_;
678 void
679 Writer::on_delete_receiver ()
681 ACE_DEBUG ((LM_DEBUG,
682 ACE_TEXT ("Writer::on_delete_receiver\n")));
684 --this->receiver_count_;
686 if (0 == this->receiver_count_)
688 if (this->io_count_ <= 0)
689 // no pending io, so do the work oursleves
690 // (if pending io, they'll see the zero receiver count)
691 this->initiate_write_file ();
695 void
696 Writer::open (void)
698 // Open the file for output
699 if (ACE_INVALID_HANDLE == (this->output_file_handle_ = ACE_OS::open (output_file,
700 O_CREAT | _O_TRUNC | _O_WRONLY |\
701 FILE_FLAG_OVERLAPPED |\
702 FILE_FLAG_NO_BUFFERING,
703 ACE_DEFAULT_FILE_PERMS)))
704 ACE_ERROR ((LM_ERROR,
705 ACE_TEXT ("%p\n"),
706 ACE_TEXT ("Writer::open::ACE_OS::open")));
707 // Open the ACE_Asynch_Write_File
708 else if (this->wf_.open (*this, this->output_file_handle_) == -1)
709 ACE_ERROR ((LM_ERROR,
710 ACE_TEXT ("%p\n"),
711 ACE_TEXT ("Writer::open::ACE_Asynch_Write_File::open")));
715 Writer::handle_read_chunks_chain (ACE_Message_Block *mb,
716 int type)
718 ACE_DEBUG ((LM_DEBUG,
719 ACE_TEXT ("Writer::handle_read_chunks_chain - (%s) %d bytes\n"),
720 (type == ODD) ? ACE_TEXT ("ODD ") : ACE_TEXT ("EVEN"),
721 mb->total_length ()));
723 add_to_chunks_chain (ODD == type ? this->odd_chain_ : this->even_chain_, mb);
725 this->initiate_write_file ();
727 return 0;
731 Writer::initiate_write_file (void)
733 // find out how much can we merge
734 ACE_Message_Block *dummy_last = 0;
735 size_t odd_count = last_chunk (this->odd_chain_, dummy_last);
736 size_t even_count = last_chunk (this->even_chain_, dummy_last);
738 size_t merge_size = ACE_MIN (ACE_MIN (odd_count, even_count),
739 (size_t) ACE_IOV_MAX);
741 // the options here are as follows:
742 // io_count_ can be zero or greater.
743 // merge_size can be zero or not.
744 // if non zero merge, write the merge. ASSERT receiver_count_ is non zero too.
745 // if zero merge:
746 // if receiver_count_ is non zero, NOOP.
747 // if zero receiver_count_, we should write whatever is left,
748 // and terminate the writer at completion.
749 // if nothing to write, and io_count_ is zero too, terminate here.
751 if (0 == merge_size &&
752 0 != this->receiver_count_)
753 return 0;
755 if (0 == merge_size &&
756 0 == this->receiver_count_ &&
757 0 == odd_count &&
758 0 == even_count &&
759 0 == this->io_count_)
761 ACE_DEBUG ((LM_DEBUG,
762 ACE_TEXT ("Writer::initiate_write_file")
763 ACE_TEXT (" - ending proactor event loop\n")));
765 ACE_Proactor::instance ()->end_event_loop ();
767 delete this;
769 return 0;
772 // if we reached nere and merge_size is zero, we should write whatever is
773 // in the queues (1 to 2 chunks together), so let's force the merge size to 1.
774 if (0 == merge_size)
776 ACE_TEST_ASSERT (1 == odd_count && 1 >= even_count);
777 merge_size = 1;
780 // Now that we found out what we want to do, prepare the chain
781 // that will be written, and update the remainders
782 ACE_Message_Block *new_odd_chain_head = this->odd_chain_;
783 ACE_Message_Block *new_even_chain_head = this->even_chain_;
785 // locate the place for detachment in the chains
786 ACE_Message_Block *pre_odd = 0;
787 ACE_Message_Block *pre_even = 0;
788 for (size_t index = 0; index < merge_size; ++index)
790 pre_odd = new_odd_chain_head;
791 if (new_odd_chain_head)
792 new_odd_chain_head = new_odd_chain_head->cont ();
793 pre_even = new_even_chain_head;
794 if (new_even_chain_head)
795 new_even_chain_head = new_even_chain_head->cont ();
797 // now detach the chain
798 if (pre_odd)
799 pre_odd->cont (0);
800 if (pre_even)
801 pre_even->cont (0);
803 // perform merge between the two chains
804 merge_odd_even_chains (this->odd_chain_, this->even_chain_);
806 // and now finally perform the write
807 ACE_Message_Block *united_mb = this->odd_chain_;
808 // update the remainders of the chains
809 this->odd_chain_ = new_odd_chain_head;
810 this->even_chain_ = new_even_chain_head;
811 size_t increment_writing_file_offset = united_mb->total_length ();
813 // Reconstruct the file
814 // Write the size, not the length, because we must write in chunks
815 // of <page size>
816 ACE_DEBUG ((LM_DEBUG,
817 ACE_TEXT ("Writer::initiate_write_file: write %d bytes at %d\n"),
818 united_mb->total_size (),
819 this->writing_file_offset_));
820 if (this->wf_.writev (*united_mb,
821 united_mb->total_size (),
822 this->writing_file_offset_) == -1)
824 free_chunks_chain (united_mb);
826 ACE_ERROR_RETURN((LM_ERROR,
827 ACE_TEXT ("%p\n"),
828 ACE_TEXT ("Writer::initiate_write_file::ACE_Asynch_Write_Stream::writev")),
829 -1);
832 // we update now because otherwise, we'd have error when performing
833 // pipelined writing (that is, mulitple calls to write before the callbacks
834 // to handle_x)
835 this->writing_file_offset_ +=
836 static_cast<u_long> (increment_writing_file_offset);
837 ++this->io_count_;
838 return 0;
841 void
842 Writer::handle_write_file (const ACE_Asynch_Write_File::Result &result)
844 ACE_Message_Block *mb = &result.message_block ();
846 ACE_DEBUG ((LM_DEBUG,
847 ACE_TEXT ("Writer::handle_write_file at offset %d wrote %d\n"),
848 this->reported_file_offset_,
849 result.bytes_transferred ()));
851 this->reported_file_offset_ +=
852 static_cast<u_long> (result.bytes_transferred ());
854 // Always truncate as required,
855 // because partial will always be the last write to a file
856 ACE_Message_Block *last_mb = mb;
857 last_chunk (mb, last_mb);
859 if (last_mb->space ())
860 ACE_OS::truncate (output_file,
861 this->reported_file_offset_ -
862 static_cast<u_long> (last_mb->space ()));
864 free_chunks_chain (mb);
866 --this->io_count_;
868 // end of process?
869 if (0 == this->receiver_count_ &&
870 0 == this->io_count_)
872 ACE_TEST_ASSERT (0 == this->odd_chain_ && 0 == this->even_chain_);
874 ACE_DEBUG ((LM_DEBUG,
875 ACE_TEXT ("Writer::handle_write_file")
876 ACE_TEXT (" - ending proactor event loop\n")));
878 ACE_Proactor::instance ()->end_event_loop ();
880 delete this;
884 // *************************************************************
885 // Connector and Sender
886 // *************************************************************
887 class Sender;
889 class Connector : public ACE_Asynch_Connector<Sender>
891 friend class Sender;
893 public:
894 Connector (void);
895 virtual ~Connector (void);
897 // Address to pass to Sender for secondary connect.
898 void set_address (const ACE_INET_Addr &addr);
899 const ACE_INET_Addr &get_address (void);
901 void stop (void);
903 // Virtual from ACE_Asynch_Connector
904 virtual Sender *make_handler (void);
906 private:
907 void on_new_sender (Sender &rcvr);
908 void on_delete_sender (Sender &rcvr);
910 int sessions_;
911 ACE_INET_Addr addr_;
912 Sender *list_senders_[SENDERS];
915 class Sender : public ACE_Service_Handler
917 friend class Connector;
918 public:
920 Sender (Connector *connector = 0, int index = -1);
922 virtual ~Sender (void);
924 //FUZZ: disable check_for_lack_ACE_OS
925 /// This is called after the new connection has been established.
926 virtual void open (ACE_HANDLE handle,
927 ACE_Message_Block &message_block);
928 //FUZZ: enable check_for_lack_ACE_OS
930 // This is called by the framework when asynchronous reads from the
931 // file complete.
932 virtual void handle_read_file (const ACE_Asynch_Read_File::Result &result);
934 // This is called by the framework when asynchronous writes from the
935 // socket complete.
936 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
938 private:
939 void check_destroy (void);
941 int initiate_read_file (void);
943 int initiate_write_stream (ACE_Message_Block &mb);
945 int index_;
946 Connector * connector_;
948 // File to read from
949 ACE_Asynch_Read_File rf_;
950 ACE_HANDLE input_file_handle_;
951 u_long file_offset_;
953 // Sockets to send to
954 // odd and even socket output streams
955 ACE_Asynch_Write_Stream ws_[RECEIVERS];
956 ACE_HANDLE socket_handle_[RECEIVERS];
958 long io_count_;
961 // *************************************************************
962 // Connector Impl
963 // *************************************************************
965 Connector::Connector (void)
966 : sessions_ (0)
968 for (int i = 0; i < SENDERS; ++i)
969 this->list_senders_[i] = 0;
972 Connector::~Connector (void)
974 this->stop ();
977 // Address to pass to Sender for secondary connect.
978 void
979 Connector::set_address (const ACE_INET_Addr &addr)
981 this->addr_ = addr;
984 const ACE_INET_Addr &
985 Connector::get_address (void)
987 return this->addr_;
990 void
991 Connector::stop (void)
993 // This method can be called only after proactor event loop is done
994 // in all threads.
996 for (int i = 0; i < SENDERS; ++i)
998 delete this->list_senders_[i];
999 this->list_senders_[i] = 0;
1003 void
1004 Connector::on_new_sender (Sender &sndr)
1006 ++this->sessions_;
1007 this->list_senders_[sndr.index_] = &sndr;
1008 ACE_DEBUG ((LM_DEBUG,
1009 ACE_TEXT ("Sender::CTOR sessions_ = %d\n"),
1010 this->sessions_));
1013 void
1014 Connector::on_delete_sender (Sender &sndr)
1016 --this->sessions_;
1017 if (sndr.index_ >= 0
1018 && sndr.index_ < SENDERS
1019 && this->list_senders_[sndr.index_] == &sndr)
1020 this->list_senders_[sndr.index_] = 0;
1022 ACE_DEBUG ((LM_DEBUG,
1023 ACE_TEXT ("Sender::~DTOR sessions_ = %d\n"),
1024 this->sessions_));
1027 Sender *
1028 Connector::make_handler (void)
1030 if (this->sessions_ >= SENDERS)
1031 return 0;
1033 for (int i = 0; i < SENDERS; ++i)
1035 if (this->list_senders_ [i] == 0)
1037 ACE_NEW_RETURN (this->list_senders_[i],
1038 Sender (this, i),
1040 return this->list_senders_[i];
1044 return 0;
1047 // *************************************************************
1048 // Sender Impl
1049 // *************************************************************
1051 Sender::Sender (Connector * connector, int index)
1052 : index_ (index),
1053 connector_ (connector),
1054 input_file_handle_ (ACE_INVALID_HANDLE),
1055 file_offset_ (0),
1056 io_count_ (0)
1058 socket_handle_[ODD] = socket_handle_[EVEN] = ACE_INVALID_HANDLE;
1060 if (this->connector_ != 0)
1061 this->connector_->on_new_sender (*this);
1064 Sender::~Sender (void)
1066 ACE_DEBUG ((LM_DEBUG,
1067 ACE_TEXT ("Sender::~Sender\n")));
1069 if (this->connector_ != 0)
1070 this->connector_->on_delete_sender (*this);
1072 if (this->socket_handle_[ODD] != ACE_INVALID_HANDLE)
1073 ACE_OS::closesocket (this->socket_handle_[ODD]);
1075 if (this->socket_handle_[EVEN] != ACE_INVALID_HANDLE)
1076 ACE_OS::closesocket (this->socket_handle_[EVEN]);
1078 if (this->input_file_handle_ != ACE_INVALID_HANDLE)
1079 ACE_OS::close (this->input_file_handle_);
1081 if (client_only)
1082 ACE_Proactor::instance ()->end_event_loop ();
1085 // return true if we alive, false we commited suicide
1086 void
1087 Sender::check_destroy (void)
1089 if (this->io_count_ <= 0)
1090 delete this;
1093 void
1094 Sender::open (ACE_HANDLE handle, ACE_Message_Block &)
1096 this->socket_handle_[ODD] = handle;
1098 // Open the input file
1099 if (ACE_INVALID_HANDLE == (this->input_file_handle_ =
1100 ACE_OS::open (input_file,
1101 _O_RDONLY |\
1102 FILE_FLAG_OVERLAPPED |\
1103 FILE_FLAG_NO_BUFFERING,
1104 ACE_DEFAULT_FILE_PERMS)))
1106 ACE_ERROR ((LM_ERROR,
1107 ACE_TEXT ("%p\n"),
1108 ACE_TEXT ("Sender::open::ACE_OS::open")));
1110 else
1112 // Now connect (w/o the connector factory) to the even (=second)
1113 // receiver. We don't connect thru the factory in order not to
1114 // instantiate another Sender.
1115 ACE_SOCK_Connector sock_connector;
1116 ACE_SOCK_Stream sock_stream;
1117 if (-1 == sock_connector.connect (sock_stream,
1118 this->connector_->get_address ()))
1119 ACE_ERROR ((LM_ERROR,
1120 ACE_TEXT ("%p\n"),
1121 ACE_TEXT ("Sender::open::ACE_SOCK_Connector::connect")));
1123 else
1125 this->socket_handle_[EVEN] = sock_stream.get_handle ();
1127 // Open odd ACE_Asynch_Write_Stream
1128 if (this->ws_[ODD].open (*this, this->socket_handle_[ODD]) == -1)
1129 ACE_ERROR ((LM_ERROR,
1130 ACE_TEXT ("%p\n"),
1131 ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
1133 // Open even ACE_Asynch_Write_Stream
1134 else if (this->ws_[EVEN].open (*this, this->socket_handle_[EVEN]) == -1)
1135 ACE_ERROR ((LM_ERROR,
1136 ACE_TEXT ("%p\n"),
1137 ACE_TEXT ("Sender::open::ACE_Asynch_Write_Stream::open")));
1139 // Open ACE_Asynch_Read_File
1140 else if (this->rf_.open (*this, this->input_file_handle_) == -1)
1141 ACE_ERROR ((LM_ERROR,
1142 ACE_TEXT ("%p\n"),
1143 ACE_TEXT ("Sender::open::ACE_Asynch_Read_File::open")));
1144 else
1145 // Start an asynchronous read
1146 this->initiate_read_file ();
1150 this->check_destroy ();
1154 Sender::initiate_read_file (void)
1156 ACE_TEST_ASSERT (0 == this->file_offset_ % chunk_size);
1158 static const size_t file_size = ACE_OS::filesize (input_file);
1160 static const size_t number_of_chunks_needed_for_file =
1161 static_cast<size_t> (ACE_OS::ceil ((double) file_size / chunk_size));
1163 size_t relevant_number_of_chunks =
1164 ACE_MIN ((size_t)ACE_IOV_MAX,
1165 number_of_chunks_needed_for_file
1166 - (size_t)(this->file_offset_ / chunk_size));
1168 if (!relevant_number_of_chunks)
1170 ACE_TEST_ASSERT (0); // Just 2 C it coming
1171 return 0;
1174 ACE_Message_Block *head_mb = 0;
1175 if (-1 == allocate_chunks_chain (head_mb, relevant_number_of_chunks))
1177 ACE_TEST_ASSERT (0);
1178 return -1;
1181 // Inititiate read
1182 if (this->rf_.readv (*head_mb,
1183 head_mb->total_size (),
1184 this->file_offset_) == -1)
1186 free_chunks_chain (head_mb);
1188 ACE_ERROR_RETURN ((LM_ERROR,
1189 ACE_TEXT ("%p\n"),
1190 ACE_TEXT ("Sender::initiate_read_file::")
1191 ACE_TEXT ("ACE_Asynch_Read_Stream::readv")),
1192 -1);
1195 ++this->io_count_;
1196 return 0;
1200 Sender::initiate_write_stream (ACE_Message_Block &mb)
1202 // send the odd to the first connection, and the even to the second
1203 // connection.
1205 ACE_Message_Block *odd_mb = &mb;
1206 ACE_Message_Block *even_mb = mb.cont ();
1208 split_odd_even_chains (odd_mb, even_mb);
1210 ACE_DEBUG ((LM_DEBUG,
1211 ACE_TEXT ("Sender::initiate_write_stream - (ODD ) writev %d\n"),
1212 odd_mb->total_length ()));
1214 if (this->ws_[ODD].writev (*odd_mb, odd_mb->total_length ()) == -1)
1216 free_chunks_chain (odd_mb);
1218 if (even_mb)
1219 free_chunks_chain (even_mb);
1221 ACE_ERROR_RETURN((LM_ERROR,
1222 ACE_TEXT ("%p\n"),
1223 ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
1224 -1);
1227 ++this->io_count_;
1229 if (even_mb)
1231 ACE_DEBUG ((LM_DEBUG,
1232 ACE_TEXT ("Sender::initiate_write_stream - (EVEN) writev %d\n"),
1233 even_mb->total_length ()));
1235 if (this->ws_[EVEN].writev (*even_mb, even_mb->total_length ()) == -1)
1237 free_chunks_chain (even_mb);
1239 ACE_ERROR_RETURN((LM_ERROR,
1240 ACE_TEXT ("%p\n"),
1241 ACE_TEXT ("Sender::ACE_Asynch_Stream::writev")),
1242 -1);
1245 ++this->io_count_;
1248 return 0;
1251 void
1252 Sender::handle_read_file (const ACE_Asynch_Read_File::Result &result)
1254 ACE_Message_Block *mb = &result.message_block ();
1256 if (result.error () == 0 && result.bytes_transferred () != 0)
1258 size_t bytes_transferred = result.bytes_transferred ();
1259 size_t chunks_chain_size = mb->total_size ();
1260 ACE_DEBUG ((LM_DEBUG,
1261 ACE_TEXT ("Sender::handle_read_file, read %d, ")
1262 ACE_TEXT ("chain total %d\n"),
1263 bytes_transferred,
1264 chunks_chain_size));
1266 this->file_offset_ += static_cast<u_long> (bytes_transferred);
1268 this->initiate_write_stream (*mb);
1270 // and read more if required
1271 if (bytes_transferred == chunks_chain_size)
1272 this->initiate_read_file ();
1274 else
1275 free_chunks_chain (mb);
1277 --this->io_count_;
1279 this->check_destroy ();
1282 void
1283 Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
1285 ACE_Message_Block *mb = &result.message_block ();
1287 ACE_DEBUG ((LM_DEBUG,
1288 ACE_TEXT ("Sender::handle_write_stream - wrote %d bytes\n"),
1289 result.bytes_transferred ()));
1291 if (result.error () == 0 && result.bytes_transferred () != 0)
1292 // verify sent all
1293 ACE_TEST_ASSERT (0 == mb->total_length ());
1294 else
1295 ACE_TEST_ASSERT (0);
1297 free_chunks_chain (mb);
1299 --this->io_count_;
1301 this->check_destroy ();
1304 // *************************************************************
1305 // Configuration helpers
1306 // *************************************************************
1308 print_usage (int /* argc */, ACE_TCHAR *argv[])
1310 ACE_ERROR
1311 ((LM_ERROR,
1312 ACE_TEXT ("\nusage: %s")
1313 ACE_TEXT ("\n-f <input file>\n")
1314 ACE_TEXT ("\n-c client only (reader-sender)")
1315 ACE_TEXT ("\n-s server only (receiver-writer)")
1316 ACE_TEXT ("\n-h host to connect to")
1317 ACE_TEXT ("\n-p port")
1318 ACE_TEXT ("\n-u show this message")
1319 ACE_TEXT ("\n"),
1320 argv[0]
1322 return -1;
1325 static int
1326 parse_args (int argc, ACE_TCHAR *argv[])
1328 if (argc == 1) // no arguments , so one button test
1329 return 0;
1331 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("f:csh:p:u"));
1332 int c;
1334 while ((c = get_opt ()) != EOF)
1336 switch (c)
1338 case 'f':
1339 input_file = get_opt.opt_arg ();
1340 break;
1341 case 'c':
1342 client_only = 1;
1343 server_only = 0;
1344 break;
1345 case 's':
1346 server_only = 1;
1347 client_only = 0;
1348 break;
1349 case 'h':
1350 host = get_opt.opt_arg ();
1351 break;
1352 case 'p':
1353 port = ACE_OS::atoi (get_opt.opt_arg ());
1354 break;
1355 case 'u':
1356 default:
1357 return print_usage (argc, argv);
1358 } // switch
1359 } // while
1361 return 0;
1365 run_main (int argc, ACE_TCHAR *argv[])
1367 ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
1369 if (::parse_args (argc, argv) == -1)
1370 return -1;
1372 chunk_size = ACE_OS::getpagesize ();
1374 if (client_only)
1375 ACE_DEBUG ((LM_INFO,
1376 ACE_TEXT ("Running as client only, page size %d\n"),
1377 chunk_size));
1378 else if (server_only)
1379 ACE_DEBUG ((LM_INFO,
1380 ACE_TEXT ("Running as server only, page size %d\n"),
1381 chunk_size));
1382 else
1383 ACE_DEBUG ((LM_INFO,
1384 ACE_TEXT ("Running as server and client, page size %d\n"),
1385 chunk_size));
1387 Acceptor acceptor;
1388 Connector connector;
1389 ACE_INET_Addr addr (port);
1391 if (!client_only)
1393 // Simplify, initial read with zero size
1394 if (-1 == acceptor.open (addr, 0, 1))
1396 ACE_TEST_ASSERT (0);
1397 return -1;
1401 if (!server_only)
1403 if (-1 == connector.open (1, ACE_Proactor::instance ()))
1405 ACE_TEST_ASSERT (0);
1406 return -1;
1409 // connect to first destination
1410 if (addr.set (port, host, 1, addr.get_type ()) == -1)
1411 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), host), -1);
1412 connector.set_address (addr);
1413 if (-1 == connector.connect (addr))
1415 ACE_TEST_ASSERT (0);
1416 return -1;
1420 ACE_Proactor::instance ()->run_event_loop ();
1422 // As Proactor event loop now is inactive it is safe to destroy all
1423 // senders
1425 connector.stop ();
1426 acceptor.stop ();
1428 ACE_Proactor::instance()->close_singleton ();
1430 // now compare the files - available only when on same machine
1432 int success = 0;
1433 if (!client_only && !server_only)
1435 ACE_DEBUG ((LM_INFO,
1436 ACE_TEXT ("Comparing the input file and the output file...\n")));
1438 success = -1;
1439 // map the two files, then perform memcmp
1441 ACE_Mem_Map original_file (input_file);
1442 ACE_Mem_Map reconstructed_file (output_file);
1444 if (original_file.addr () &&
1445 original_file.addr () != MAP_FAILED &&
1446 reconstructed_file.addr () &&
1447 reconstructed_file.addr () != MAP_FAILED)
1449 // compare lengths
1450 if ((original_file.size () == reconstructed_file.size ()) &&
1451 // and if same size, compare file data
1452 (0 == ACE_OS::memcmp (original_file.addr (),
1453 reconstructed_file.addr (),
1454 original_file.size ())))
1455 success = 0;
1459 if (0 == success)
1460 ACE_DEBUG ((LM_DEBUG,
1461 ACE_TEXT ("input file and the output file identical!\n")));
1462 else
1463 ACE_ERROR ((LM_ERROR,
1464 ACE_TEXT ("input file and the output file are different!\n")));
1467 if (!client_only)
1468 ACE_OS::unlink (output_file);
1470 ACE_END_TEST;
1472 return success;
1475 #else
1477 run_main (int, ACE_TCHAR *[])
1479 ACE_START_TEST (ACE_TEXT ("Proactor_Scatter_Gather_Test"));
1481 ACE_DEBUG ((LM_INFO,
1482 ACE_TEXT ("Asynchronous Scatter/Gather IO is unsupported.\n")
1483 ACE_TEXT ("Proactor_Scatter_Gather_Test will not be run.\n")));
1485 ACE_END_TEST;
1487 return 0;
1490 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */