Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / performance-tests / SCTP / SOCK_SEQPACK_srv.cpp
blob6cd98f457714f3b98ace69353e7d021530d60b9a
1 #include "ace/Multihomed_INET_Addr.h"
2 #include "ace/SOCK_SEQPACK_Association.h"
3 #include "ace/SOCK_SEQPACK_Acceptor.h"
4 #include "ace/Log_Msg.h"
5 #include "ace/Thread_Manager.h"
6 #include "ace/Handle_Set.h"
7 #include "ace/CDR_Stream.h"
9 // FUZZ: disable check_for_streams_include
10 #include "ace/streams.h"
12 #include "ace/os_include/arpa/os_inet.h"
13 #include "ace/OS_NS_sys_select.h"
14 #include "ace/OS_NS_string.h"
15 #include "ace/OS_NS_arpa_inet.h"
// Make sure that the code compiles cleanly even if SCTP is not
// available. If SCTP is not installed, the program will exit early in
// main() with an error message.
#ifdef ACE_HAS_SCTP
extern "C" {
#include <netinet/sctp.h>
}
#else
// The SCTP header is not included in this branch, so provide stand-in
// values for the two SCTP constants this file refers to. Each one is
// guarded in case another header already defines it.
#ifndef IPPROTO_SCTP
#define IPPROTO_SCTP 132
#endif // !IPPROTO_SCTP
#ifndef SCTP_NODELAY
#define SCTP_NODELAY 1
#endif // !SCTP_NODELAY
#endif // ACE_HAS_SCTP
31 // class that manages setting of options
32 #include "Options_Manager.h"
34 #ifdef ACE_HAS_SCTP
36 // structure to bundle arguments for thread functions
37 struct ArgStruct {
38 ACE_SOCK_SEQPACK_Association * stream;
39 ACE_CDR::ULong numIters;
42 // thread function that serves the client for the UnMarshalled Octet
43 // test
44 static ACE_THR_FUNC_RETURN unmarshalledOctetServer (void *arg){
45 // unbundle the arguments
46 ArgStruct * args = reinterpret_cast<ArgStruct *> (arg);
47 ACE_SOCK_SEQPACK_Association * dataModeStream = args->stream;
48 ACE_CDR::ULong numIterations = args->numIters;
49 delete args;
51 // serve the client for numIterations synchronous invocations
52 do {
53 // READ A MESSAGE FROM THE CLIENT
55 size_t bt;
56 ACE_CDR::ULong msgBufSize=0;
57 // read the size of the buffer to follow
58 if ((dataModeStream->recv_n(&msgBufSize, ACE_CDR::LONG_SIZE, 0, &bt)) == -1)
59 ACE_ERROR_RETURN((LM_ERROR,
60 ACE_TEXT ("%p\n"),
61 ACE_TEXT ("recv_n")),
62 0);
63 msgBufSize = ACE_NTOHL(msgBufSize);
65 // allocate the buffer for the message payload
66 ACE_CDR::Octet * msgBuf = 0;
67 ACE_NEW_RETURN(msgBuf,
68 ACE_CDR::Octet[msgBufSize],
69 0);
71 // read the buffer
72 if ((dataModeStream->recv_n(msgBuf, msgBufSize, 0, &bt)) == -1)
73 ACE_ERROR_RETURN((LM_ERROR,
74 ACE_TEXT ("%p\n"),
75 ACE_TEXT ("recv_n")),
76 0);
78 // clean up the allocated buffer
79 delete[] msgBuf;
81 // SEND A REPLY TO THE CLIENT
83 // send back a 2 byte reply
84 ACE_CDR::Short reply;
85 if ((dataModeStream->send_n(&reply, ACE_CDR::SHORT_SIZE, 0, &bt)) == -1)
86 ACE_ERROR_RETURN((LM_ERROR,
87 ACE_TEXT ("%p\n"),
88 ACE_TEXT ("send_n")),
89 0);
90 } while (--numIterations);
92 // close and destroy the stream
93 dataModeStream->close();
94 delete dataModeStream;
96 return 0;
99 // sets up the dataModeSocket Stream, reads the test header infomation
100 // and launches a thread to handle the requested test.
101 static void run_server (ACE_HANDLE handle)
103 ACE_INET_Addr cli_addr;
104 // create a new stream and initialized with the handle returned by
105 // accept
106 ACE_SOCK_SEQPACK_Association * dataModeStream = new ACE_SOCK_SEQPACK_Association;
107 dataModeStream->set_handle (handle);
109 // Make sure we're not in non-blocking mode.
110 if (dataModeStream->disable (ACE_NONBLOCK) == -1){
111 ACE_ERROR ((LM_ERROR,
112 ACE_TEXT ("%p\n"),
113 ACE_TEXT ("disable")));
114 return;
116 else if (dataModeStream->get_remote_addr (cli_addr) == -1){
117 ACE_ERROR ((LM_ERROR,
118 "%p\n",
119 "get_remote_addr"));
120 return;
123 // explicity configure Nagling. Default is
124 // Options_Manager::test_enable_nagle=0 so default configurations is
125 // NO NAGLING
126 ACE_CDR::Long nagle;
127 if (Options_Manager::test_enable_nagle)
128 nagle=0;
129 else
130 nagle=1;
132 if (-1 == dataModeStream->set_option(IPPROTO_SCTP, SCTP_NODELAY, &nagle, sizeof nagle)){
133 // can't ise ACE_ERROR_RETURN b/c function has void return value
134 ACE_ERROR((LM_ERROR,
135 ACE_TEXT ("%p\n"),
136 ACE_TEXT ("set_option")));
137 return;
140 ACE_DEBUG ((LM_DEBUG,
141 ACE_TEXT("(%P|%t) client %C connected from %d\n"),
142 cli_addr.get_host_name (),
143 cli_addr.get_port_number ()));
145 // hdr bufSize is hardcoded to 8 bytes
146 // (4 for a CDR-encoded boolean and 4 for a CDR-encoded ULong)
147 ACE_CDR::ULong hdrBufSize = 8;
148 // allocate a raw buffer large enough to receive the header and be
149 // properly aligned for the CDR decoding.
150 ACE_CDR::Char * hdrBuf= new ACE_CDR::Char[hdrBufSize+ACE_CDR::MAX_ALIGNMENT];
151 // align the raw buffer before reading data into it.
152 char * hdrBuf_a = ACE_ptr_align_binary(hdrBuf, ACE_CDR::MAX_ALIGNMENT);
154 size_t bt;
155 // read the header
156 if ((dataModeStream->recv_n(hdrBuf_a, hdrBufSize, 0, &bt)) == -1){
157 ACE_ERROR ((LM_ERROR,
158 ACE_TEXT ("%p\n"),
159 ACE_TEXT ("recv_n")));
160 return;
163 // pass the CDR encoded data into an ACE_InputCDR class. hdrCDR does
164 // NOT copy this data. Nor does it delete. It assumes the buffer
165 // remains valid while it is in scope.
166 ACE_InputCDR hdrCDR(hdrBuf_a, hdrBufSize);
168 ACE_CDR::Boolean byteOrder;
169 ACE_CDR::ULong numIterations;
171 // extract the data
172 hdrCDR >> ACE_InputCDR::to_boolean (byteOrder);
173 hdrCDR.reset_byte_order(byteOrder);
174 hdrCDR >> numIterations;
176 // make sure the stream is good after the extractions
177 if (!hdrCDR.good_bit()){
178 ACE_ERROR((LM_ERROR,
179 ACE_TEXT ("%p\n"),
180 ACE_TEXT ("hdrCDR")));
182 return;
185 ACE_DEBUG ((LM_DEBUG,
186 ACE_TEXT ("(%P|%t) Test for %u iterations\n"),
187 numIterations));
189 // deallocate the header buffer
190 delete[] hdrBuf;
192 // bundle up the arguments
193 ArgStruct * args = new ArgStruct;
194 args->stream = dataModeStream;
195 args->numIters = numIterations;
197 #if defined (ACE_HAS_THREADS)
198 // Spawn a new thread and run the new connection in that thread of
199 // control using the <server> function as the entry point.
200 if (ACE_Thread_Manager::instance ()->spawn (unmarshalledOctetServer,
201 reinterpret_cast<void *> (args),
202 THR_DETACHED) == -1)
203 ACE_ERROR ((LM_ERROR,
204 ACE_TEXT ("(%P|%t) %p\n"),
205 ACE_TEXT ("spawn")));
206 #else
207 (*unmarshalledOctetServer) (reinterpret_cast<void *> (args));
208 #endif /* ACE_HAS_THREADS */
212 #endif
215 int ACE_TMAIN (int argc, ACE_TCHAR **argv){
216 Options_Manager optsMgr(argc, argv, ACE_TEXT ("server-opts"));
218 // show usage is requested
219 if (optsMgr._usage) {
220 optsMgr._show_usage(stderr, ACE_TEXT ("server-opts"));
221 return 1;
224 #ifndef ACE_HAS_SCTP
225 // SCTP is not installed. Exit with informative error message.
226 ACE_ERROR_RETURN((LM_ERROR,
227 ACE_TEXT ("SCTP was NOT installed/accessible when this binary was compiled.\n")),
229 #else
231 // check that valid options were specified
232 if (optsMgr._error) {
233 ACE_OS::fprintf (stderr, "ERROR: %s\n", ACE_TEXT_ALWAYS_CHAR (optsMgr._error_message));
234 return 1;
237 // this is the socket that the server will listen on
238 ACE_SOCK_SEQPACK_Acceptor acceptor_socket;
240 // Create the address that we want to listen for connections on. If
241 // server_accept_addr=INADDR_ANY (the default), SCTP will listen for
242 // connections on all IP interfaces. If an address is specified,
243 // SCTP will listen for connections on that address ONLY.
244 ACE_Multihomed_INET_Addr serverAddr;
246 serverAddr.set(optsMgr.server_port,
247 optsMgr.server_accept_addr,
249 optsMgr.secondary_accept_addrs,
250 optsMgr.num_secondary_accept_addrs);
252 // this operation creates a socket, binds the specified internet
253 // address to it and calls listen. As this is a wrapper facade
254 // approach, the ACE_OS::{socket,bind,listen} calls are invoked in
255 // the implementation of open.
256 if (acceptor_socket.open(serverAddr, 1,
257 AF_INET,
258 ACE_DEFAULT_BACKLOG,
259 optsMgr.test_transport_protocol) == -1)
260 ACE_ERROR_RETURN ((LM_ERROR,
261 ACE_TEXT ("%p\n"),
262 ACE_TEXT ("open")),
265 // this function checks that the port that was actually bound was
266 // the port we asked for. Apparently some operating systems will
267 // automatically select new ports if the specified port is currently
268 // used.
269 if (acceptor_socket.get_local_addr(serverAddr) == -1)
270 ACE_ERROR_RETURN ((LM_ERROR,
271 ACE_TEXT ("%p\n"),
272 ACE_TEXT ("get_local_addr")),
275 sockaddr_in *addresses = new sockaddr_in[ 1 + serverAddr.get_num_secondary_addresses() ];
276 serverAddr.get_addresses( addresses, 1 + serverAddr.get_num_secondary_addresses() ) ;
277 ACE_DEBUG ((LM_DEBUG,
278 ACE_TEXT ("(%P|%t) Accepting connections, using %C on ")
279 ACE_TEXT ("port %u on interfaces %C"),
280 (optsMgr.test_transport_protocol == IPPROTO_SCTP) ? "IPPROTO_SCTP" : "IPPROTO_TCP",
281 serverAddr.get_port_number(),
282 ACE_OS::inet_ntoa( addresses[0].sin_addr) ));
284 unsigned int i;
285 for(i=1; i <= serverAddr.get_num_secondary_addresses() ; ++i ) {
286 ACE_DEBUG ((LM_DEBUG,
287 ACE_TEXT (" and %C"),
288 ACE_OS::inet_ntoa (addresses[i].sin_addr)));
290 ACE_DEBUG((LM_DEBUG, ACE_TEXT ("\n")));
292 delete[] addresses;
294 // this is the stream object that will associated with a completed
295 // connection (aka the data mode socket). It will be set when accept
296 // is called below.
297 ACE_SOCK_SEQPACK_Association new_stream;
299 // a file decriptor set
300 ACE_Handle_Set handle_set;
301 // add the acceptor socket to the file descriptor set.
302 handle_set.set_bit(acceptor_socket.get_handle());
304 for (;;){
305 ACE_Time_Value timeout(ACE_DEFAULT_TIMEOUT);
306 ACE_Handle_Set temp = handle_set;
308 // wait for connect() call from client. In the original test there
309 // were two different acceptor sockets for two different
310 // services. So select was needed to wait on both sockets
311 // simultaneously. In this test we could just call accept on the
312 // one socket.
313 int result = ACE_OS::select((int) (acceptor_socket.get_handle()) +1,
314 (fd_set *) temp,
317 timeout);
319 // check that select did not end with an error.
320 if (result == -1)
321 ACE_ERROR ((LM_ERROR,
322 ACE_TEXT ("(%P|%t) %p\n"),
323 ACE_TEXT ("select")));
324 // check to see if select timed out.
325 else if (result == 0){
326 ACE_DEBUG ((LM_DEBUG,
327 ACE_TEXT ("(%P|%t) select timed out\n")));
329 else { // case where a file descriptor was actually set
330 if (!(temp.is_set(acceptor_socket.get_handle()))){
331 // CANNOT BE REACHED
332 ACE_ERROR ((LM_ERROR,
333 ACE_TEXT ("(%P|%t) %p\n"),
334 ACE_TEXT ("select: NO ERROR BUT NO FD SET")));
335 } else {
336 // call accept to set up the new stream.
337 if (acceptor_socket.accept(new_stream) == -1) {
338 ACE_ERROR ((LM_ERROR,
339 ACE_TEXT ("%p\n"),
340 ACE_TEXT ("accept")));
341 continue;
343 else{
344 ACE_DEBUG ((LM_DEBUG,
345 ACE_TEXT ("(%P|%t) spawning server\n")));
347 // Run the server.
348 run_server (new_stream.get_handle ());
354 return 0;
355 #endif /* ACE_HAS_SCTP */