Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / APG / ThreadPools / TP_Reactor.cpp
blobedd8441a815e55954e6a7b54122f8c2ed54cd37c
1 // == == == == == == == == == == == == == == == == == == == == == == ==
2 // Stolen from $ACE_ROOT/tests/Thread_Pool_Reactor_Test.cpp
3 // Thread_Pool_Reactor_Test.cpp, v 1.29 2001/03/20 01:07:21 irfan Exp
4 // = AUTHOR
5 // Irfan Pyarali <irfan@cs.wustl.edu> and
6 // Nanbor Wang <nanbor@cs.wustl.edu>
7 // == == == == == == == == == == == == == == == == == == == == == == ==
9 #include "ace/config-lite.h"
10 #if defined (ACE_HAS_THREADS)
12 #include "ace/OS_NS_string.h"
13 #include "ace/OS_NS_unistd.h"
14 #include "ace/SOCK_Connector.h"
15 #include "ace/SOCK_Acceptor.h"
16 #include "ace/Acceptor.h"
17 #include "ace/Thread_Manager.h"
18 #include "ace/TP_Reactor.h"
19 #include "ace/Truncate.h"
21 #include "Request_Handler.h"
23 // Accepting end point. This is actually "localhost:10010", but some
24 // platforms couldn't resolve the name, so we use the IP address
25 // directly here.
26 static const ACE_TCHAR *rendezvous = ACE_TEXT ("127.0.0.1:10010");
28 // Number of server threads (handed to ServerTP::activate() in main).
29 static size_t svr_thrno = 5;
31 // Number of times the client thread calls send_work_to_server().
32 static size_t cli_runs = 2;
34 // Connection attempts made by each send_work_to_server() call.
35 static size_t cli_conn_no = 2;
37 // Requests sent over each connection.
// Request_Handler::handle_close() also logs an error when a handler
// received a different number of messages than this.
38 static size_t cli_req_no = 5;
40 // Delay before a thread sends the next request (in msec.)
41 static int req_delay = 50;
// Acceptor type instantiated with Request_Handler as the service
// handler and ACE_SOCK_ACCEPTOR as the underlying acceptor.
44 typedef ACE_Strategy_Acceptor <Request_Handler, ACE_SOCK_ACCEPTOR> ACCEPTOR;
47 Request_Handler::Request_Handler (ACE_Thread_Manager *thr_mgr)
48 : ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> (thr_mgr),
49 nr_msgs_rcvd_(0)
51 this->reactor (ACE_Reactor::instance ());
54 int
55 Request_Handler::handle_input (ACE_HANDLE fd)
57 ACE_TCHAR buffer[BUFSIZ];
58 ACE_TCHAR len = 0;
59 ssize_t result = this->peer ().recv (&len, sizeof (ACE_TCHAR));
61 if (result > 0
62 && this->peer ().recv_n (buffer, len * sizeof (ACE_TCHAR))
63 == static_cast<ssize_t> (len * sizeof (ACE_TCHAR)))
65 ++this->nr_msgs_rcvd_;
67 ACE_DEBUG ((LM_DEBUG,
68 ACE_TEXT ("(%t) svr input; fd: 0x%x; input: %s\n"),
69 fd,
70 buffer));
71 if (ACE_OS::strcmp (buffer, ACE_TEXT ("shutdown")) == 0)
72 ACE_Reactor::instance()->end_reactor_event_loop ();
73 return 0;
75 else
76 ACE_DEBUG ((LM_DEBUG,
77 ACE_TEXT ("(%t) Request_Handler: 0x%x peer closed (0x%x)\n"),
78 this, fd));
79 return -1;
82 int
83 Request_Handler::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
85 ACE_DEBUG ((LM_DEBUG,
86 ACE_TEXT ("(%t) svr close; fd: 0x%x, rcvd %d msgs\n"),
87 fd,
88 this->nr_msgs_rcvd_));
90 if (this->nr_msgs_rcvd_ != cli_req_no)
91 ACE_ERROR((LM_ERROR,
92 ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"),
93 this,
94 cli_req_no,
95 this->nr_msgs_rcvd_));
97 this->destroy ();
98 return 0;
101 // Listing 2 code/ch16
102 static int
103 reactor_event_hook (ACE_Reactor *)
105 ACE_DEBUG ((LM_DEBUG,
106 ACE_TEXT ("(%t) handling events ....\n")));
108 return 0;
// Server-side task.  main() activates it with svr_thrno threads; svc()
// runs the event loop of the ACE_Reactor singleton.
111 class ServerTP : public ACE_Task_Base
113 public:
// Thread entry point: runs the reactor event loop until it ends and
// logs before and after.
114 virtual int svc ()
116 ACE_DEBUG ((LM_DEBUG,
117 ACE_TEXT ("(%t) Running the event loop\n")));
// reactor_event_hook is supplied as the event hook of the loop.
119 int result =
120 ACE_Reactor::instance ()->run_reactor_event_loop
121 (&reactor_event_hook);
// A result of -1 is logged as an error and ends svc() early.
123 if (result == -1)
124 ACE_ERROR_RETURN ((LM_ERROR,
125 ACE_TEXT ("(%t) %p\n"),
126 ACE_TEXT ("Error handling events")),
129 ACE_DEBUG ((LM_DEBUG,
130 ACE_TEXT ("(%t) Done handling events.\n")));
132 return 0;
135 // Listing 2
137 class Client: public ACE_Task_Base
139 public:
140 Client()
141 :addr_(rendezvous)
144 virtual int svc()
146 ACE_OS::sleep (3);
147 const ACE_TCHAR *msg =
148 ACE_TEXT ("Message from Connection worker");
150 ACE_TCHAR buf [BUFSIZ];
151 buf[0] =
152 ACE_Utils::truncate_cast<ACE_TCHAR> (ACE_OS::strlen (msg) + 1);
153 ACE_OS::strcpy (&buf[1], msg);
155 for (size_t i = 0; i < cli_runs; i++)
156 send_work_to_server(buf);
158 shut_down();
160 return 0;
163 private:
164 void send_work_to_server(ACE_TCHAR* arg)
166 ACE_SOCK_Stream stream;
167 ACE_SOCK_Connector connect;
168 ACE_Time_Value delay (0, req_delay);
169 size_t len = * reinterpret_cast<ACE_TCHAR *> (arg);
171 for (size_t i = 0 ; i < cli_conn_no; i++)
173 if (connect.connect (stream, addr_) < 0)
175 ACE_ERROR ((LM_ERROR,
176 ACE_TEXT ("(%t) %p\n"),
177 ACE_TEXT ("connect")));
178 continue;
181 for (size_t j = 0; j < cli_req_no; j++)
183 ACE_DEBUG ((LM_DEBUG,
184 ACE_TEXT ("Sending work to server on handle 0x%x, req %d\n"),
185 stream.get_handle (),
186 j+1));
187 if (stream.send_n (arg,
188 (len + 1) * sizeof (ACE_TCHAR)) == -1)
190 ACE_ERROR ((LM_ERROR,
191 ACE_TEXT ("(%t) %p\n"),
192 ACE_TEXT ("send_n")));
193 continue;
195 ACE_OS::sleep (delay);
198 stream.close ();
203 void shut_down()
205 ACE_SOCK_Stream stream;
206 ACE_SOCK_Connector connect;
208 if (connect.connect (stream, addr_) == -1)
209 ACE_ERROR ((LM_ERROR,
210 ACE_TEXT ("(%t) %p Error while connecting\n"),
211 ACE_TEXT ("connect")));
213 const ACE_TCHAR *sbuf = ACE_TEXT ("\011shutdown");
215 ACE_DEBUG ((LM_DEBUG,
216 ACE_TEXT ("shutdown stream handle = %x\n"),
217 stream.get_handle ()));
219 if (stream.send_n (sbuf, (ACE_OS::strlen (sbuf) + 1) * sizeof (ACE_TCHAR)) == -1)
220 ACE_ERROR ((LM_ERROR,
221 ACE_TEXT ("(%t) %p\n"),
222 ACE_TEXT ("send_n")));
224 stream.close ();
226 private:
227 ACE_INET_Addr addr_;
229 // Listing 1 code/ch16
// Entry point of the threaded build: installs a reactor built on
// ACE_TP_Reactor as the ACE_Reactor singleton, opens the acceptor on
// `rendezvous`, activates the server and client tasks and waits on the
// ACE_Thread_Manager singleton before returning 0.
230 int ACE_TMAIN (int, ACE_TCHAR *[])
// Wrap the ACE_TP_Reactor implementation in an ACE_Reactor and make it
// the singleton that the handlers and ServerTP obtain via instance().
232 ACE_TP_Reactor sr;
233 ACE_Reactor new_reactor (&sr);
234 ACE_Reactor::instance (&new_reactor);
// Open the acceptor on the rendezvous address; a failure is logged
// and ends main early.
236 ACCEPTOR acceptor;
237 ACE_INET_Addr accept_addr (rendezvous);
239 if (acceptor.open (accept_addr) == -1)
240 ACE_ERROR_RETURN ((LM_ERROR,
241 ACE_TEXT ("%p\n"),
242 ACE_TEXT ("open")),
// NOTE(review): svr_thrno is a size_t but is logged with "%d" --
// confirm the format specifier.
245 ACE_DEBUG ((LM_DEBUG,
246 ACE_TEXT ("(%t) Spawning %d server threads...\n"),
247 svr_thrno));
// Activate the server task with svr_thrno threads.
249 ServerTP serverTP;
250 serverTP.activate (THR_NEW_LWP | THR_JOINABLE,
251 ACE_Utils::truncate_cast<int> (svr_thrno));
// Activate the client task with activate()'s default arguments.
253 Client client;
254 client.activate ();
// Wait on the ACE_Thread_Manager singleton before leaving main.
256 ACE_Thread_Manager::instance ()->wait ();
258 return 0;
260 // Listing 1
261 #else
262 #include "ace/OS_main.h"
263 #include "ace/OS_NS_stdio.h"
265 int ACE_TMAIN (int, ACE_TCHAR *[])
267 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
268 return 0;
271 #endif /* ACE_HAS_THREADS */