1 // == == == == == == == == == == == == == == == == == == == == == == ==
2 // Stolen from $ACE_ROOT/tests/Thread_Pool_Reactor_Test.cpp
3 // Thread_Pool_Reactor_Test.cpp, v 1.29 2001/03/20 01:07:21 irfan Exp
5 // Irfan Pyarali <irfan@cs.wustl.edu> and
6 // Nanbor Wang <nanbor@cs.wustl.edu>
7 // == == == == == == == == == == == == == == == == == == == == == == ==
9 #include "ace/config-lite.h"
10 #if defined (ACE_HAS_THREADS)
12 #include "ace/OS_NS_string.h"
13 #include "ace/OS_NS_unistd.h"
14 #include "ace/SOCK_Connector.h"
15 #include "ace/SOCK_Acceptor.h"
16 #include "ace/Acceptor.h"
17 #include "ace/Thread_Manager.h"
18 #include "ace/TP_Reactor.h"
19 #include "ace/Truncate.h"
21 #include "Request_Handler.h"
23 // Accepting end point. This is nominally "localhost:10010", but some
24 // platforms could not resolve that name, so the IP address is used instead.
26 static const ACE_TCHAR
*rendezvous
= ACE_TEXT ("127.0.0.1:10010");
28 // Total number of server threads (the count handed to serverTP.activate ()).
29 static size_t svr_thrno
= 5;
31 // Number of times the client loop calls send_work_to_server ().
32 static size_t cli_runs
= 2;
34 // Total connection attempts made by one send_work_to_server () call.
35 static size_t cli_conn_no
= 2;
37 // Total requests sent per connection; also the message count that
// Request_Handler::handle_close () compares nr_msgs_rcvd_ against.
38 static size_t cli_req_no
= 5;
40 // Delay before a thread sends its next request (described as msec).
// NOTE(review): this value is passed as the second argument of
// ACE_Time_Value (0, req_delay); ACE documents that argument as
// microseconds -- confirm which unit is intended.
41 static int req_delay
= 50;
// Shorthand for the ACE_Strategy_Acceptor instantiation that pairs
// Request_Handler with ACE_SOCK_ACCEPTOR.
44 typedef ACE_Strategy_Acceptor
<Request_Handler
, ACE_SOCK_ACCEPTOR
> ACCEPTOR
;
// Constructor: forwards thr_mgr to the
// ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH> base class.
// NOTE(review): this listing is fragmentary -- the remainder of the
// initializer list and the enclosing braces are not shown here.
47 Request_Handler::Request_Handler (ACE_Thread_Manager
*thr_mgr
)
48 : ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_MT_SYNCH
> (thr_mgr
),
// Point this handler at the reactor returned by ACE_Reactor::instance ().
51 this->reactor (ACE_Reactor::instance ());
// Input callback for handle fd: reads one length-prefixed message from the
// peer, counts it, and ends the reactor event loop when the payload is
// "shutdown".
// NOTE(review): this listing is fragmentary -- the return type, the
// declaration of len, the head of the if-statement, the return statements
// and the braces are not shown here.
55 Request_Handler::handle_input (ACE_HANDLE fd
)
57 ACE_TCHAR buffer
[BUFSIZ
];
// Step 1: read a single ACE_TCHAR into len (the length prefix).
59 ssize_t result
= this->peer ().recv (&len
, sizeof (ACE_TCHAR
));
// Step 2: read len ACE_TCHARs of payload into buffer; the message only
// counts when recv_n reports exactly that many bytes.
// NOTE(review): no check that len fits in BUFSIZ is visible here -- confirm
// against the full source.
62 && this->peer ().recv_n (buffer
, len
* sizeof (ACE_TCHAR
))
63 == static_cast<ssize_t
> (len
* sizeof (ACE_TCHAR
)))
65 ++this->nr_msgs_rcvd_
;
68 ACE_TEXT ("(%t) svr input; fd: 0x%x; input: %s\n"),
// A payload equal to "shutdown" stops the reactor's event loop.
71 if (ACE_OS::strcmp (buffer
, ACE_TEXT ("shutdown")) == 0)
72 ACE_Reactor::instance()->end_reactor_event_loop ();
// Format string of the "peer closed" log message; the statement that uses
// it is not shown in this listing.
77 ACE_TEXT ("(%t) Request_Handler: 0x%x peer closed (0x%x)\n"),
// Close callback for handle fd: logs how many messages this handler
// received and reports a mismatch when that count differs from cli_req_no.
// The ACE_Reactor_Mask parameter is unnamed.
// NOTE(review): this listing is fragmentary -- the return type, the logging
// macro heads, any cleanup and the braces are not shown here.
83 Request_Handler::handle_close (ACE_HANDLE fd
, ACE_Reactor_Mask
)
86 ACE_TEXT ("(%t) svr close; fd: 0x%x, rcvd %d msgs\n"),
88 this->nr_msgs_rcvd_
));
// Each connection is expected to have delivered exactly cli_req_no messages.
90 if (this->nr_msgs_rcvd_
!= cli_req_no
)
92 ACE_TEXT ("(%t) Handler 0x%x: Expected %d messages; got %d\n"),
95 this->nr_msgs_rcvd_
));
101 // Listing 2 code/ch16
// Hook function whose address ServerTP passes to run_reactor_event_loop ().
// The visible body only emits a debug log line; the ACE_Reactor pointer
// parameter is unnamed.
// NOTE(review): the return type and braces are not shown in this listing.
103 reactor_event_hook (ACE_Reactor
*)
105 ACE_DEBUG ((LM_DEBUG
,
106 ACE_TEXT ("(%t) handling events ....\n")));
// Task (derived from ACE_Task_Base) whose visible code runs the reactor
// event loop with reactor_event_hook installed, logging before and after.
// NOTE(review): this listing is fragmentary -- the method header, the error
// condition, the return values and the braces are not shown here.
111 class ServerTP
: public ACE_Task_Base
116 ACE_DEBUG ((LM_DEBUG
,
117 ACE_TEXT ("(%t) Running the event loop\n")));
// Run the event loop of the reactor returned by ACE_Reactor::instance (),
// passing reactor_event_hook as the hook.
120 ACE_Reactor::instance ()->run_reactor_event_loop
121 (&reactor_event_hook
);
// Error path: log "Error handling events" and return (the guarding
// condition and the returned value are not shown).
124 ACE_ERROR_RETURN ((LM_ERROR
,
125 ACE_TEXT ("(%t) %p\n"),
126 ACE_TEXT ("Error handling events")),
129 ACE_DEBUG ((LM_DEBUG
,
130 ACE_TEXT ("(%t) Done handling events.\n")));
// Client task (derived from ACE_Task_Base). The visible code builds a
// length-prefixed message, sends it repeatedly to the server at addr_, and
// in its last fragment sends a "shutdown" message.
// NOTE(review): this listing is fragmentary -- several method headers, the
// declaration of addr_, some macro arguments and all braces are not shown.
137 class Client
: public ACE_Task_Base
147 const ACE_TCHAR
*msg
=
148 ACE_TEXT ("Message from Connection worker");
// Message layout: the text is copied in starting at index 1, leaving slot 0
// free for a length prefix.
150 ACE_TCHAR buf
[BUFSIZ
];
// Length value: strlen (msg) + 1, narrowed to ACE_TCHAR. Presumably assigned
// to buf[0]; the left-hand side is on a line not shown -- TODO confirm.
152 ACE_Utils::truncate_cast
<ACE_TCHAR
> (ACE_OS::strlen (msg
) + 1);
153 ACE_OS::strcpy (&buf
[1], msg
);
// Push the same message through send_work_to_server () cli_runs times.
155 for (size_t i
= 0; i
< cli_runs
; i
++)
156 send_work_to_server(buf
);
// Sends the message in arg cli_req_no times on each of cli_conn_no
// connection attempts to addr_.
// arg: the first ACE_TCHAR is read as the payload length; (len + 1)
// ACE_TCHARs starting at arg are transmitted per request.
164 void send_work_to_server(ACE_TCHAR
* arg
)
166 ACE_SOCK_Stream stream
;
167 ACE_SOCK_Connector connect
;
// NOTE(review): req_delay is described as msec where it is defined, but
// ACE documents the second ACE_Time_Value argument as microseconds --
// confirm the intended pause length.
168 ACE_Time_Value
delay (0, req_delay
);
// The first ACE_TCHAR of arg is the length prefix.
169 size_t len
= * reinterpret_cast<ACE_TCHAR
*> (arg
);
171 for (size_t i
= 0 ; i
< cli_conn_no
; i
++)
// Connect stream to addr_; a negative result is logged as an error.
173 if (connect
.connect (stream
, addr_
) < 0)
175 ACE_ERROR ((LM_ERROR
,
176 ACE_TEXT ("(%t) %p\n"),
177 ACE_TEXT ("connect")));
181 for (size_t j
= 0; j
< cli_req_no
; j
++)
183 ACE_DEBUG ((LM_DEBUG
,
184 ACE_TEXT ("Sending work to server on handle 0x%x, req %d\n"),
185 stream
.get_handle (),
// Transmit the prefix plus len payload characters in one send_n call.
187 if (stream
.send_n (arg
,
188 (len
+ 1) * sizeof (ACE_TCHAR
)) == -1)
190 ACE_ERROR ((LM_ERROR
,
191 ACE_TEXT ("(%t) %p\n"),
192 ACE_TEXT ("send_n")));
// Pause for delay after the send.
195 ACE_OS::sleep (delay
);
// Shutdown fragment (its function header is not shown in this listing):
// opens a fresh connection to addr_ and sends the "shutdown" message.
205 ACE_SOCK_Stream stream
;
206 ACE_SOCK_Connector connect
;
208 if (connect
.connect (stream
, addr_
) == -1)
209 ACE_ERROR ((LM_ERROR
,
210 ACE_TEXT ("(%t) %p Error while connecting\n"),
211 ACE_TEXT ("connect")));
// The leading \011 is octal 9: the length prefix, i.e. the eight letters of
// "shutdown" plus one, matching the strlen + 1 convention used for msg.
213 const ACE_TCHAR
*sbuf
= ACE_TEXT ("\011shutdown");
215 ACE_DEBUG ((LM_DEBUG
,
216 ACE_TEXT ("shutdown stream handle = %x\n"),
217 stream
.get_handle ()));
// strlen (sbuf) + 1 covers the prefix, the eight letters and the trailing NUL.
219 if (stream
.send_n (sbuf
, (ACE_OS::strlen (sbuf
) + 1) * sizeof (ACE_TCHAR
)) == -1)
220 ACE_ERROR ((LM_ERROR
,
221 ACE_TEXT ("(%t) %p\n"),
222 ACE_TEXT ("send_n")));
229 // Listing 1 code/ch16
// Program entry point for builds where ACE_HAS_THREADS is defined. Visible
// steps: install a reactor built on sr as the ACE_Reactor instance, open the
// acceptor on the rendezvous address, activate serverTP with svr_thrno
// threads, then wait on the ACE_Thread_Manager instance.
// NOTE(review): this listing is fragmentary -- the declarations of sr,
// acceptor and serverTP, any client start-up code, the return statement and
// the braces are not shown here.
230 int ACE_TMAIN (int, ACE_TCHAR
*[])
// sr is declared on a line not shown; presumably an ACE_TP_Reactor, given
// the ace/TP_Reactor.h include -- TODO confirm.
233 ACE_Reactor
new_reactor (&sr
);
// Make new_reactor the reactor that ACE_Reactor::instance () hands out.
234 ACE_Reactor::instance (&new_reactor
);
237 ACE_INET_Addr
accept_addr (rendezvous
);
// Failure to open the acceptor is reported through ACE_ERROR_RETURN.
239 if (acceptor
.open (accept_addr
) == -1)
240 ACE_ERROR_RETURN ((LM_ERROR
,
245 ACE_DEBUG ((LM_DEBUG
,
246 ACE_TEXT ("(%t) Spawning %d server threads...\n"),
// Start svr_thrno joinable threads (size_t narrowed to int for activate ()).
250 serverTP
.activate (THR_NEW_LWP
| THR_JOINABLE
,
251 ACE_Utils::truncate_cast
<int> (svr_thrno
));
// Block on the thread manager's wait () before leaving main.
256 ACE_Thread_Manager::instance ()->wait ();
262 #include "ace/OS_main.h"
263 #include "ace/OS_NS_stdio.h"
// Fallback entry point: prints a notice that the example requires threads.
// Presumably the #else branch of the ACE_HAS_THREADS guard; the #else line,
// the return statement and the braces are not shown in this listing.
265 int ACE_TMAIN (int, ACE_TCHAR
*[])
267 ACE_OS::puts (ACE_TEXT ("This example requires threads."));
271 #endif /* ACE_HAS_THREADS */