Merge pull request #2316 from jwillemsen/jwi-taskcommenttypo
[ACE_TAO.git] / ACE / examples / IPC_SAP / SOCK_SAP / CPP-inserver.cpp
blob476d0a4299bec4befb0202a3b2f41331455f8719
1 // This example tests the features of the <ACE_SOCK_Acceptor>,
2 // <ACE_SOCK_Stream>, and <ACE_Svc_Handler> classes. If the platform
3 // supports threads it uses a thread-per-connection concurrency model.
4 // Otherwise, it uses a single-threaded iterative server model.
6 #include "ace/SOCK_Acceptor.h"
7 #include "ace/Thread_Manager.h"
8 #include "ace/Handle_Set.h"
9 #include "ace/Profile_Timer.h"
10 #include "ace/Basic_Types.h"
11 #include "ace/OS_NS_sys_select.h"
12 #include "ace/OS_main.h"
13 #include "ace/Truncate.h"
16 // Are we running verbosely?
17 static int verbose = 0;
19 static void
20 run_server (ACE_THR_FUNC server,
21 ACE_HANDLE handle)
23 #if defined (ACE_HAS_THREADS)
24 // Spawn a new thread and run the new connection in that thread of
25 // control using the <server> function as the entry point.
26 if (ACE_Thread_Manager::instance ()->spawn (server,
27 reinterpret_cast<void *> (handle),
28 THR_DETACHED) == -1)
29 ACE_ERROR ((LM_ERROR,
30 "(%P|%t) %p\n",
31 "spawn"));
32 #else
33 (*server) (reinterpret_cast<void *> (handle));
34 #endif /* ACE_HAS_THREADS */
37 // Function entry point into the twoway server task.
39 static ACE_THR_FUNC_RETURN
40 twoway_server (void *arg)
42 ACE_INET_Addr cli_addr;
43 ACE_SOCK_Stream new_stream;
44 ACE_HANDLE handle = (ACE_HANDLE) (intptr_t) arg;
46 new_stream.set_handle (handle);
48 // Make sure we're not in non-blocking mode.
49 if (new_stream.disable (ACE_NONBLOCK) == -1)
50 ACE_ERROR_RETURN ((LM_ERROR,
51 "%p\n",
52 "disable"),
53 0);
54 else if (new_stream.get_remote_addr (cli_addr) == -1)
55 ACE_ERROR_RETURN ((LM_ERROR,
56 "%p\n",
57 "get_remote_addr"),
58 0);
60 ACE_DEBUG ((LM_DEBUG,
61 "(%P|%t) client %s connected from %d\n",
62 cli_addr.get_host_name (),
63 cli_addr.get_port_number ()));
65 size_t total_bytes = 0;
66 size_t message_count = 0;
68 char *request = 0;
70 // Read data from client (terminate on error).
72 for (;;)
74 ACE_INT32 len;
76 ssize_t r_bytes = new_stream.recv_n ((void *) &len,
77 sizeof (ACE_INT32));
78 if (r_bytes == -1)
80 ACE_ERROR ((LM_ERROR,
81 "%p\n",
82 "recv"));
83 break;
85 else if (r_bytes == 0)
87 ACE_DEBUG ((LM_DEBUG,
88 "(%P|%t) reached end of input, connection closed by client\n"));
89 break;
91 else if (r_bytes != (ssize_t) sizeof (ACE_INT32))
93 ACE_ERROR ((LM_ERROR,
94 "(%P|%t) %p\n",
95 "recv_n failed"));
96 break;
98 else
100 len = ACE_NTOHL (len);
101 ACE_NEW_RETURN (request,
102 char [len],
106 // Subtract off the sizeof the length prefix.
107 r_bytes = new_stream.recv_n (request,
108 len - sizeof (ACE_UINT32));
109 if (r_bytes == -1)
111 ACE_ERROR ((LM_ERROR,
112 "%p\n",
113 "recv"));
114 break;
116 else if (r_bytes == 0)
118 ACE_DEBUG ((LM_DEBUG,
119 "(%P|%t) reached end of input, connection closed by client\n"));
120 break;
122 else if (verbose
123 && ACE::write_n (ACE_STDOUT,
124 request,
125 r_bytes) != r_bytes)
126 ACE_ERROR ((LM_ERROR,
127 "%p\n",
128 "ACE::write_n"));
129 else if (new_stream.send_n (request,
130 r_bytes) != r_bytes)
131 ACE_ERROR ((LM_ERROR,
132 "%p\n",
133 "send_n"));
135 total_bytes += size_t (r_bytes);
136 message_count++;
138 delete [] request;
139 request = 0;
142 // Close new endpoint (listening endpoint stays open).
143 new_stream.close ();
145 delete [] request;
146 return 0;
149 // Function entry point into the oneway server task.
151 static ACE_THR_FUNC_RETURN
152 oneway_server (void *arg)
154 ACE_INET_Addr cli_addr;
155 ACE_SOCK_Stream new_stream;
156 ACE_HANDLE handle = (ACE_HANDLE) (intptr_t) arg;
158 new_stream.set_handle (handle);
160 // Make sure we're not in non-blocking mode.
161 if (new_stream.disable (ACE_NONBLOCK) == -1)
162 ACE_ERROR_RETURN ((LM_ERROR,
163 "%p\n",
164 "disable"),
166 else if (new_stream.get_remote_addr (cli_addr) == -1)
167 ACE_ERROR_RETURN ((LM_ERROR,
168 "%p\n",
169 "get_remote_addr"),
172 ACE_DEBUG ((LM_DEBUG,
173 "(%P|%t) client %s connected from %d\n",
174 cli_addr.get_host_name (),
175 cli_addr.get_port_number ()));
177 // Timer business
178 ACE_Profile_Timer timer;
179 timer.start ();
181 size_t total_bytes = 0;
182 size_t message_count = 0;
184 char *request = 0;
186 // Read data from client (terminate on error).
188 for (;;)
190 ACE_INT32 len;
192 ssize_t r_bytes = new_stream.recv_n ((void *) &len,
193 sizeof (ACE_INT32));
194 if (r_bytes == -1)
196 ACE_ERROR ((LM_ERROR,
197 "%p\n",
198 "recv"));
199 break;
201 else if (r_bytes == 0)
203 ACE_DEBUG ((LM_DEBUG,
204 "(%P|%t) reached end of input, connection closed by client\n"));
205 break;
207 else if (r_bytes != (ssize_t) sizeof (ACE_INT32))
209 ACE_ERROR ((LM_ERROR,
210 "(%P|%t) %p\n",
211 "recv_n failed"));
212 break;
214 else
216 len = ACE_NTOHL (len);
217 ACE_NEW_RETURN (request,
218 char [len],
222 // Subtract off the sizeof the length prefix.
223 r_bytes = new_stream.recv_n (request,
224 len - sizeof (ACE_UINT32));
226 if (r_bytes == -1)
228 ACE_ERROR ((LM_ERROR,
229 "%p\n",
230 "recv"));
231 break;
233 else if (r_bytes == 0)
235 ACE_DEBUG ((LM_DEBUG,
236 "(%P|%t) reached end of input, connection closed by client\n"));
237 break;
239 else if (verbose
240 && ACE::write_n (ACE_STDOUT, request, r_bytes) != r_bytes)
241 ACE_ERROR ((LM_ERROR,
242 "%p\n",
243 "ACE::write_n"));
245 total_bytes += size_t (r_bytes);
246 message_count++;
248 delete [] request;
249 request = 0;
252 timer.stop ();
254 ACE_Profile_Timer::ACE_Elapsed_Time et;
255 timer.elapsed_time (et);
257 ACE_DEBUG ((LM_DEBUG,
258 ACE_TEXT ("\t\treal time = %f secs \n\t\tuser time = %f secs \n\t\tsystem time = %f secs\n"),
259 et.real_time,
260 et.user_time,
261 et.system_time));
263 double messages_per_sec = double (message_count) / et.real_time;
265 ACE_DEBUG ((LM_DEBUG,
266 ACE_TEXT ("\t\tmessages = %d\n\t\ttotal bytes = %d\n\t\tmbits/sec = %f\n\t\tusec-per-message = %f\n\t\tmessages-per-second = %0.00f\n"),
267 message_count,
268 total_bytes,
269 (((double) total_bytes * 8) / et.real_time) / (double) (1024 * 1024),
270 (et.real_time / (double) message_count) * 1000000,
271 messages_per_sec < 0 ? 0 : messages_per_sec));
273 // Close new endpoint (listening endpoint stays open).
274 new_stream.close ();
276 delete [] request;
277 return 0;
280 static int
281 run_event_loop (u_short port)
283 // Raise the socket handle limit to the maximum.
284 ACE::set_handle_limit ();
286 // Create the oneway and twoway acceptors.
287 ACE_SOCK_Acceptor twoway_acceptor;
288 ACE_SOCK_Acceptor oneway_acceptor;
290 // Create the oneway and twoway server addresses.
291 ACE_INET_Addr twoway_server_addr (port);
292 ACE_INET_Addr oneway_server_addr (port + 1);
294 // Create acceptors, reuse the address.
295 if (twoway_acceptor.open (twoway_server_addr, 1) == -1
296 || oneway_acceptor.open (oneway_server_addr, 1) == -1)
297 ACE_ERROR_RETURN ((LM_ERROR,
298 "%p\n",
299 "open"),
301 // Check to see what addresses we actually got bound to!
302 else if (twoway_acceptor.get_local_addr (twoway_server_addr) == -1
303 || oneway_acceptor.get_local_addr (oneway_server_addr) == -1)
304 ACE_ERROR_RETURN ((LM_ERROR,
305 "%p\n",
306 "get_local_addr"),
309 ACE_DEBUG ((LM_DEBUG,
310 "(%P|%t) starting twoway server at port %d and oneway server at port %d\n",
311 twoway_server_addr.get_port_number (),
312 oneway_server_addr.get_port_number ()));
314 // Keep these objects out here to prevent excessive constructor
315 // calls within the loop.
316 ACE_SOCK_Stream new_stream;
318 ACE_Handle_Set handle_set;
319 handle_set.set_bit (twoway_acceptor.get_handle ());
320 handle_set.set_bit (oneway_acceptor.get_handle ());
322 // Performs the iterative server activities.
324 for (;;)
326 ACE_Time_Value timeout (ACE_DEFAULT_TIMEOUT);
327 ACE_Handle_Set temp = handle_set;
329 int result = ACE_OS::select (ACE_Utils::truncate_cast<int> ((intptr_t)oneway_acceptor.get_handle ()) + 1,
330 (fd_set *) temp,
333 timeout);
334 if (result == -1)
335 ACE_ERROR ((LM_ERROR,
336 "(%P|%t) %p\n",
337 "select"));
338 else if (result == 0 && verbose)
339 ACE_DEBUG ((LM_DEBUG,
340 "(%P|%t) select timed out\n"));
341 else
343 if (temp.is_set (twoway_acceptor.get_handle ()))
345 if (twoway_acceptor.accept (new_stream) == -1)
347 ACE_ERROR ((LM_ERROR,
348 "%p\n",
349 "accept"));
350 continue;
352 else
353 ACE_DEBUG ((LM_DEBUG,
354 "(%P|%t) spawning twoway server\n"));
356 // Run the twoway server.
357 run_server (twoway_server,
358 new_stream.get_handle ());
360 if (temp.is_set (oneway_acceptor.get_handle ()))
362 if (oneway_acceptor.accept (new_stream) == -1)
364 ACE_ERROR ((LM_ERROR, "%p\n", "accept"));
365 continue;
367 else
368 ACE_DEBUG ((LM_DEBUG,
369 "(%P|%t) spawning oneway server\n"));
371 // Run the oneway server.
372 run_server (oneway_server,
373 new_stream.get_handle ());
378 /* NOTREACHED */
382 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
384 u_short port = ACE_DEFAULT_SERVER_PORT;
386 if (argc > 1)
387 port = ACE_OS::atoi (argv[1]);
389 return run_event_loop (port);