Merge pull request #2317 from jwillemsen/jwi-deleteop
[ACE_TAO.git] / ACE / performance-tests / SCTP / SOCK_STREAM_srv.cpp
blobcfffabcb2d78cb8ba445f8058445940a2ee7cc74
1 #include "ace/INET_Addr.h"
2 #include "ace/SOCK_Stream.h"
3 #include "ace/SOCK_Acceptor.h"
4 #include "ace/Log_Msg.h"
5 #include "ace/Thread_Manager.h"
6 #include "ace/Handle_Set.h"
7 #include "ace/CDR_Stream.h"
8 #include "ace/Truncate.h"
10 // FUZZ: disable check_for_streams_include
11 #include "ace/streams.h"
13 #include "ace/os_include/netinet/os_tcp.h"
14 #include "ace/os_include/arpa/os_inet.h"
15 #include "ace/OS_NS_sys_select.h"
17 // make sure that the code compiles cleanly even if SCTP is not
18 // available. If SCTP is not installed, program will exit early in
19 // main() with an error message
20 #ifdef ACE_HAS_SCTP
21 extern "C" {
22 #include <netinet/sctp.h>
24 #else
25 #ifndef IPPROTO_SCTP
26 #define IPPROTO_SCTP 132 /* always the same value on every platform */
27 #endif
28 #define SCTP_NODELAY -1
29 #endif
31 // class that manages setting of options
32 #include "Options_Manager.h"
34 // structure to bundle arguments for thread functions
35 struct ArgStruct {
36 ACE_SOCK_Stream * stream;
37 ACE_CDR::ULong numIters;
40 // thread function that serves the client for the UnMarshalled Octet
41 // test
42 static ACE_THR_FUNC_RETURN unmarshalledOctetServer (void *arg){
43 // unbundle the arguments
44 ArgStruct * args = reinterpret_cast<ArgStruct *> (arg);
45 ACE_SOCK_Stream * dataModeStream = args->stream;
46 ACE_CDR::ULong numIterations = args->numIters;
47 delete args;
49 // serve the client for numIterations synchronous invocations
50 do {
51 // READ A MESSAGE FROM THE CLIENT
53 size_t bt;
54 ACE_CDR::ULong msgBufSize=0;
55 // read the size of the buffer to follow
56 if ((dataModeStream->recv_n(&msgBufSize, ACE_CDR::LONG_SIZE, 0, &bt)) == -1)
57 ACE_ERROR_RETURN((LM_ERROR,
58 ACE_TEXT ("%p\n"),
59 ACE_TEXT ("recv_n")),
60 0);
61 msgBufSize = ACE_NTOHL(msgBufSize);
63 // allocate the buffer for the message payload
64 ACE_CDR::Octet * msgBuf = 0;
65 ACE_NEW_RETURN(msgBuf,
66 ACE_CDR::Octet[msgBufSize],
67 0);
69 // read the buffer
70 if ((dataModeStream->recv_n(msgBuf, msgBufSize, 0, &bt)) == -1)
71 ACE_ERROR_RETURN((LM_ERROR,
72 ACE_TEXT ("%p\n"),
73 ACE_TEXT ("recv_n")),
74 0);
76 // clean up the allocated buffer
77 delete[] msgBuf;
79 // SEND A REPLY TO THE CLIENT
81 // send back a 2 byte reply
82 ACE_CDR::Short reply;
83 if ((dataModeStream->send_n(&reply, ACE_CDR::SHORT_SIZE, 0, &bt)) == -1)
84 ACE_ERROR_RETURN((LM_ERROR,
85 ACE_TEXT ("%p\n"),
86 ACE_TEXT ("send_n")),
87 0);
88 } while (--numIterations);
90 // close and destroy the stream
91 dataModeStream->close();
92 delete dataModeStream;
94 return 0;
97 // sets up the dataModeSocket Stream, reads the test header infomation
98 // and launches a thread to handle the requested test.
99 static void run_server (ACE_HANDLE handle)
101 ACE_INET_Addr cli_addr;
102 // create a new stream and initialized with the handle returned by
103 // accept
104 ACE_SOCK_Stream * dataModeStream = new ACE_SOCK_Stream;
105 dataModeStream->set_handle (handle);
107 // Make sure we're not in non-blocking mode.
108 if (dataModeStream->disable (ACE_NONBLOCK) == -1){
109 ACE_ERROR ((LM_ERROR,
110 ACE_TEXT ("%p\n"),
111 ACE_TEXT ("disable")));
112 return;
114 else if (dataModeStream->get_remote_addr (cli_addr) == -1){
115 ACE_ERROR ((LM_ERROR,
116 ACE_TEXT ("%p\n"),
117 ACE_TEXT ("get_remote_addr")));
118 return;
121 // explicity configure Nagling. Default is
122 // Options_Manager::test_enable_nagle=0 so default configurations is
123 // NO NAGLING
124 ACE_CDR::Long nagle;
125 if (Options_Manager::test_enable_nagle)
126 nagle=0;
127 else
128 nagle=1;
130 if (Options_Manager::test_transport_protocol == IPPROTO_SCTP){
131 // default - sctp case
132 if (-1 == dataModeStream->set_option(IPPROTO_SCTP, SCTP_NODELAY, &nagle, sizeof nagle)){
133 ACE_ERROR((LM_ERROR,
134 ACE_TEXT ("%p\n"),
135 ACE_TEXT ("set_option")));
136 return;
138 } else {
139 // tcp case
140 if (-1 == dataModeStream->set_option(IPPROTO_TCP, TCP_NODELAY, &nagle, sizeof nagle)){
141 ACE_ERROR ((LM_ERROR,
142 "%p\n",
143 "set_option"));
144 return;
148 ACE_DEBUG ((LM_DEBUG,
149 ACE_TEXT ("(%P|%t) client %C connected from %d\n"),
150 cli_addr.get_host_name (),
151 cli_addr.get_port_number ()));
153 // hdr bufSize is hardcoded to 8 bytes
154 // (4 for a CDR-encoded boolean and 4 for a CDR-encoded ULong)
155 ACE_CDR::ULong hdrBufSize = 8;
156 // allocate a raw buffer large enough to receive the header and be
157 // properly aligned for the CDR decoding.
158 ACE_CDR::Char * hdrBuf= new ACE_CDR::Char[hdrBufSize+ACE_CDR::MAX_ALIGNMENT];
159 // align the raw buffer before reading data into it.
160 char * hdrBuf_a = ACE_ptr_align_binary(hdrBuf, ACE_CDR::MAX_ALIGNMENT);
162 size_t bt;
163 // read the header
164 if ((dataModeStream->recv_n(hdrBuf_a, hdrBufSize, 0, &bt)) == -1){
165 ACE_ERROR ((LM_ERROR,
166 ACE_TEXT ("%p\n"),
167 ACE_TEXT ("recv_n")));
168 return;
171 // pass the CDR encoded data into an ACE_InputCDR class. hdrCDR does
172 // NOT copy this data. Nor does it delete. It assumes the buffer
173 // remains valid while it is in scope.
174 ACE_InputCDR hdrCDR(hdrBuf_a, hdrBufSize);
176 ACE_CDR::Boolean byteOrder;
177 ACE_CDR::ULong numIterations;
179 // extract the data
180 hdrCDR >> ACE_InputCDR::to_boolean (byteOrder);
181 hdrCDR.reset_byte_order(byteOrder);
182 hdrCDR >> numIterations;
184 // make sure the stream is good after the extractions
185 if (!hdrCDR.good_bit()){
186 ACE_ERROR((LM_ERROR,
187 ACE_TEXT ("%p\n"),
188 ACE_TEXT ("hdrCDR")));
190 return;
193 ACE_DEBUG ((LM_DEBUG,
194 ACE_TEXT ("(%P|%t) Test for %u iterations\n"),
195 numIterations));
197 // deallocate the header buffer
198 delete[] hdrBuf;
200 // bundle up the arguments
201 ArgStruct * args = new ArgStruct;
202 args->stream = dataModeStream;
203 args->numIters = numIterations;
205 #if defined (ACE_HAS_THREADS)
206 // Spawn a new thread and run the new connection in that thread of
207 // control using the <server> function as the entry point.
208 if (ACE_Thread_Manager::instance ()->spawn (unmarshalledOctetServer,
209 reinterpret_cast<void *> (args),
210 THR_DETACHED) == -1)
211 ACE_ERROR ((LM_ERROR,
212 ACE_TEXT ("(%P|%t) %p\n"),
213 ACE_TEXT ("spawn")));
214 #else
215 (*unmarshalledOctetServer) (reinterpret_cast<void *> (args));
216 #endif /* ACE_HAS_THREADS */
220 int ACE_TMAIN (int argc, ACE_TCHAR **argv){
221 Options_Manager optsMgr(argc, argv, ACE_TEXT ("server-opts"));
223 // show usage is requested
224 if (optsMgr._usage) {
225 optsMgr._show_usage(stderr, ACE_TEXT ("server-opts"));
226 return 1;
229 // If SCTP is not installed then terminate the program, unless TCP
230 // was specified.
231 #ifndef ACE_HAS_SCTP
232 if (optsMgr.test_transport_protocol == IPPROTO_SCTP)
233 ACE_ERROR_RETURN((LM_ERROR,
234 ACE_TEXT ("SCTP was NOT installed when this binary was compiled.\n")
235 ACE_TEXT ("SOCK_STREAM_srv may still be run using TCP ")
236 ACE_TEXT ("via the '-t tcp' option.\n")),
238 #endif
240 // check that valid options were specified
241 if (optsMgr._error) {
242 ACE_OS::fprintf (stderr, "ERROR: %s\n", ACE_TEXT_ALWAYS_CHAR (optsMgr._error_message));
243 return 1;
246 // this is the socket that the server will listen on
247 ACE_SOCK_Acceptor acceptor_socket;
249 // Create the address that we want to listen for connections on. If
250 // server_accept_addr=INADDR_ANY (the default), SCTP will listen for
251 // connections on all IP interfaces. If an address is specified,
252 // SCTP will listen for connections on that address ONLY.
253 ACE_INET_Addr serverAddr(optsMgr.server_port,
254 optsMgr.server_accept_addr);
256 ACE_DEBUG((LM_DEBUG,
257 ACE_TEXT ("(%P|%t) Accepting connections on port %u on interface %C using %C\n"),
258 serverAddr.get_port_number(),
259 (optsMgr.server_accept_addr == INADDR_ANY) ? "INADDR_ANY" : serverAddr.get_host_addr(),
260 (optsMgr.test_transport_protocol == IPPROTO_SCTP) ? "IPPROTO_SCTP" : "IPPROTO_TCP"));
262 // this operation creates a socket, binds the specified internet
263 // address to it and calls listen. As this is a wrapper facade
264 // approach, the ACE_OS::{socket,bind,listen} calls are invoked in
265 // the implementation of open.
266 if (acceptor_socket.open(serverAddr, 1,
267 AF_INET,
268 ACE_DEFAULT_BACKLOG,
269 optsMgr.test_transport_protocol) == -1)
270 ACE_ERROR_RETURN ((LM_ERROR,
271 ACE_TEXT ("%p\n"),
272 ACE_TEXT ("open")),
275 // this function checks that the port that was actually bound was
276 // the port we asked for. Apparently some operating systems will
277 // automatically select new ports if the specified port is currently
278 // used.
279 else if (acceptor_socket.get_local_addr(serverAddr) == -1)
280 ACE_ERROR_RETURN ((LM_ERROR,
281 ACE_TEXT ("%p\n"),
282 ACE_TEXT ("get_local_addr")),
285 ACE_DEBUG ((LM_DEBUG,
286 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
287 serverAddr.get_port_number()));
289 // this is the stream object that will associated with a completed
290 // connection (aka the data mode socket). It will be set when accept
291 // is called below.
292 ACE_SOCK_Stream new_stream;
294 // a file decriptor set
295 ACE_Handle_Set handle_set;
296 // add the acceptor socket to the file descriptor set.
297 handle_set.set_bit(acceptor_socket.get_handle());
299 for (;;){
300 ACE_Time_Value timeout(ACE_DEFAULT_TIMEOUT);
301 ACE_Handle_Set temp = handle_set;
303 // wait for connect() call from client. In the original test there
304 // were two different acceptor sockets for two different
305 // services. So select was needed to wait on both sockets
306 // simultaneously. In this test we could just call accept on the
307 // one socket.
308 int result = ACE_OS::select(ACE_Utils::truncate_cast<int> ((intptr_t)acceptor_socket.get_handle()) +1,
309 (fd_set *) temp,
312 timeout);
314 // check that select did not end with an error.
315 if (result == -1)
316 ACE_ERROR ((LM_ERROR,
317 ACE_TEXT ("(%P|%t) %p\n"),
318 ACE_TEXT ("select")));
319 // check to see if select timed out.
320 else if (result == 0){
321 ACE_DEBUG ((LM_DEBUG,
322 ACE_TEXT ("(%P|%t) select timed out\n")));
324 else { // case where a file descriptor was actually set
325 if (!(temp.is_set(acceptor_socket.get_handle()))){
326 // CANNOT BE REACHED
327 ACE_ERROR ((LM_ERROR,
328 ACE_TEXT ("(%P|%t) %p\n"),
329 ACE_TEXT ("select: NO ERROR BUT NO FD SET")));
330 } else {
331 // call accept to set up the new stream.
332 if (acceptor_socket.accept(new_stream) == -1) {
333 ACE_ERROR ((LM_ERROR,
334 ACE_TEXT ("%p\n"),
335 ACE_TEXT ("accept")));
336 continue;
338 else{
339 ACE_DEBUG ((LM_DEBUG,
340 ACE_TEXT ("(%P|%t) spawning server\n")));
342 // Run the server.
343 run_server (new_stream.get_handle ());
349 ACE_NOTREACHED (return 0;)