1 // ============================================================================
7 // Bug_3943_Regression_Test.cpp
10 // When a large buffer is sent to ACE_OS::sendv, using winsock2,
11 // ENOBUFS occurs, which indicates that the buffer was too large
12 // for WSASend to accept. On other platforms ENOBUFS indicates a
13 // situation similar to EWOULDBLOCK, only that the buffer on the
14 // send side is completely full. ACE_OS::sendv for winsock2 now
15 // tries to divide the buffer and try to send, until data is sent
16 // or a different error is received. This test allocates a large
17 // iovec array buffer and verifies that a partial send occurs.
20 // Brian Johnson <johnsonb@ociweb.com>,
22 // ============================================================================
24 #include "test_config.h"
25 #include "ace/SOCK_Connector.h"
26 #include "ace/LOCK_SOCK_Acceptor.h"
27 #include "ace/Acceptor.h"
28 #include "ace/Handle_Set.h"
29 #include "ace/Connector.h"
30 #include "ace/OS_NS_sys_select.h"
31 #include "ace/OS_NS_sys_wait.h"
32 #include "ace/OS_NS_unistd.h"
33 #include "ace/os_include/os_netdb.h"
35 // The following works around bugs with some operating systems, which
36 // don't allow multiple threads/process to call accept() on the same
37 // listen-mode port/socket. Also, note that since timed accept is
38 // implemented using select(), and we use timed accepts with threads,
39 // we need a real lock when using timed accepts even if the OS has
40 // thread-safe accept.
42 #if defined (ACE_LACKS_FORK)
43 # if defined (ACE_HAS_THREADS)
44 # include "ace/Thread_Mutex.h"
45 typedef ACE_Thread_Mutex ACCEPTOR_LOCKING
;
47 # include "ace/Null_Mutex.h"
48 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
49 # endif /* ACE_HAS_THREADS */
51 # if defined (ACE_HAS_THREAD_SAFE_ACCEPT)
52 # include "ace/Null_Mutex.h"
53 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
55 # include "ace/Process_Mutex.h"
56 using ACCEPTOR_LOCKING
= ACE_Process_Mutex
;
57 # define CLEANUP_PROCESS_MUTEX
58 # endif /* ACE_HAS_THREAD_SAFE_ACCEPT */
59 #endif /* ACE_LACKS_FORK */
61 #if defined (ACE_HAS_TEMPLATE_TYPEDEFS)
62 #define LOCK_SOCK_ACCEPTOR ACE_LOCK_SOCK_Acceptor<ACCEPTOR_LOCKING>
64 #define LOCK_SOCK_ACCEPTOR ACE_LOCK_SOCK_Acceptor<ACCEPTOR_LOCKING>, ACE_INET_Addr
65 #endif /* ACE_HAS_TEMPLATE_TYPEDEFS */
67 #define REFCOUNTED_HASH_RECYCLABLE_ADDR ACE_Refcounted_Hash_Recyclable<ACE_INET_Addr>
70 const char FINISHED_CHAR
= '%';
71 const char RESTART_CHAR
= '&';
72 const char START_CHAR
= '0';
73 bool server_complete
= false;
74 bool client_complete
= false;
75 volatile int expected_num_messages
= 0;
77 char nextChar(const char current
)
79 if ((current
== '9') ||
80 (current
== RESTART_CHAR
) ||
81 (current
== FINISHED_CHAR
))
87 #if defined (ACE_LACKS_IOVEC)
88 typedef u_long buffer_len
;
90 using buffer_len
= size_t;
91 #endif /* ACE_LACKS_IOVEC */
93 #if defined (ACE_WIN32)
94 int beforeVersion(const DWORD majorVersion
,
95 const DWORD minorVersion
,
96 const BYTE productType
)
98 OSVERSIONINFOEX versioninfo
;
99 versioninfo
.dwOSVersionInfoSize
= sizeof (OSVERSIONINFOEX
);
100 versioninfo
.dwMajorVersion
= majorVersion
;
101 versioninfo
.dwMinorVersion
= minorVersion
;
102 versioninfo
.wProductType
= productType
;
104 ULONGLONG aboveMajorVer6TypeMask
= 0;
105 aboveMajorVer6TypeMask
=
106 ::VerSetConditionMask(0, VER_MAJORVERSION
, VER_GREATER
);
108 if (::VerifyVersionInfo(&versioninfo
, VER_MAJORVERSION
,
109 aboveMajorVer6TypeMask
) > 0)
112 ACE_OS::set_errno_to_last_error ();
113 if (errno
!= ERROR_OLD_WIN_VERSION
)
115 ACE_ERROR ((LM_ERROR
,
116 ACE_TEXT ("(%P|%t) VerifyVersionInfo errno = %d, ")
117 ACE_TEXT ("major version check must have been ")
118 ACE_TEXT ("defined incorrectly.\n"),
123 ULONGLONG majorV6AboveMinorV1TypeMask
= 0;
124 majorV6AboveMinorV1TypeMask
=
125 ::VerSetConditionMask(0, VER_MAJORVERSION
, VER_EQUAL
);
126 majorV6AboveMinorV1TypeMask
=
127 ::VerSetConditionMask(majorV6AboveMinorV1TypeMask
,
128 VER_MINORVERSION
, VER_GREATER
);
130 if (::VerifyVersionInfo(&versioninfo
,
131 VER_MAJORVERSION
| VER_MINORVERSION
,
132 majorV6AboveMinorV1TypeMask
) > 0)
135 ACE_OS::set_errno_to_last_error ();
136 if (errno
!= ERROR_OLD_WIN_VERSION
)
138 ACE_ERROR ((LM_ERROR
,
139 ACE_TEXT ("(%P|%t) VerifyVersionInfo errno = %d, ")
140 ACE_TEXT ("minor version check must have been ")
141 ACE_TEXT ("defined incorrectly.\n"),
146 ULONGLONG majorV6MinorV1NTTypeMask
= 0;
147 majorV6MinorV1NTTypeMask
=
148 ::VerSetConditionMask(0, VER_MAJORVERSION
, VER_EQUAL
);
149 majorV6MinorV1NTTypeMask
=
150 ::VerSetConditionMask(majorV6MinorV1NTTypeMask
,
151 VER_MINORVERSION
, VER_EQUAL
);
152 majorV6MinorV1NTTypeMask
=
153 ::VerSetConditionMask(majorV6MinorV1NTTypeMask
,
154 VER_PRODUCT_TYPE
, VER_EQUAL
);
156 if (::VerifyVersionInfo(&versioninfo
,
157 VER_MAJORVERSION
| VER_MINORVERSION
| VER_PRODUCT_TYPE
,
158 majorV6MinorV1NTTypeMask
) > 0)
161 ACE_OS::set_errno_to_last_error ();
162 if (errno
!= ERROR_OLD_WIN_VERSION
)
164 ACE_ERROR ((LM_ERROR
,
165 ACE_TEXT ("(%P|%t) VerifyVersionInfo errno=%d,")
166 ACE_TEXT (" call must have been defined incorrectly.\n"),
173 #endif /* ACE_WIN32 */
177 #if defined (ACE_WIN32)
178 // it has been identified that Windows7 does not have the ENOBUFS issue
179 // but testing has not been performed on Server 2008 or Vista to identify
180 // whether the issue exists or not
181 return beforeVersion(6, 1, VER_NT_WORKSTATION
);
184 #endif /* ACE_WIN32 */
189 IovecGuard(const int count
, const int slot
, const buffer_len max
);
191 char* getBufferAtOffset(const ssize_t offset
);
194 buffer_len totalBytes_
;
196 static const int ALL_SLOTS
= -1;
200 * This class is the product created by both ACE_Connector
201 * and ACE_Acceptor objects.
203 class Svc_Handler
: public ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>
206 // Do-nothing constructor.
207 Svc_Handler (ACE_Thread_Manager
* = nullptr);
209 // Initialization hook.
210 int open (void *) override
;
212 // Send data to server.
215 // Recv data from client.
218 // Shutdown the <Svc_Handler>.
219 int close (u_long
= 0) override
;
222 enum Direction
{ READX
, WRITEX
}; // VxWorks defines READ and WRITE
223 bool wait_for_completion(Direction direction
);
225 ssize_t
send (IovecGuard
& iovec_array
,
226 const ACE_TCHAR
* const send_desc
,
228 bool test_message
= false);
230 ssize_t
send (char send_char
, const ACE_TCHAR
* const send_desc
);
232 const ACE_Time_Value DEFAULT_TIME_VALUE
;
236 using ACCEPTOR
= ACE_Oneshot_Acceptor
<Svc_Handler
, ACE_LOCK_SOCK_Acceptor
<ACCEPTOR_LOCKING
>>;
237 using CONNECTOR
= ACE_Connector
<Svc_Handler
, ACE_SOCK_Connector
>;
240 IovecGuard::IovecGuard(const int count
, const int slot
, const buffer_len max
)
244 ACE_NEW (iov_
,iovec
[iovcnt_
]);
246 char expChar
= START_CHAR
;
247 for ( ; i
< iovcnt_
; ++i
)
249 iov_
[i
].iov_len
= ((slot
== i
) || (slot
== ALL_SLOTS
)) ? max
: 10;
250 totalBytes_
+= iov_
[i
].iov_len
;
253 // allocate all iov_bases as one big chunk
254 ACE_NEW (totalBuffer
,
256 for (i
= 0; i
< iovcnt_
; ++i
)
258 iov_
[i
].iov_base
= totalBuffer
;
259 totalBuffer
+= iov_
[i
].iov_len
;
260 for (u_long j
= 0; j
< static_cast<u_long
>(iov_
[i
].iov_len
); ++j
)
262 char *charbase
= static_cast<char *>(iov_
[i
].iov_base
);
263 charbase
[j
] = expChar
;
264 expChar
= ::nextChar(expChar
);
269 IovecGuard::~IovecGuard()
271 // iov_bases are all just part of one big buffer
272 char* totalBuffer
= static_cast<char *>(iov_
[0].iov_base
);
273 delete [] totalBuffer
;
278 IovecGuard::getBufferAtOffset(const ssize_t offset
)
280 char * totalBuffer
= static_cast<char *>(iov_
[0].iov_base
);
281 return totalBuffer
+ offset
;
284 Svc_Handler::Svc_Handler (ACE_Thread_Manager
*)
285 : DEFAULT_TIME_VALUE (ACE_DEFAULT_TIMEOUT
)
290 Svc_Handler::open (void *)
292 ACE_DEBUG ((LM_DEBUG
,
293 ACE_TEXT ("(%P|%t) opening Svc_Handler %@ with handle %d\n"),
295 this->peer ().get_handle ()));
296 // Enable non-blocking I/O.
297 if (this->peer ().enable (ACE_NONBLOCK
) == -1)
298 ACE_ERROR_RETURN ((LM_ERROR
,
299 ACE_TEXT ("(%P|%t) %p\n"),
300 ACE_TEXT ("enable")),
306 Svc_Handler::send_data ()
308 bool successful
= true;
309 bool win32_test
= false;
310 const int testType
= processENOBUFS();
311 const ACE_TCHAR
*send_desc
= ACE_TEXT ("");
315 buffer_len tryThreshold
= 0x7fff;
316 ssize_t thresholdActualSend
= -1;
318 static_cast<ssize_t
>(
319 (static_cast<unsigned long long>(1) <<
320 (static_cast<unsigned long long>(sizeof(ssize_t
) * 8) -
321 static_cast<unsigned long long>(1))) - 1);
322 const unsigned int startShift
= 4;
323 unsigned int shift
= startShift
;
324 unsigned int trailingMask
= 0xffff;
325 while (static_cast<ssize_t
>(tryThreshold
) < MAX
)
327 IovecGuard
all(1, 0, tryThreshold
);
328 thresholdActualSend
=
329 this->send(all
, ACE_TEXT ("identifying threshold"), true, true);
330 if (thresholdActualSend
<= static_cast<ssize_t
>(tryThreshold
)/2 + 1)
335 // try and identify the threshold more closely
337 tryThreshold
= tryThreshold
>> shift
;
340 else if ((shift
< startShift
) && (shift
> 1))
341 // already narrowing in on value
344 tryThreshold
= (tryThreshold
<< shift
) | trailingMask
;
348 #if defined (ACE_WIN32)
350 // This test only applies to win32 platforms, on systems with
351 // sane sendv impls, this is not a problem.
352 if (thresholdActualSend
!= static_cast<ssize_t
>(tryThreshold
)/2 + 1)
354 if (static_cast<ssize_t
>(tryThreshold
) == MAX
)
355 ACE_ERROR ((LM_ERROR
,
356 ACE_TEXT ("(%P|%t) was not able to identify a point ")
357 ACE_TEXT ("where ACE_OS::sendv does not send a ")
358 ACE_TEXT ("complete buffer so the Bug #3943 ENOBUFS ")
359 ACE_TEXT ("condition does not occur on this ")
360 ACE_TEXT ("platform.\n")));
362 ACE_ERROR ((LM_ERROR
,
363 ACE_TEXT ("(%P|%t) was not able to identify a point ")
364 ACE_TEXT ("where ACE_OS::sendv sent a partial buffer ")
365 ACE_TEXT ("that was consistent with Bug #3943 ")
366 ACE_TEXT ("ENOBUFS condition logic, so this test ")
367 ACE_TEXT ("probably running into other socket ")
368 ACE_TEXT ("limitations and needs to be redesigned. ")
369 ACE_TEXT ("Stuck sending %d.\n"),
370 thresholdActualSend
));
374 #endif /* ACE_WIN32 */
376 buffer_len overThreshold
= tryThreshold
;
378 ACE_DEBUG ((LM_DEBUG
,
379 ACE_TEXT ("(%P|%t) identified a buffer with %d bytes ")
380 ACE_TEXT ("hits the ENOBUFS condition.\n"),
383 #if !defined (ACE_WIN32) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
386 buffer_len underThreshold
= (overThreshold
+ 1) / 2;
387 // verify that if the total buffer is too large that partial is sent
388 IovecGuard
all(2, IovecGuard::ALL_SLOTS
, underThreshold
);
389 send_desc
= ACE_TEXT ("2 iovecs combined to be too large");
390 result
= this->send(all
, send_desc
, true);
391 if (win32_test
&& static_cast<u_long
>(result
) != underThreshold
)
394 ACE_ERROR ((LM_ERROR
,
395 ACE_TEXT ("(%P|%t) logic should have sent the ")
396 ACE_TEXT ("complete first iovec, ")
397 ACE_TEXT ("expected %d got %d out of %d\n"),
398 underThreshold
, result
,all
.totalBytes_
));
403 IovecGuard
all(2, IovecGuard::ALL_SLOTS
, overThreshold
);
404 send_desc
= ACE_TEXT ("2 iovecs each are too large");
405 result
= this->send(all
, send_desc
, true);
408 ACE_ERROR ((LM_ERROR
,
409 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
416 IovecGuard
small_iov(2, 0, overThreshold
);
417 send_desc
= ACE_TEXT ("large iovec followed by small iovec");
418 result
= this->send(small_iov
, send_desc
, true);
421 ACE_ERROR ((LM_ERROR
,
422 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
429 IovecGuard
large(4, 2, overThreshold
);
430 send_desc
= ACE_TEXT ("4 iovecs with third large");
431 result
= this->send(large
, send_desc
, true);
434 ACE_ERROR ((LM_ERROR
,
435 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
442 // verify that the buffer gets divided till it can send
443 IovecGuard
large(6, 5, 2 * overThreshold
);
444 send_desc
= ACE_TEXT ("6 iovecs with last very large");
445 result
= this->send(large
, send_desc
, true);
448 ACE_ERROR ((LM_ERROR
,
449 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
457 IovecGuard
array(1, 0, overThreshold
);
458 send_desc
= ACE_TEXT ("just one large iovec in array");
459 result
= this->send(array
, send_desc
, true);
462 ACE_ERROR ((LM_ERROR
,
463 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
470 IovecGuard
array(1, 0, 2 * overThreshold
);
471 send_desc
= ACE_TEXT ("just one very large iovec in array");
472 result
= this->send(array
, send_desc
, true);
475 ACE_ERROR ((LM_ERROR
,
476 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
482 #endif /* ACE_HAS_WINSOCK2 && (ACE_HAS_WINSOCK2 != 0) */
483 #if !defined (ACE_LACKS_SEND)
486 IovecGuard
one(1, 0, overThreshold
);
487 send_desc
= ACE_TEXT ("large");
488 result
= this->send(one
, send_desc
, false);
491 ACE_ERROR ((LM_ERROR
,
492 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
499 IovecGuard
one(1, 0, 2 * overThreshold
);
500 send_desc
= ACE_TEXT ("very large");
501 result
= send(one
, send_desc
, false);
504 ACE_ERROR ((LM_ERROR
,
505 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
511 #endif /* !ACE_LACKS_SEND */
513 else if (testType
> 0)
515 // since ENOBUFS condition is expected to not occur on this platform,
516 // send a very large message and verify that ACE_OS::sendv and send
517 // are able to send the whole buffer in one call
518 #if defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)
521 IovecGuard
small_iov(2, 0, 0x0fffffff);
522 send_desc
= ACE_TEXT ("large iovec followed by small iovec");
523 result
= this->send(small_iov
, send_desc
, true, true);
524 if (result
< 0 || static_cast<u_long
>(result
) != small_iov
.totalBytes_
)
526 ACE_ERROR ((LM_ERROR
,
527 ACE_TEXT ("(%P|%t) %s: expected %d, got %d\n"),
528 send_desc
, small_iov
.totalBytes_
, result
));
534 IovecGuard
array(1, 0, 0x0fffffff);
535 send_desc
= ACE_TEXT ("just one large iovec in array");
536 result
= this->send(array
, send_desc
, true, true);
537 if (result
< 0 || (static_cast<u_long
>(result
) != array
.totalBytes_
))
539 ACE_ERROR ((LM_ERROR
,
540 ACE_TEXT ("(%P|%t) %s: expected %d, got %d\n"),
541 send_desc
, array
.totalBytes_
, result
));
546 #endif /* ACE_HAS_WINSOCK2 && (ACE_HAS_WINSOCK2 != 0) */
547 #if !defined (ACE_LACKS_SEND)
550 IovecGuard
one(1, 0, 0x0fffffff);
551 send_desc
= ACE_TEXT ("large");
552 result
= this->send(one
, send_desc
, false, true);
553 if (result
< 0 || static_cast<u_long
>(result
) != one
.totalBytes_
)
555 ACE_ERROR ((LM_ERROR
,
556 ACE_TEXT ("(%P|%t) %s: expected %d, got %d\n"),
557 send_desc
, one
.totalBytes_
, result
));
562 #endif /* !ACE_LACKS_SEND */
565 // the determination of testType failed, ERROR is already reported
568 // need to indicate that the message is restarting
569 // this may fail if the server reads the char and closes before
570 // it is done, so let the server report the error if there was one
571 send_desc
= ACE_TEXT ("indicating no more messages");
572 this->send(FINISHED_CHAR
, send_desc
);
574 this->wait_for_completion(READX
);
577 ACE_ERROR ((LM_ERROR
,
578 ACE_TEXT ("(%P|%t) %p\n"),
579 ACE_TEXT ("close")));
582 client_complete
= successful
;
586 Svc_Handler::send (IovecGuard
& iovec_array
,
587 const ACE_TCHAR
* const send_desc
,
588 const bool use_sendv
,
589 const bool test_message
)
591 ++expected_num_messages
;
592 const ACE_TCHAR
* const send_func_name
=
593 (use_sendv
) ? ACE_TEXT ("ACE_OS::sendv") : ACE_TEXT ("ACE_OS::send");
595 ACE_DEBUG ((LM_DEBUG
,
596 ACE_TEXT ("(%P|%t) send, using %s for %s (%d bytes)\n"),
597 send_func_name
, send_desc
, iovec_array
.totalBytes_
));
599 if (!use_sendv
&& (iovec_array
.iovcnt_
!= 1))
601 ACE_ERROR ((LM_ERROR
,
602 ACE_TEXT ("(%P|%t) send, this function is not designed to ")
603 ACE_TEXT ("send an array of iovecs as individuals, %s\n"),
608 if (expected_num_messages
> 1)
609 // need to indicate that the message is restarting
610 if (this->send(RESTART_CHAR
, send_desc
) < 1)
613 ssize_t actual_send_status
;
616 while (((actual_send_status
=
617 this->peer ().sendv (iovec_array
.iov_
,
619 &DEFAULT_TIME_VALUE
)) == -1) &&
620 (errno
== EWOULDBLOCK
))
624 while (((actual_send_status
=
625 this->peer ().send (iovec_array
.iov_
->iov_base
,
626 iovec_array
.iov_
->iov_len
,
627 &DEFAULT_TIME_VALUE
)) == -1) &&
628 (errno
== EWOULDBLOCK
))
632 if (actual_send_status
== 0)
634 ACE_ERROR ((LM_ERROR
,
635 ACE_TEXT ("(%P|%t) %p, %s no data sent\n"),
636 send_func_name
, send_desc
));
639 if (actual_send_status
== -1)
641 if (errno
== ENOBUFS
)
642 ACE_ERROR ((LM_ERROR
,
643 ACE_TEXT ("(%P|%t) %p, failed regression test for %s\n"),
644 send_func_name
, send_desc
));
646 ACE_ERROR ((LM_ERROR
,
647 ACE_TEXT ("(%P|%t) %p, %s send returned errno=%d\n"),
648 send_func_name
, send_desc
, errno
));
651 buffer_len sent_bytes
= static_cast<buffer_len
>(actual_send_status
);
652 if (sent_bytes
>= iovec_array
.totalBytes_
)
654 #if defined (ACE_WIN32)
657 // the particular call to send was designed poorly and is not
658 // hitting the ENOBUFS condition
659 ACE_DEBUG ((LM_DEBUG
,
660 ACE_TEXT ("(%P|%t) expected %s to hit an ENOBUFS ")
661 ACE_TEXT ("condition and divide the buffer in half, ")
662 ACE_TEXT ("till a partial buffer is finally sent, ")
663 ACE_TEXT ("but the whole buffer was sent, so either ")
664 ACE_TEXT ("the call to Svc_Handler::send was designed ")
665 ACE_TEXT ("poorly, or the ENOBUFS condition doesn't ")
666 ACE_TEXT ("occur on this platform.")
667 ACE_TEXT (" See call to beforeVersion.\n"),
674 ACE_UNUSED_ARG (test_message
);
676 #endif /* ACE_WIN32 */
679 #if defined (ACE_win32)
680 // the test here only matters for windows, on other platforms there is
681 // no issue, so we skip this test
683 // the algorithm subtracts half of the whole, so we round up
684 u_long expectedBytes
=
685 (iovec_array
.totalBytes_
% 2) + (iovec_array
.totalBytes_
/ 2);
686 for ( ; sent_bytes
< expectedBytes
;
687 expectedBytes
= (expectedBytes
% 2) + (expectedBytes
/ 2))
690 if (sent_bytes
!= expectedBytes
)
692 ACE_ERROR ((LM_ERROR
,
693 ACE_TEXT ("(%P|%t) %p, bytes sent are not consistent ")
694 ACE_TEXT ("with the sendv logic, expected %d, got %d\n"),
695 send_func_name
, expectedBytes
, sent_bytes
));
698 #endif /* ACE_WIN32 */
700 buffer_len send_remainder
=
701 iovec_array
.totalBytes_
- static_cast<buffer_len
>(actual_send_status
);
702 char* offset
= iovec_array
.getBufferAtOffset(actual_send_status
);
704 while (send_remainder
> 0)
706 const ssize_t sendSize
=
707 (send_remainder
< 10000) ? send_remainder
: 10000;
709 send_status
= this->peer ().send (offset
, sendSize
, &DEFAULT_TIME_VALUE
);
710 if (send_status
== 0)
712 ACE_ERROR ((LM_ERROR
,
713 ACE_TEXT ("(%P|%t) %p, %s remainder send no data sent\n"),
717 if (send_status
== -1)
719 if (errno
== EWOULDBLOCK
)
722 ACE_ERROR ((LM_ERROR
,
723 ACE_TEXT ("(%P|%t) %p, %s remainder send returned ")
724 ACE_TEXT ("errno = %d\n"),
725 send_func_name
, send_desc
, errno
));
728 send_remainder
-= static_cast<buffer_len
> (send_status
);
729 offset
+= send_status
;
732 return actual_send_status
;
736 Svc_Handler::send (char send_char
, const ACE_TCHAR
* const send_desc
)
739 while ((send_status
=
740 this->peer ().send (&send_char
, 1, &DEFAULT_TIME_VALUE
)) < 1)
742 if (send_status
== -1)
744 if (errno
== EWOULDBLOCK
)
747 ACE_ERROR ((LM_ERROR
,
748 ACE_TEXT ("(%P|%t) %p, %s sending character ")
749 ACE_TEXT ("%c returned errno=%d\n"),
750 ACE_TEXT ("send"), send_desc
, send_char
, errno
));
753 if (send_status
== 0)
755 ACE_ERROR ((LM_ERROR
,
756 ACE_TEXT ("(%P|%t) %p, socket closed prematurely while ")
757 ACE_TEXT ("%s sending character %c\n"),
758 ACE_TEXT ("send"), send_desc
, send_char
));
766 Svc_Handler::recv_data ()
768 ACE_SOCK_Stream
&new_stream
= this->peer ();
770 // Read data from client (terminate on error).
771 ACE_UINT64 total_bytes
= 0;
772 ACE_UINT64 total_bytes_since_last_message
= 0;
773 const int BUFFER_SIZE
= 10000;
774 char buffer
[BUFFER_SIZE
+1];
776 bool badData
= false;
777 const int EXPECTED_BUFFER_SIZE
= BUFFER_SIZE
+ 9;
778 char expectedBuffer
[EXPECTED_BUFFER_SIZE
];
779 char expChar
= START_CHAR
;
781 for ( ; i
< EXPECTED_BUFFER_SIZE
; ++i
)
783 expectedBuffer
[i
] = expChar
;
784 expChar
= ::nextChar(expChar
);
786 expChar
= START_CHAR
;
791 if (!this->wait_for_completion(READX
))
792 ACE_ERROR ((LM_ERROR
,
793 ACE_TEXT ("(%P|%t) %p\n"),
794 ACE_TEXT ("select")));
798 ((r_bytes
= new_stream
.recv(&buffer
[0], BUFFER_SIZE
)) > 0);
799 total_bytes
+= r_bytes
)
801 bool finished
= false;
802 const char* const actualBufferEnd
= buffer
+ r_bytes
;
803 *(buffer
+ r_bytes
) = 0;
804 const char* partOfBufferEnd
= actualBufferEnd
;
805 const char* partOfBufferStart
= buffer
;
806 if (*(actualBufferEnd
- 1) == FINISHED_CHAR
)
811 ACE_DEBUG ((LM_DEBUG
,
812 ACE_TEXT ("(%P|%t) identified %d messages ")
813 ACE_TEXT ("and it is finished.\n"),
816 // loop through in case there is more than one message represented
817 while (partOfBufferStart
< partOfBufferEnd
)
819 const char* restartLoc
=
820 ACE_OS::strchr(partOfBufferStart
, RESTART_CHAR
);
821 if (restartLoc
&& (restartLoc
< partOfBufferEnd
))
824 total_bytes_since_last_message
= 0;
825 partOfBufferEnd
= restartLoc
;
827 ACE_DEBUG ((LM_DEBUG
,
828 ACE_TEXT ("(%P|%t) identified %d ")
829 ACE_TEXT ("messages.\n"),
835 total_bytes_since_last_message
+=
836 partOfBufferEnd
- partOfBufferStart
;
838 if (ACE_OS::memcmp(partOfBufferStart
,
839 &(expectedBuffer
[expChar
- START_CHAR
]),
840 partOfBufferEnd
- partOfBufferStart
) != 0)
844 const char lastCharOfBuffer
=
845 *((partOfBufferEnd
< actualBufferEnd
) ?
846 partOfBufferEnd
: partOfBufferEnd
- 1);
847 expChar
= ::nextChar(lastCharOfBuffer
);
848 // see if there is more data in the buffer
849 partOfBufferStart
= partOfBufferEnd
+ 1;
850 partOfBufferEnd
= actualBufferEnd
;
852 // if FINISHED_CHAR was found
856 ACE_ERROR ((LM_ERROR
,
857 ACE_TEXT ("(%P|%t) %p\n"),
858 ACE_TEXT ("close")));
860 ACE_ERROR ((LM_ERROR
,
861 ACE_TEXT ("(%P|%t) received final char, ")
862 ACE_TEXT ("but did not receive all data\n")));
863 else if (messages
!= expected_num_messages
)
864 ACE_ERROR ((LM_ERROR
,
865 ACE_TEXT ("(%P|%t) received final char, ")
866 ACE_TEXT ("but expected %d messages ")
867 ACE_TEXT ("and got %d\n"),
868 expected_num_messages
,
871 server_complete
= true;
879 ACE_ERROR ((LM_ERROR
,
880 ACE_TEXT ("(%P|%t) %p, socket closed prematurely\n"),
884 else if (r_bytes
< 0)
886 if (errno
!= EWOULDBLOCK
)
888 ACE_ERROR ((LM_ERROR
,
889 ACE_TEXT ("(%P|%t) %p, received %d messages and ")
890 ACE_TEXT ("%Q bytes and %Q bytes since the ")
891 ACE_TEXT ("last message\n"),
892 ACE_TEXT ("recv"), messages
,
893 total_bytes
, total_bytes_since_last_message
));
902 Svc_Handler::wait_for_completion(Direction direction
)
904 ACE_SOCK_Stream
&new_stream
= this->peer ();
906 ACE_Handle_Set handle_set
;
907 handle_set
.set_bit (new_stream
.get_handle ());
909 // Since we're in non-blocking mode we need to use <select> to
910 // avoid busy waiting.
911 #if defined (ACE_WIN64)
912 int select_width
= 0;
914 int select_width
= int (new_stream
.get_handle ()) + 1;
915 #endif /* ACE_WIN64 */
918 (direction
== READX
) ?
919 ACE_OS::select (select_width
, handle_set
, 0, 0, &DEFAULT_TIME_VALUE
) :
920 ACE_OS::select (select_width
, 0, handle_set
, 0, &DEFAULT_TIME_VALUE
);
925 Svc_Handler::close (u_long side
)
927 // Only run this protocol if we're the write-side (i.e., "1").
928 if (side
== 1 && this->peer ().close () == -1)
929 ACE_ERROR ((LM_ERROR
,
930 ACE_TEXT ("(%P|%t) %p\n"),
931 ACE_TEXT ("close_writer")));
932 // Trigger the shutdown.
933 return this->handle_close ();
936 #if defined (ACE_HAS_THREADS)
941 ACE_INET_Addr
*remote_addr
= reinterpret_cast<ACE_INET_Addr
*> (arg
);
942 ACE_INET_Addr
server_addr (remote_addr
->get_port_number (),
943 ACE_DEFAULT_SERVER_HOST
);
946 Svc_Handler
*svc_handler
= 0;
947 // Run the blocking test.
948 ACE_NEW_RETURN (svc_handler
,
952 // Perform a blocking connect to the server.
953 if (connector
.connect (svc_handler
,
955 ACE_ERROR ((LM_ERROR
,
956 ACE_TEXT ("(%P|%t) %p\n"),
957 ACE_TEXT ("connection failed")));
960 // Send the data to the server.
961 svc_handler
->send_data ();
966 // Performs the iterative server activities.
971 ACCEPTOR
*acceptor
= (ACCEPTOR
*) arg
;
972 ACE_INET_Addr cli_addr
;
973 ACE_TCHAR peer_host
[MAXHOSTNAMELEN
];
974 const ACE_Time_Value
tv (ACE_DEFAULT_TIMEOUT
);
975 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
, tv
);
977 Svc_Handler
*svc_handler
= 0;
978 ACE_NEW_RETURN (svc_handler
,
982 // Keep looping until we timeout on <accept> or fail.
986 // Create a new <Svc_Handler> to consume the data.
988 int result
= acceptor
->accept (svc_handler
,
992 // Timing out is the only way for threads to stop accepting
993 // since we don't have signals.
997 // svc_handler->close (); The ACE_Onsehot_Acceptor closed it.
999 if (errno
== ETIMEDOUT
)
1001 ACE_DEBUG ((LM_DEBUG
,
1002 ACE_TEXT ("accept timed out\n")));
1006 ACE_ERROR_RETURN ((LM_ERROR
,
1007 ACE_TEXT ("(%P|%t) %p\n"),
1008 ACE_TEXT ("accept failed, shutting down")),
1011 // Use this rather than get_host_name() to properly adjust to the
1012 // charset width in use.
1013 cli_addr
.get_host_name (peer_host
, MAXHOSTNAMELEN
);
1014 ACE_DEBUG ((LM_DEBUG
,
1015 ACE_TEXT ("(%P|%t) client %s connected from %d\n"),
1017 cli_addr
.get_port_number ()));
1019 svc_handler
->recv_data ();
1026 // Spawn threads and run the client and server.
1029 spawn_threads (ACCEPTOR
*acceptor
,
1030 ACE_INET_Addr
*server_addr
)
1034 if (ACE_Thread_Manager::instance ()->spawn_n
1037 (ACE_THR_FUNC
) server
,
1040 , ACE_DEFAULT_THREAD_PRIORITY
1044 ACE_ERROR ((LM_ERROR
,
1045 ACE_TEXT ("(%P|%t) %p\n%a"),
1046 ACE_TEXT ("server thread create failed"),
1049 if (ACE_Thread_Manager::instance ()->spawn
1050 ((ACE_THR_FUNC
) client
,
1051 (void *) server_addr
,
1055 ACE_ERROR ((LM_ERROR
,
1056 ACE_TEXT ("(%P|%t) %p\n%a"),
1057 ACE_TEXT ("client thread create failed"),
1060 // Wait for the threads to exit.
1061 // But, wait for a limited time because sometimes the test hangs on Irix.
1062 ACE_Time_Value
const max_wait (400 /* seconds */);
1063 ACE_Time_Value
const wait_time (ACE_OS::gettimeofday () + max_wait
);
1064 if (ACE_Thread_Manager::instance ()->wait (&wait_time
) == -1)
1067 ACE_ERROR ((LM_ERROR
,
1068 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
1071 ACE_OS::perror (ACE_TEXT ("ACE_Thread_Manager::wait"));
1078 #endif /* ACE_HAS_THREADS */
1079 //#endif /* ACE_WIN32 */
1082 run_main (int , ACE_TCHAR
*[])
1084 ACE_START_TEST (ACE_TEXT ("Bug_3943_Regression_Test"));
1086 #if defined (ACE_HAS_THREADS)
1087 # if !defined (ACE_WIN32) || ((defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) || !defined (ACE_LACKS_SEND))
1089 # ifndef ACE_LACKS_ACCEPT
1092 ACE_INET_Addr server_addr
;
1094 // Bind acceptor to any port and then find out what the port was.
1095 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&)) == -1
1096 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
1098 ACE_ERROR ((LM_ERROR
,
1099 ACE_TEXT ("(%P|%t) %p\n"),
1100 ACE_TEXT ("open")));
1105 ACE_DEBUG ((LM_DEBUG
,
1106 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
1107 server_addr
.get_port_number ()));
1109 if (spawn_threads (&acceptor
, &server_addr
) == -1)
1110 ACE_ERROR_RETURN ((LM_ERROR
,
1111 ACE_TEXT ("(%P|%t) %p\n"),
1112 ACE_TEXT ("spawn_threads")),
1116 if (!client_complete
|| !server_complete
)
1119 # ifdef CLEANUP_PROCESS_MUTEX
1120 ACE_Process_Mutex::unlink (acceptor
.acceptor ().lock ().name ());
1123 # endif // ACE_LACKS_ACCEPT
1124 # endif /* ACE_HAS_WINSOCK2 && (ACE_HAS_WINSOCK2 != 0)) || !ACE_LACKS_SEND */
1125 #else /* !ACE_HAS_THREADS */
1126 ACE_ERROR ((LM_INFO
,
1127 ACE_TEXT ("threads not supported on this platform\n")));
1128 #endif /* ACE_HAS_THREADS */