Merge pull request #2317 from jwillemsen/jwi-deleteop
[ACE_TAO.git] / ACE / examples / Connection / blocking / SPIPE-acceptor.cpp
blob c0722d5ebda50b90438916e710557b189c38ba1c
1 #if !defined (SPIPE_ACCEPTOR_C)
2 #define SPIPE_ACCEPTOR_C
4 #include "ace/OS_NS_string.h"
5 #include "ace/SPIPE_Addr.h"
6 #include "ace/SPIPE_Acceptor.h"
7 #include "ace/Proactor.h"
8 #include "ace/Get_Opt.h"
9 #include "ace/Signal.h"
10 #include "SPIPE-acceptor.h"
13 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
15 Svc_Handler::Svc_Handler ()
16 : mb_ (BUFSIZ + 1)
18 // An extra byte for null termination.
19 this->mb_.size (BUFSIZ);
22 Svc_Handler::~Svc_Handler ()
26 int
27 Svc_Handler::open (void *)
29 ACE_DEBUG ((LM_DEBUG,
30 ACE_TEXT ("client connected on handle %d\n"),
31 this->peer ().get_handle ()));
32 if (this->ar_.open (*this,
33 this->peer ().get_handle ()) == -1)
34 return -1;
35 return this->ar_.read (this->mb_,
36 this->mb_.size ());
39 void
40 Svc_Handler::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
42 if (result.success () && result.bytes_transferred () > 0)
44 result.message_block ().rd_ptr ()[result.message_block ().length ()] = '\0';
46 // Print out the message received from the server.
47 ACE_DEBUG ((LM_DEBUG,
48 ACE_TEXT ("(%t) message size %d.\n"),
49 result.message_block ().length ()));
50 ACE_DEBUG ((LM_DEBUG,
51 ACE_TEXT ("%C"),
52 result.message_block ().rd_ptr ()));
53 // Reset the message block here to make sure multiple writes to
54 // the pipe don't keep appending to the message_block!
55 this->mb_.reset ();
57 this->ar_.read (this->mb_, this->mb_.size ());
59 else
60 ACE_Proactor::end_event_loop ();
63 IPC_Server::IPC_Server ()
64 : n_threads_ (1),
65 shutdown_ (0)
67 ACE_OS::strcpy (rendezvous_, ACE_TEXT ("acepipe"));
70 IPC_Server::~IPC_Server ()
74 int
75 IPC_Server::handle_signal (int, siginfo_t *, ucontext_t *)
77 ACE_LOG_MSG->log (LM_INFO, ACE_TEXT ("IPC_Server::handle_signal().\n"));
79 // Flag the main <svc> loop to shutdown.
80 this->shutdown_ = 1;
82 this->acceptor ().close (); // Close underlying acceptor.
83 // This should cause the <accept> to fail, which will "bounce"
84 // us out of the loop in <svc>.
85 return 0;
88 int
89 IPC_Server::init (int argc, ACE_TCHAR *argv[])
91 if (this->parse_args (argc, argv) == -1)
92 return -1;
94 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Opening %s\n"), rendezvous_));
96 // Initialize named pipe listener.
97 if (this->open (ACE_SPIPE_Addr (rendezvous_)) == -1)
98 ACE_ERROR_RETURN ((LM_ERROR,
99 ACE_TEXT ("%p\n"),
100 ACE_TEXT ("open")), 1);
102 // Register to receive shutdowns using this handler.
103 else if (ACE_Reactor::instance ()->register_handler
104 (SIGINT, this) == -1)
105 return -1;
106 else
107 return 0;
111 IPC_Server::fini ()
113 return 0;
117 IPC_Server::parse_args (int argc, ACE_TCHAR *argv[])
119 ACE_LOG_MSG->open (argv[0]);
121 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("ut:r:"));
123 for (int c; (c = get_opt ()) != -1; )
125 switch (c)
127 case 'r':
128 ACE_OS::strncpy (rendezvous_,
129 get_opt.opt_arg (),
130 sizeof (rendezvous_) / sizeof (ACE_TCHAR));
131 break;
132 case 't':
133 n_threads_ = ACE_OS::atoi (get_opt.opt_arg ());
134 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s == %d.\n"),
135 get_opt.opt_arg (),
136 n_threads_));
137 ACE_Proactor::instance (2 * n_threads_);
138 // This is a lame way to tell the proactor how many threads
139 // we'll be using.
140 break;
141 case 'u':
142 default:
143 ACE_ERROR_RETURN ((LM_ERROR,
144 ACE_TEXT ("usage: %n -t <threads>\n")
145 ACE_TEXT (" -r <rendezvous>\n")), -1);
149 return 0;
152 static ACE_THR_FUNC_RETURN
153 run_reactor_event_loop (void *)
155 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) worker thread starting\n")));
157 ACE_Proactor::run_event_loop ();
158 return 0;
// Main service loop.  While shutdown_ is 0 it accepts one connection
// into a stack-allocated Svc_Handler, then runs the Proactor event
// loop -- inline when n_threads_ <= 1, otherwise on n_threads_ spawned
// threads that are waited for -- and finally resets the Proactor event
// loop before the next accept.
// NOTE(review): this listing omits some original lines (the return-type
// line, the braces, and the trailing arguments of spawn_n and of both
// ACE_ERROR_RETURN calls); restore them from the upstream file rather
// than guessing their values.
162 IPC_Server::svc ()
164 // Performs the iterative server activities.
165 while (this->shutdown_ == 0)
167 Svc_Handler sh;
169 // Create a new SH endpoint, which performs all processing in
170 // its open() method (note no automatic restart if errno ==
171 // EINTR).
172 if (this->accept (&sh, 0) == -1)
173 ACE_ERROR_RETURN ((LM_ERROR,
174 ACE_TEXT ("%p\n"),
175 ACE_TEXT ("accept")),
178 // SH's destructor closes the stream implicitly but the
179 // listening endpoint stays open.
180 else
182 // Run single-threaded.
183 if (n_threads_ <= 1)
184 run_reactor_event_loop (0);
185 else
// Multi-threaded: run the event loop on n_threads_ spawned threads
// and block below until all of them have finished.
187 if (ACE_Thread_Manager::instance ()->spawn_n
188 (n_threads_,
189 run_reactor_event_loop,
191 THR_NEW_LWP) == -1)
192 ACE_ERROR_RETURN ((LM_ERROR,
193 ACE_TEXT ("%p\n"),
194 ACE_TEXT ("spawn_n")),
197 ACE_Thread_Manager::instance ()->wait ();
200 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) main thread exiting.\n")));
202 // Reset the Proactor so another accept will work.
203 ACE_Proactor::reset_event_loop();
205 // Must use some other method now to terminate this thing
206 // instead of the ACE_Signal_Adapter just running
207 // ACE_Proactor::end_event_loop()... Since this is an
208 // ACE_Event_Handler, doesn't it seem possible to implement
209 // a handle_signal() hook, and catch the signal there?
// NOTE(review): despite the marker below, this return looks reachable
// once handle_signal() has set shutdown_ and the loop condition is
// re-tested -- confirm.
213 /* NOTREACHED */
214 return 0;
217 #endif /* ACE_HAS_WIN32_OVERLAPPED_IO || ACE_HAS_AIO_CALLS */
218 #endif /* SPIPE_ACCEPTOR_C */