Merge branch 'master' into jwi-bcc64xsingletonwarning
[ACE_TAO.git] / ACE / examples / Reactor / Proactor / test_proactor3.cpp
blob68542f460801f24d698942578c637d5d9af665df
2 //=============================================================================
3 /**
4 * @file test_proactor3.cpp
6 * This program illustrates how the <ACE_Proactor> can be used to
7 * implement an application that does various asynchronous
8 * operations.
10 * @author Irfan Pyarali <irfan@cs.wustl.edu> modified by Alexander Libman <alibman@baltimore.com> from original test_proactor.cpp
12 //=============================================================================
15 #include "ace/Signal.h"
17 #include "ace/Service_Config.h"
18 #include "ace/Proactor.h"
19 #include "ace/Asynch_IO.h"
20 #include "ace/Asynch_IO_Impl.h"
21 #include "ace/Asynch_Acceptor.h"
22 #include "ace/INET_Addr.h"
23 #include "ace/Manual_Event.h"
24 #include "ace/SOCK_Connector.h"
25 #include "ace/SOCK_Acceptor.h"
26 #include "ace/SOCK_Stream.h"
27 #include "ace/Message_Block.h"
28 #include "ace/Get_Opt.h"
30 // FUZZ: disable check_for_streams_include
31 #include "ace/streams.h"
33 #include "ace/Task.h"
36 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
37 // This only works on Win32 platforms and on Unix platforms
38 // supporting POSIX aio calls.
40 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
42 #include "ace/WIN32_Proactor.h"
44 #elif defined (ACE_HAS_AIO_CALLS)
46 #include "ace/POSIX_Proactor.h"
48 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO */
50 // Some debug helper functions
51 static int disable_signal (int sigmin, int sigmax);
52 #if 0
53 static int print_sigmask ();
54 #endif
56 #define COUT(X) cout << X; cout.flush ();
58 // Proactor Type (UNIX only, Win32 ignored) 0-default, 1 -AIOCB,
59 // 2-SIG, 3-SUN
60 static int proactor_type = 0;
62 // POSIX : > 0 max number aio operations proactor,
63 static int max_aio_operations = 0;
65 // Host that we're connecting to.
66 static ACE_TCHAR *host = 0;
68 // number of Senders instances
69 static int senders = 1;
70 static const int MaxSenders = 100;
72 // duplex mode: ==0 half-duplex
73 // !=0 full duplex
74 static int duplex = 0;
76 // number threads in the Proactor thread pool
77 static int threads = 1;
79 // Port that we're receiving connections on.
80 static u_short port = ACE_DEFAULT_SERVER_PORT;
82 /**
83 * @class MyTask:
85 * @brief MyTask plays role for Proactor threads pool
87 class MyTask: public ACE_Task<ACE_MT_SYNCH>
89 public:
90 MyTask () : threads_ (0), proactor_ (0) {}
92 int svc ();
93 void waitready () { event_.wait (); }
95 private:
96 ACE_Recursive_Thread_Mutex mutex_;
97 int threads_;
98 ACE_Proactor *proactor_;
99 ACE_Manual_Event event_;
101 void create_proactor ();
102 void delete_proactor ();
105 void
106 MyTask::create_proactor ()
108 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, mutex_);
110 if (threads_ == 0)
112 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO)
113 ACE_WIN32_Proactor *proactor = new ACE_WIN32_Proactor;
114 ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=WIN32"));
116 #elif defined (ACE_HAS_AIO_CALLS)
118 ACE_POSIX_Proactor *proactor = 0;
120 switch (proactor_type)
122 case 1:
123 proactor = new ACE_POSIX_AIOCB_Proactor (max_aio_operations);
124 ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=AIOCB\n"));
125 break;
126 case 2:
127 proactor = new ACE_POSIX_SIG_Proactor;
128 ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
129 break;
130 default:
131 proactor = new ACE_POSIX_SIG_Proactor;
132 ACE_DEBUG ((LM_DEBUG,"(%t) Create Proactor Type=SIG\n"));
133 break;
135 #endif
137 proactor_ = new ACE_Proactor (proactor, 1);
139 ACE_Proactor::instance(proactor_);
140 event_.signal ();
143 threads_++;
146 void
147 MyTask::delete_proactor ()
149 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, mutex_);
150 if (--threads_ == 0)
152 ACE_DEBUG ((LM_DEBUG, "(%t) Delete Proactor\n"));
153 ACE_Proactor::instance ((ACE_Proactor *) 0);
154 delete proactor_;
155 proactor_ = 0;
160 MyTask::svc ()
162 ACE_DEBUG ((LM_DEBUG, "(%t) MyTask started\n"));
164 create_proactor ();
165 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
167 while (ACE_Proactor::event_loop_done () == 0)
168 ACE_Proactor::run_event_loop ();
170 delete_proactor ();
172 ACE_DEBUG ((LM_DEBUG, "(%t) MyTask finished\n"));
173 return 0;
176 class Receiver : public ACE_Service_Handler
178 public:
179 Receiver ();
180 ~Receiver ();
182 //FUZZ: disable check_for_lack_ACE_OS
183 /// This is called after the new connection has been accepted.
184 ///FUZZ: enable check_for_lack_ACE_OS
185 virtual void open (ACE_HANDLE handle,
186 ACE_Message_Block &message_block);
188 static long get_number_sessions () { return sessions_; }
190 protected:
191 // These methods are called by the framework
193 /// This is called when asynchronous <read> operation from the socket
194 /// complete.
195 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
197 /// This is called when an asynchronous <write> to the file
198 /// completes.
199 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
201 private:
202 int initiate_read_stream ();
203 int initiate_write_stream (ACE_Message_Block & mb, int nBytes);
204 int check_destroy ();
206 ACE_Asynch_Read_Stream rs_;
207 ACE_Asynch_Write_Stream ws_;
208 ACE_HANDLE handle_;
209 ACE_Recursive_Thread_Mutex mutex_;
210 long io_count_;
211 static long sessions_;
214 long Receiver::sessions_ = 0;
216 Receiver::Receiver ()
217 : handle_ (ACE_INVALID_HANDLE),
218 io_count_ (0)
220 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, mutex_);
221 sessions_++;
222 ACE_DEBUG ((LM_DEBUG, "Receiver Ctor sessions_=%d\n", sessions_));
225 Receiver::~Receiver ()
227 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, mutex_);
228 sessions_--;
229 ACE_OS::closesocket (this->handle_);
230 ACE_DEBUG ((LM_DEBUG, "~Receiver Dtor sessions_=%d\n", sessions_));
233 // return true if we alive, false we commited suicide
235 Receiver::check_destroy ()
238 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, mutex_, -1);
240 if (io_count_ > 0)
241 return 1;
244 delete this;
245 return 0;
248 void
249 Receiver::open (ACE_HANDLE handle,
250 ACE_Message_Block &)
252 ACE_DEBUG ((LM_DEBUG,
253 "%N:%l:Receiver::open called\n"));
255 this->handle_ = handle;
257 if (this->ws_.open (*this, this->handle_) == -1)
258 ACE_ERROR ((LM_ERROR,
259 "%p\n",
260 "ACE_Asynch_Write_Stream::open"));
261 else if (this->rs_.open (*this, this->handle_) == -1)
262 ACE_ERROR ((LM_ERROR,
263 "%p\n",
264 "ACE_Asynch_Read_Stream::open"));
265 else
266 initiate_read_stream ();
268 check_destroy ();
272 Receiver::initiate_read_stream ()
274 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, mutex_, -1);
276 ACE_Message_Block *mb = 0;
277 ACE_NEW_RETURN (mb,
278 ACE_Message_Block (BUFSIZ + 1),
279 -1);
281 // Inititiate read
282 if (this->rs_.read (*mb, mb->size ()- 1) == -1)
284 mb->release ();
285 ACE_ERROR_RETURN ((LM_ERROR,
286 "%p\n",
287 "ACE_Asynch_Read_Stream::read"),
288 -1);
291 io_count_++;
292 return 0;
296 Receiver::initiate_write_stream (ACE_Message_Block &mb, int nbytes)
298 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, mutex_, -1);
299 if (nbytes <= 0)
301 mb.release ();
302 ACE_ERROR_RETURN((LM_ERROR,
303 "ACE_Asynch_Write_Stream::write nbytes <0 "),
304 -1);
307 if (this->ws_.write (mb, nbytes) == -1)
309 mb.release ();
310 ACE_ERROR_RETURN((LM_ERROR,
311 "%p\n",
312 "ACE_Asynch_Write_Stream::write"),
313 -1);
316 io_count_++;
317 return 0;
320 void
321 Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
323 // Reset pointers.
324 result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
326 if (result.bytes_transferred () == 0 || result.error () != 0)
328 ACE_DEBUG ((LM_DEBUG, "handle_read_stream called\n"));
329 ACE_DEBUG ((LM_DEBUG, "********************\n"));
330 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
331 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
332 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
333 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
334 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
335 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
336 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
337 ACE_DEBUG ((LM_DEBUG, "********************\n"));
338 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
341 if (result.success () && result.bytes_transferred () != 0)
343 // Successful read: write the data to the file asynchronously.
344 // Note how we reuse the <ACE_Message_Block> for the writing.
345 // Therefore, we do not delete this buffer because it is handled
346 // in <handle_write_stream>.
348 if(this->initiate_write_stream (result.message_block (),
349 result.bytes_transferred ()) == 0)
351 if (duplex != 0)
353 // Initiate new read from the stream.
354 this->initiate_read_stream ();
358 else
360 result.message_block ().release ();
361 ACE_DEBUG ((LM_DEBUG, "Receiver completed\n"));
365 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, mutex_);
366 --io_count_;
368 check_destroy ();
371 void
372 Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
374 if (result.bytes_transferred () == 0 || result.error () != 0)
376 ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
378 ACE_DEBUG ((LM_DEBUG, "********************\n"));
379 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
380 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
381 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
382 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
383 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
384 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
385 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
386 ACE_DEBUG ((LM_DEBUG, "********************\n"));
389 result.message_block ().release ();
391 if (result.success () && result.bytes_transferred () != 0)
393 // This code is not robust enough to deal with short file writes
394 // (which hardly ever happen);-)
395 // ACE_ASSERT (result.bytes_to_write () == result.bytes_transferred ());
397 if (duplex == 0)
398 initiate_read_stream ();
402 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, mutex_);
403 --io_count_;
405 check_destroy ();
409 * @class Sender
411 * @brief Sends welcome messages receives them back.
413 class Sender : public ACE_Handler
415 public:
416 Sender ();
417 ~Sender ();
419 //FUZZ: disable check_for_lack_ACE_OS
420 ///FUZZ: enable check_for_lack_ACE_OS
421 int open (const ACE_TCHAR *host, u_short port);
422 void close ();
424 ACE_HANDLE handle () const;
425 virtual void handle (ACE_HANDLE);
427 protected:
428 // These methods are called by the freamwork
430 /// This is called when asynchronous reads from the socket complete
431 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
433 /// This is called when asynchronous writes from the socket complete
434 virtual void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result);
436 private:
437 int initiate_read_stream ();
438 int initiate_write_stream ();
440 /// Network I/O handle
441 ACE_SOCK_Stream stream_;
443 /// ws (write stream): for writing to the socket
444 ACE_Asynch_Write_Stream ws_;
446 /// rs (read file): for reading from the socket
447 ACE_Asynch_Read_Stream rs_;
449 /// Welcome message
450 ACE_Message_Block welcome_message_;
452 ACE_Recursive_Thread_Mutex mutex_;
453 long io_count_;
456 static const char *data = "Welcome to Irfan World! Irfan RULES here !!\n";
458 Sender::Sender ()
459 : io_count_ (0)
461 // Moment of inspiration... :-)
462 this->welcome_message_.init (data, ACE_OS::strlen (data));
465 Sender::~Sender ()
467 this->close ();
470 void Sender::close ()
472 this->stream_.close ();
475 ACE_HANDLE Sender::handle () const
477 return this->stream_.get_handle ();
480 void Sender::handle (ACE_HANDLE handle)
482 this->stream_.set_handle (handle);
485 int Sender::open (const ACE_TCHAR *host, u_short port)
487 // Initialize stuff
488 // Connect to remote host
489 ACE_INET_Addr address (port, host);
490 ACE_SOCK_Connector connector;
492 if (connector.connect (this->stream_, address) == -1)
494 ACE_ERROR_RETURN ((LM_ERROR,
495 "%p\n",
496 "ACE_SOCK_Connector::connect"),
497 -1);
500 // Open ACE_Asynch_Write_Stream
501 if (this->ws_.open (*this) == -1)
502 ACE_ERROR_RETURN ((LM_ERROR,
503 "%p\n",
504 "ACE_Asynch_Write_Stream::open"),
505 -1);
507 // Open ACE_Asynch_Read_Stream
508 if (this->rs_.open (*this) == -1)
509 ACE_ERROR_RETURN ((LM_ERROR,
510 "%p\n",
511 "ACE_Asynch_Read_Stream::open"),
512 -1);
514 // Start an asynchronous transmit file
515 if (this->initiate_write_stream () == -1)
516 return -1;
518 if (duplex != 0)
519 // Start an asynchronous read file
520 if (this->initiate_read_stream () == -1)
521 return -1;
523 return 0;
527 Sender::initiate_write_stream ()
529 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, mutex_, -1);
531 welcome_message_.rd_ptr(welcome_message_.base ());
532 welcome_message_.wr_ptr(welcome_message_.base ());
533 welcome_message_.wr_ptr (ACE_OS::strlen (data));
535 if (this->ws_.write (welcome_message_,
536 welcome_message_.length ()) == -1)
537 ACE_ERROR_RETURN((LM_ERROR,
538 "%p\n",
539 "ACE_Asynch_Write_Stream::write"),
540 -1);
541 io_count_++;
542 return 0;
546 Sender::initiate_read_stream ()
548 ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, locker, mutex_, -1);
550 // Create a new <Message_Block>. Note that this message block will
551 // be used both to <read> data asynchronously from the socket and to
552 // <write> data asynchronously to the file.
553 ACE_DEBUG ((LM_DEBUG,
554 "initiate_read_stream called\n"));
556 ACE_Message_Block *mb = 0;
557 ACE_NEW_RETURN (mb,
558 ACE_Message_Block (BUFSIZ + 1),
559 -1);
561 // Inititiate read
562 if (this->rs_.read (*mb, mb->size ()- 1) == -1)
564 mb->release ();
565 ACE_ERROR_RETURN ((LM_ERROR,
566 "%p\n",
567 "ACE_Asynch_Read_Stream::read"),
568 -1);
571 io_count_++;
572 return 0;
575 void
576 Sender::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
578 if (result.bytes_transferred () == 0 || result.error () != 0)
580 ACE_DEBUG ((LM_DEBUG, "handle_write_stream called\n"));
582 // Reset pointers.
583 result.message_block ().rd_ptr (result.message_block ().rd_ptr () - result.bytes_transferred ());
585 ACE_DEBUG ((LM_DEBUG, "********************\n"));
586 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_write", result.bytes_to_write ()));
587 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
588 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
589 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
590 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
591 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
592 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
593 ACE_DEBUG ((LM_DEBUG, "********************\n"));
594 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
597 // Simplify just for Test
598 if (result.success () && result.bytes_transferred () != 0)
600 if (duplex != 0) // full duplex, continue write
601 initiate_write_stream ();
602 else // half-duplex read reply, after read we will start write
603 initiate_read_stream ();
607 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, mutex_);
608 --io_count_;
612 void
613 Sender::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
615 if (result.bytes_transferred () == 0 || result.error () != 0)
617 ACE_DEBUG ((LM_DEBUG,
618 "handle_read_stream called\n"));
620 // Reset pointers.
621 result.message_block ().rd_ptr ()[result.bytes_transferred ()] = '\0';
623 ACE_DEBUG ((LM_DEBUG, "********************\n"));
624 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_to_read", result.bytes_to_read ()));
625 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "handle", result.handle ()));
626 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "bytes_transfered", result.bytes_transferred ()));
627 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "act", (u_long) result.act ()));
628 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "success", result.success ()));
629 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "completion_key", (u_long) result.completion_key ()));
630 ACE_DEBUG ((LM_DEBUG, "%s = %d\n", "error", result.error ()));
631 ACE_DEBUG ((LM_DEBUG, "********************\n"));
632 ACE_DEBUG ((LM_DEBUG, "%s = %s\n", "message_block", result.message_block ().rd_ptr ()));
635 result.message_block().release ();
637 if (result.success () && result.bytes_transferred () != 0)
639 // Successful read: write the data to the file asynchronously.
640 // Note how we reuse the <ACE_Message_Block> for the writing.
641 // Therefore, we do not delete this buffer because it is handled
642 // in <handle_write_stream>.
644 if (duplex != 0) // full duplex, continue read
645 initiate_read_stream ();
646 else // half-duplex writey, after write we will start read
647 initiate_write_stream ();
651 ACE_GUARD (ACE_Recursive_Thread_Mutex, locker, mutex_);
652 --io_count_;
656 static int
657 set_proactor_type (const char *ptype)
659 if (!ptype)
660 return false;
662 switch (ACE_OS::ace_toupper (*ptype))
664 case 'D' : proactor_type = 0; return true;
665 case 'A' : proactor_type = 1; return true;
666 case 'I' : proactor_type = 2; return true;
668 return false;
671 static int
672 parse_args (int argc, ACE_TCHAR *argv[])
674 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("t:o:n:p:d:h:s:u"));
675 int c;
677 while ((c = get_opt ()) != EOF)
678 switch (c)
680 case 'd': // duplex
681 duplex = ACE_OS::atoi (get_opt.opt_arg ());
682 break;
683 case 'h': // host for sender
684 host = get_opt.opt_arg ();
685 break;
686 case 'p': // port number
687 port = ACE_OS::atoi (get_opt.opt_arg ());
688 break;
689 case 'n': // thread pool size
690 threads = ACE_OS::atoi (get_opt.opt_arg ());
691 break;
692 case 's': // number of senders
693 senders = ACE_OS::atoi (get_opt.opt_arg ());
694 if (senders > MaxSenders)
695 senders = MaxSenders;
696 break;
697 case 'o': // max number of aio for proactor
698 max_aio_operations = ACE_OS::atoi (get_opt.opt_arg ());
699 break;
700 case 't': // Proactor Type
701 if (set_proactor_type (get_opt.opt_arg ()))
702 break;
703 case 'u':
704 default:
705 ACE_ERROR ((LM_ERROR, "%p.",
706 "\nusage:"
707 "\n-o <max number of started aio operations for Proactor>"
708 "\n-t <Proactor type> UNIX-only, Win32-default always:"
709 "\n a AIOCB"
710 "\n i SIG"
711 "\n s SUN"
712 "\n d default"
713 "\n-d <duplex mode 1-on/0-off>"
714 "\n-h <host> for Sender mode"
715 "\n-n <number threads for Proactor pool>"
716 "\n-p <port to listen/connect>"
717 "\n-s <number of sender's instances>"
718 "\n-u show this message"
719 "\n"));
721 return -1;
724 return 0;
728 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
730 if (parse_args (argc, argv) == -1)
731 return -1;
733 disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX);
735 MyTask task1;
737 if (task1.activate (THR_NEW_LWP, threads) == -1)
738 ACE_ERROR_RETURN ((LM_ERROR,
739 "%p.\n",
740 "main"),
741 -1);
743 // wait for creation of Proactor
744 task1.waitready ();
746 Sender * send_list[MaxSenders];
748 ACE_Asynch_Acceptor<Receiver> acceptor;
750 int rc = -1;
751 int i;
752 char c;
754 if (host == 0) // Acceptor
756 // Simplify, initial read with zero size
757 if (acceptor.open (ACE_INET_Addr (port),0,1) == 0)
758 rc = 1;
760 else
762 for (i = 0; i < senders; ++i)
763 send_list[i] = new Sender;
765 for (i = 0; i < senders; ++i)
766 if (send_list[i]->open (host, port) == 0)
767 rc++;
770 if (rc > 0)
772 cout << "Press any key to stop=>" << flush;
773 cin.clear ();
774 cin >> c;
777 ACE_Proactor::end_event_loop ();
779 if (host != 0) // we are sender
781 for (i = 0; i < senders; ++i)
782 send_list[i]->close ();
786 ACE_Thread_Manager *tm =
787 ACE_Thread_Manager::instance();
789 tm->wait_task (&task1);
791 cout << "\nNumber of Receivers objects="
792 << Receiver::get_number_sessions ()
793 << flush;
795 for (i = 0; i < senders; ++i)
797 delete (send_list[i]);
798 send_list[i] = 0;
801 return 0;
804 static int
805 disable_signal (int sigmin, int sigmax)
807 #ifndef ACE_WIN32
809 sigset_t signal_set;
810 if (ACE_OS::sigemptyset (&signal_set) == - 1)
811 ACE_ERROR ((LM_ERROR,
812 "Error:(%P | %t):%p\n",
813 "sigemptyset failed"));
815 for (int i = sigmin; i <= sigmax; i++)
816 ACE_OS::sigaddset (&signal_set, i);
818 // Put the <signal_set>.
819 if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0)
820 ACE_ERROR ((LM_ERROR,
821 "Error:(%P | %t):%p\n",
822 "pthread_sigmask failed"));
823 #else
824 ACE_UNUSED_ARG (sigmin);
825 ACE_UNUSED_ARG (sigmax);
826 #endif /* ACE_WIN32 */
828 return 1;
// Debug helper, currently compiled out: fetch the signal mask of the
// calling thread from the OS and print the membership of signals
// 1, 2, ... until sigismember () reports -1.  Always returns 0.

#if 0
static int
print_sigmask ()
{
#ifndef ACE_WIN32
  sigset_t current_mask;
  int is_member = 0;

  COUT ("\n=============Signal Mask==========")

  // A null "new set" leaves the mask unchanged and only reports it.
  const int status = ACE_OS::pthread_sigmask (SIG_SETMASK, 0, & current_mask);

  if (status != 0)
    {
      ACE_ERROR ((LM_ERROR,
                  "Error:(%P | %t):%p\n",
                  "ACE_OS::pthread_sigmask failed"));
    }
  else
    {
      int signo = 1;
      while (signo < 1000)
        {
          is_member = ACE_OS::sigismember (&current_mask,signo);

          COUT ("\nSig ")
          COUT (signo)
          COUT (" is ")
          COUT (is_member)

          // -1 is printed first and then ends the listing.
          if (is_member == -1)
            break;

          ++signo;
        }
    }

#endif /* ACE_WIN32 */
  return 0;
}
#endif /* 0 */
866 #else /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */
869 ACE_TMAIN (int, ACE_TCHAR *[])
871 ACE_DEBUG ((LM_DEBUG,
872 "This example does not work on this platform.\n"));
873 return 1;
876 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */