1 #include "ace/Multihomed_INET_Addr.h"
2 #include "ace/SOCK_SEQPACK_Association.h"
3 #include "ace/SOCK_SEQPACK_Acceptor.h"
4 #include "ace/Log_Msg.h"
5 #include "ace/Thread_Manager.h"
6 #include "ace/Handle_Set.h"
7 #include "ace/CDR_Stream.h"
9 // FUZZ: disable check_for_streams_include
10 #include "ace/streams.h"
12 #include "ace/os_include/arpa/os_inet.h"
13 #include "ace/OS_NS_sys_select.h"
14 #include "ace/OS_NS_string.h"
15 #include "ace/OS_NS_arpa_inet.h"
17 // make sure that the code compiles cleanly even if SCTP is not
18 // available. If SCTP is not installed, program will exit early in
19 // main() with an error message
22 #include <netinet/sctp.h>
26 #define IPPROTO_SCTP 132
27 #endif // !IPPROTO_SCTP
28 #define SCTP_NODELAY 1
31 // class that manages setting of options
32 #include "Options_Manager.h"
36 // structure to bundle arguments for thread functions
38 ACE_SOCK_SEQPACK_Association
* stream
;
39 ACE_CDR::ULong numIters
;
42 // thread function that serves the client for the UnMarshalled Octet
44 static ACE_THR_FUNC_RETURN
unmarshalledOctetServer (void *arg
){
45 // unbundle the arguments
46 ArgStruct
* args
= reinterpret_cast<ArgStruct
*> (arg
);
47 ACE_SOCK_SEQPACK_Association
* dataModeStream
= args
->stream
;
48 ACE_CDR::ULong numIterations
= args
->numIters
;
51 // serve the client for numIterations synchronous invocations
53 // READ A MESSAGE FROM THE CLIENT
56 ACE_CDR::ULong msgBufSize
=0;
57 // read the size of the buffer to follow
58 if ((dataModeStream
->recv_n(&msgBufSize
, ACE_CDR::LONG_SIZE
, 0, &bt
)) == -1)
59 ACE_ERROR_RETURN((LM_ERROR
,
63 msgBufSize
= ACE_NTOHL(msgBufSize
);
65 // allocate the buffer for the message payload
66 ACE_CDR::Octet
* msgBuf
= 0;
67 ACE_NEW_RETURN(msgBuf
,
68 ACE_CDR::Octet
[msgBufSize
],
72 if ((dataModeStream
->recv_n(msgBuf
, msgBufSize
, 0, &bt
)) == -1)
73 ACE_ERROR_RETURN((LM_ERROR
,
78 // clean up the allocated buffer
81 // SEND A REPLY TO THE CLIENT
83 // send back a 2 byte reply
85 if ((dataModeStream
->send_n(&reply
, ACE_CDR::SHORT_SIZE
, 0, &bt
)) == -1)
86 ACE_ERROR_RETURN((LM_ERROR
,
90 } while (--numIterations
);
92 // close and destroy the stream
93 dataModeStream
->close();
94 delete dataModeStream
;
99 // sets up the dataModeSocket Stream, reads the test header infomation
100 // and launches a thread to handle the requested test.
101 static void run_server (ACE_HANDLE handle
)
103 ACE_INET_Addr cli_addr
;
104 // create a new stream and initialized with the handle returned by
106 ACE_SOCK_SEQPACK_Association
* dataModeStream
= new ACE_SOCK_SEQPACK_Association
;
107 dataModeStream
->set_handle (handle
);
109 // Make sure we're not in non-blocking mode.
110 if (dataModeStream
->disable (ACE_NONBLOCK
) == -1){
111 ACE_ERROR ((LM_ERROR
,
113 ACE_TEXT ("disable")));
116 else if (dataModeStream
->get_remote_addr (cli_addr
) == -1){
117 ACE_ERROR ((LM_ERROR
,
123 // explicity configure Nagling. Default is
124 // Options_Manager::test_enable_nagle=0 so default configurations is
127 if (Options_Manager::test_enable_nagle
)
132 if (-1 == dataModeStream
->set_option(IPPROTO_SCTP
, SCTP_NODELAY
, &nagle
, sizeof nagle
)){
133 // can't ise ACE_ERROR_RETURN b/c function has void return value
136 ACE_TEXT ("set_option")));
140 ACE_DEBUG ((LM_DEBUG
,
141 ACE_TEXT("(%P|%t) client %C connected from %d\n"),
142 cli_addr
.get_host_name (),
143 cli_addr
.get_port_number ()));
145 // hdr bufSize is hardcoded to 8 bytes
146 // (4 for a CDR-encoded boolean and 4 for a CDR-encoded ULong)
147 ACE_CDR::ULong hdrBufSize
= 8;
148 // allocate a raw buffer large enough to receive the header and be
149 // properly aligned for the CDR decoding.
150 ACE_CDR::Char
* hdrBuf
= new ACE_CDR::Char
[hdrBufSize
+ACE_CDR::MAX_ALIGNMENT
];
151 // align the raw buffer before reading data into it.
152 char * hdrBuf_a
= ACE_ptr_align_binary(hdrBuf
, ACE_CDR::MAX_ALIGNMENT
);
156 if ((dataModeStream
->recv_n(hdrBuf_a
, hdrBufSize
, 0, &bt
)) == -1){
157 ACE_ERROR ((LM_ERROR
,
159 ACE_TEXT ("recv_n")));
163 // pass the CDR encoded data into an ACE_InputCDR class. hdrCDR does
164 // NOT copy this data. Nor does it delete. It assumes the buffer
165 // remains valid while it is in scope.
166 ACE_InputCDR
hdrCDR(hdrBuf_a
, hdrBufSize
);
168 ACE_CDR::Boolean byteOrder
;
169 ACE_CDR::ULong numIterations
;
172 hdrCDR
>> ACE_InputCDR::to_boolean (byteOrder
);
173 hdrCDR
.reset_byte_order(byteOrder
);
174 hdrCDR
>> numIterations
;
176 // make sure the stream is good after the extractions
177 if (!hdrCDR
.good_bit()){
180 ACE_TEXT ("hdrCDR")));
185 ACE_DEBUG ((LM_DEBUG
,
186 ACE_TEXT ("(%P|%t) Test for %u iterations\n"),
189 // deallocate the header buffer
192 // bundle up the arguments
193 ArgStruct
* args
= new ArgStruct
;
194 args
->stream
= dataModeStream
;
195 args
->numIters
= numIterations
;
197 #if defined (ACE_HAS_THREADS)
198 // Spawn a new thread and run the new connection in that thread of
199 // control using the <server> function as the entry point.
200 if (ACE_Thread_Manager::instance ()->spawn (unmarshalledOctetServer
,
201 reinterpret_cast<void *> (args
),
203 ACE_ERROR ((LM_ERROR
,
204 ACE_TEXT ("(%P|%t) %p\n"),
205 ACE_TEXT ("spawn")));
207 (*unmarshalledOctetServer
) (reinterpret_cast<void *> (args
));
208 #endif /* ACE_HAS_THREADS */
215 int ACE_TMAIN (int argc
, ACE_TCHAR
**argv
){
216 Options_Manager
optsMgr(argc
, argv
, ACE_TEXT ("server-opts"));
218 // show usage is requested
219 if (optsMgr
._usage
) {
220 optsMgr
._show_usage(stderr
, ACE_TEXT ("server-opts"));
225 // SCTP is not installed. Exit with informative error message.
226 ACE_ERROR_RETURN((LM_ERROR
,
227 ACE_TEXT ("SCTP was NOT installed/accessible when this binary was compiled.\n")),
231 // check that valid options were specified
232 if (optsMgr
._error
) {
233 ACE_OS::fprintf (stderr
, "ERROR: %s\n", ACE_TEXT_ALWAYS_CHAR (optsMgr
._error_message
));
237 // this is the socket that the server will listen on
238 ACE_SOCK_SEQPACK_Acceptor acceptor_socket
;
240 // Create the address that we want to listen for connections on. If
241 // server_accept_addr=INADDR_ANY (the default), SCTP will listen for
242 // connections on all IP interfaces. If an address is specified,
243 // SCTP will listen for connections on that address ONLY.
244 ACE_Multihomed_INET_Addr serverAddr
;
246 serverAddr
.set(optsMgr
.server_port
,
247 optsMgr
.server_accept_addr
,
249 optsMgr
.secondary_accept_addrs
,
250 optsMgr
.num_secondary_accept_addrs
);
252 // this operation creates a socket, binds the specified internet
253 // address to it and calls listen. As this is a wrapper facade
254 // approach, the ACE_OS::{socket,bind,listen} calls are invoked in
255 // the implementation of open.
256 if (acceptor_socket
.open(serverAddr
, 1,
259 optsMgr
.test_transport_protocol
) == -1)
260 ACE_ERROR_RETURN ((LM_ERROR
,
265 // this function checks that the port that was actually bound was
266 // the port we asked for. Apparently some operating systems will
267 // automatically select new ports if the specified port is currently
269 if (acceptor_socket
.get_local_addr(serverAddr
) == -1)
270 ACE_ERROR_RETURN ((LM_ERROR
,
272 ACE_TEXT ("get_local_addr")),
275 sockaddr_in
*addresses
= new sockaddr_in
[ 1 + serverAddr
.get_num_secondary_addresses() ];
276 serverAddr
.get_addresses( addresses
, 1 + serverAddr
.get_num_secondary_addresses() ) ;
277 ACE_DEBUG ((LM_DEBUG
,
278 ACE_TEXT ("(%P|%t) Accepting connections, using %C on ")
279 ACE_TEXT ("port %u on interfaces %C"),
280 (optsMgr
.test_transport_protocol
== IPPROTO_SCTP
) ? "IPPROTO_SCTP" : "IPPROTO_TCP",
281 serverAddr
.get_port_number(),
282 ACE_OS::inet_ntoa( addresses
[0].sin_addr
) ));
285 for(i
=1; i
<= serverAddr
.get_num_secondary_addresses() ; ++i
) {
286 ACE_DEBUG ((LM_DEBUG
,
287 ACE_TEXT (" and %C"),
288 ACE_OS::inet_ntoa (addresses
[i
].sin_addr
)));
290 ACE_DEBUG((LM_DEBUG
, ACE_TEXT ("\n")));
294 // this is the stream object that will associated with a completed
295 // connection (aka the data mode socket). It will be set when accept
297 ACE_SOCK_SEQPACK_Association new_stream
;
299 // a file decriptor set
300 ACE_Handle_Set handle_set
;
301 // add the acceptor socket to the file descriptor set.
302 handle_set
.set_bit(acceptor_socket
.get_handle());
305 ACE_Time_Value
timeout(ACE_DEFAULT_TIMEOUT
);
306 ACE_Handle_Set temp
= handle_set
;
308 // wait for connect() call from client. In the original test there
309 // were two different acceptor sockets for two different
310 // services. So select was needed to wait on both sockets
311 // simultaneously. In this test we could just call accept on the
313 int result
= ACE_OS::select((int) (acceptor_socket
.get_handle()) +1,
319 // check that select did not end with an error.
321 ACE_ERROR ((LM_ERROR
,
322 ACE_TEXT ("(%P|%t) %p\n"),
323 ACE_TEXT ("select")));
324 // check to see if select timed out.
325 else if (result
== 0){
326 ACE_DEBUG ((LM_DEBUG
,
327 ACE_TEXT ("(%P|%t) select timed out\n")));
329 else { // case where a file descriptor was actually set
330 if (!(temp
.is_set(acceptor_socket
.get_handle()))){
332 ACE_ERROR ((LM_ERROR
,
333 ACE_TEXT ("(%P|%t) %p\n"),
334 ACE_TEXT ("select: NO ERROR BUT NO FD SET")));
336 // call accept to set up the new stream.
337 if (acceptor_socket
.accept(new_stream
) == -1) {
338 ACE_ERROR ((LM_ERROR
,
340 ACE_TEXT ("accept")));
344 ACE_DEBUG ((LM_DEBUG
,
345 ACE_TEXT ("(%P|%t) spawning server\n")));
348 run_server (new_stream
.get_handle ());
355 #endif /* ACE_HAS_SCTP */