1 // ============================================================================
7 // Bug_3943_Regression_Test.cpp
10 // When a large buffer is sent to ACE_OS::sendv, using winsock2,
11 // ENOBUFS occurs, which indicates that the buffer was too large
12 // for WSASend to accept. On other platforms ENOBUFS indicates a
13 // situation similar to EWOULDBLOCK, only that the buffer on the
14 // send side is completely full. ACE_OS::sendv for winsock2 now
15 // repeatedly halves the buffer and retries the send until data is sent
16 // or a different error is received. This test allocates a large
17 // iovec array buffer and verifies that a partial send occurs.
20 // Brian Johnson <johnsonb@ociweb.com>,
22 // ============================================================================
24 #include "test_config.h"
25 #include "ace/SOCK_Connector.h"
26 #include "ace/LOCK_SOCK_Acceptor.h"
27 #include "ace/Acceptor.h"
28 #include "ace/Handle_Set.h"
29 #include "ace/Connector.h"
30 #include "ace/OS_NS_sys_select.h"
31 #include "ace/OS_NS_sys_wait.h"
32 #include "ace/OS_NS_unistd.h"
33 #include "ace/os_include/os_netdb.h"
35 // The following works around bugs with some operating systems, which
36 // don't allow multiple threads/processes to call accept() on the same
37 // listen-mode port/socket. Also, note that since timed accept is
38 // implemented using select(), and we use timed accepts with threads,
39 // we need a real lock when using timed accepts even if the OS has
40 // thread-safe accept.
42 #if defined (ACE_LACKS_FORK)
43 # if defined (ACE_HAS_THREADS)
44 # include "ace/Thread_Mutex.h"
45 typedef ACE_Thread_Mutex ACCEPTOR_LOCKING
;
47 # include "ace/Null_Mutex.h"
48 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
49 # endif /* ACE_HAS_THREADS */
51 # if defined (ACE_HAS_THREAD_SAFE_ACCEPT)
52 # include "ace/Null_Mutex.h"
53 typedef ACE_Null_Mutex ACCEPTOR_LOCKING
;
55 # include "ace/Process_Mutex.h"
56 typedef ACE_Process_Mutex ACCEPTOR_LOCKING
;
57 # define CLEANUP_PROCESS_MUTEX
58 # endif /* ACE_HAS_THREAD_SAFE_ACCEPT */
59 #endif /* ACE_LACKS_FORK */
61 #if defined (ACE_HAS_TEMPLATE_TYPEDEFS)
62 #define LOCK_SOCK_ACCEPTOR ACE_LOCK_SOCK_Acceptor<ACCEPTOR_LOCKING>
64 #define LOCK_SOCK_ACCEPTOR ACE_LOCK_SOCK_Acceptor<ACCEPTOR_LOCKING>, ACE_INET_Addr
65 #endif /* ACE_HAS_TEMPLATE_TYPEDEFS */
67 #define REFCOUNTED_HASH_RECYCLABLE_ADDR ACE_Refcounted_Hash_Recyclable<ACE_INET_Addr>
71 const char FINISHED_CHAR
= '%';
72 const char RESTART_CHAR
= '&';
73 const char START_CHAR
= '0';
74 bool server_complete
= false;
75 bool client_complete
= false;
76 volatile int expected_num_messages
= 0;
// Computes the character that follows `current` in the repeating test-data
// pattern (the pattern is seeded with START_CHAR by its callers).
// Only the condition is visible in this listing: it is true when `current`
// is '9', RESTART_CHAR or FINISHED_CHAR.  The statements it selects and the
// return are not shown here -- presumably the pattern wraps back to
// START_CHAR in that case and otherwise advances by one; TODO confirm.
78 char nextChar(const char current
)
80 if ((current
== '9') ||
81 (current
== RESTART_CHAR
) ||
82 (current
== FINISHED_CHAR
))
88 #if defined (ACE_LACKS_IOVEC)
89 typedef u_long buffer_len
;
91 typedef size_t buffer_len
;
92 #endif /* ACE_LACKS_IOVEC */
94 #if defined (ACE_WIN32)
95 int beforeVersion(const DWORD majorVersion
,
96 const DWORD minorVersion
,
97 const BYTE productType
)
99 #if !defined(ACE_HAS_WINCE)
100 OSVERSIONINFOEX versioninfo
;
101 versioninfo
.dwOSVersionInfoSize
= sizeof (OSVERSIONINFOEX
);
102 versioninfo
.dwMajorVersion
= majorVersion
;
103 versioninfo
.dwMinorVersion
= minorVersion
;
104 versioninfo
.wProductType
= productType
;
106 ULONGLONG aboveMajorVer6TypeMask
= 0;
107 aboveMajorVer6TypeMask
=
108 ::VerSetConditionMask(0, VER_MAJORVERSION
, VER_GREATER
);
110 if (::VerifyVersionInfo(&versioninfo
, VER_MAJORVERSION
,
111 aboveMajorVer6TypeMask
) > 0)
114 ACE_OS::set_errno_to_last_error ();
115 if (errno
!= ERROR_OLD_WIN_VERSION
)
117 ACE_ERROR ((LM_ERROR
,
118 ACE_TEXT ("(%P|%t) VerifyVersionInfo errno = %d, ")
119 ACE_TEXT ("major version check must have been ")
120 ACE_TEXT ("defined incorrectly.\n"),
125 ULONGLONG majorV6AboveMinorV1TypeMask
= 0;
126 majorV6AboveMinorV1TypeMask
=
127 ::VerSetConditionMask(0, VER_MAJORVERSION
, VER_EQUAL
);
128 majorV6AboveMinorV1TypeMask
=
129 ::VerSetConditionMask(majorV6AboveMinorV1TypeMask
,
130 VER_MINORVERSION
, VER_GREATER
);
132 if (::VerifyVersionInfo(&versioninfo
,
133 VER_MAJORVERSION
| VER_MINORVERSION
,
134 majorV6AboveMinorV1TypeMask
) > 0)
137 ACE_OS::set_errno_to_last_error ();
138 if (errno
!= ERROR_OLD_WIN_VERSION
)
140 ACE_ERROR ((LM_ERROR
,
141 ACE_TEXT ("(%P|%t) VerifyVersionInfo errno = %d, ")
142 ACE_TEXT ("minor version check must have been ")
143 ACE_TEXT ("defined incorrectly.\n"),
148 ULONGLONG majorV6MinorV1NTTypeMask
= 0;
149 majorV6MinorV1NTTypeMask
=
150 ::VerSetConditionMask(0, VER_MAJORVERSION
, VER_EQUAL
);
151 majorV6MinorV1NTTypeMask
=
152 ::VerSetConditionMask(majorV6MinorV1NTTypeMask
,
153 VER_MINORVERSION
, VER_EQUAL
);
154 majorV6MinorV1NTTypeMask
=
155 ::VerSetConditionMask(majorV6MinorV1NTTypeMask
,
156 VER_PRODUCT_TYPE
, VER_EQUAL
);
158 if (::VerifyVersionInfo(&versioninfo
,
159 VER_MAJORVERSION
| VER_MINORVERSION
| VER_PRODUCT_TYPE
,
160 majorV6MinorV1NTTypeMask
) > 0)
163 ACE_OS::set_errno_to_last_error ();
164 if (errno
!= ERROR_OLD_WIN_VERSION
)
166 ACE_ERROR ((LM_ERROR
,
167 ACE_TEXT ("(%P|%t) VerifyVersionInfo errno=%d,")
168 ACE_TEXT (" call must have been defined incorrectly.\n"),
174 #else // defined(ACE_HAS_WINCE)
175 // no version testing of WinCE has been performed
176 ACE_UNUSED_ARG (majorVersion
);
177 ACE_UNUSED_ARG (minorVersion
);
178 ACE_UNUSED_ARG (productType
);
180 #endif /* ACE_HAS_WINCE */
182 #endif /* ACE_WIN32 */
186 #if defined (ACE_WIN32) && !defined(ACE_HAS_WINCE)
187 // it has been identified that Windows7 does not have the ENOBUFS issue
188 // but testing has not been performed on Server 2008 or Vista to identify
189 // whether the issue exists or not
190 return beforeVersion(6, 1, VER_NT_WORKSTATION
);
191 #else // defined(ACE_HAS_WINCE)
192 // currently, no versions of WINCE identified to not have the ENOBUFS error
194 #endif /* ACE_WIN32 && !ACE_HAS_WINCE */
199 IovecGuard(const int count
, const int slot
, const buffer_len max
);
201 char* getBufferAtOffset(const ssize_t offset
);
204 buffer_len totalBytes_
;
206 static const int ALL_SLOTS
= -1;
209 class Svc_Handler
: public ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>
212 // This class is the product created by both <ACE_Connector>
213 // and <ACE_Acceptor> objects.
216 // This class gets its own header file to work around AIX C++
217 // compiler "features" related to template instantiation... It is
218 // only used by Conn_Test.cpp.
220 Svc_Handler (ACE_Thread_Manager
* = 0);
221 // Do-nothing constructor.
223 virtual int open (void *);
224 // Initialization hook.
226 void send_data (void);
227 // Send data to server.
229 void recv_data (void);
230 // Recv data from client.
232 int close (u_long
= 0);
233 // Shutdown the <Svc_Handler>.
236 enum Direction
{ READX
, WRITEX
}; // VxWorks defines READ and WRITE
237 bool wait_for_completion(Direction direction
);
239 ssize_t
send (IovecGuard
& iovec_array
,
240 const ACE_TCHAR
* const send_desc
,
242 bool test_message
= false);
244 ssize_t
send (char send_char
, const ACE_TCHAR
* const send_desc
);
246 const ACE_Time_Value DEFAULT_TIME_VALUE
;
251 typedef ACE_Oneshot_Acceptor
<Svc_Handler
,
252 LOCK_SOCK_ACCEPTOR
> ACCEPTOR
;
253 typedef ACE_Connector
<Svc_Handler
,
254 ACE_SOCK_CONNECTOR
> CONNECTOR
;
// Builds the iovec array used as a send payload.
//   count - number of iovec entries requested (the member initialisers that
//           copy it into iovcnt_ are not visible in this listing).
//   slot  - index of the entry that gets the large length, or ALL_SLOTS to
//           give every entry the large length.
//   max   - length in bytes of the large entry/entries; all other entries
//           are 10 bytes long.
// totalBytes_ ends up as the sum of all iov_len values.  The data for every
// entry is carved out of one allocation and filled with the character
// pattern produced by ::nextChar, starting at START_CHAR.
257 IovecGuard::IovecGuard(const int count
, const int slot
, const buffer_len max
)
261 ACE_NEW (iov_
,iovec
[iovcnt_
]);
263 char expChar
= START_CHAR
;
// First pass: choose each entry's length and accumulate the total size.
264 for ( ; i
< iovcnt_
; ++i
)
266 iov_
[i
].iov_len
= ((slot
== i
) || (slot
== ALL_SLOTS
)) ? max
: 10;
267 totalBytes_
+= iov_
[i
].iov_len
;
270 // allocate all iov_bases as one big chunk
271 ACE_NEW (totalBuffer
,
// Second pass: point each entry at its slice of the chunk and fill it.
// expChar is not reset in the visible code, so the pattern continues
// across entry boundaries.
273 for (i
= 0; i
< iovcnt_
; ++i
)
275 iov_
[i
].iov_base
= totalBuffer
;
276 totalBuffer
+= iov_
[i
].iov_len
;
277 for (u_long j
= 0; j
< static_cast<u_long
>(iov_
[i
].iov_len
); ++j
)
279 char *charbase
= static_cast<char *>(iov_
[i
].iov_base
);
280 charbase
[j
] = expChar
;
281 expChar
= ::nextChar(expChar
);
// Destructor: releases the payload bytes.  The constructor points every
// iov_base into a single allocation whose start is iov_[0].iov_base, so one
// delete [] on that pointer frees the data for all entries.  Any further
// cleanup (e.g. of the iov_ array itself) is not visible in this listing.
286 IovecGuard::~IovecGuard()
288 // iov_bases are all just part of one big buffer
289 char* totalBuffer
= static_cast<char *>(iov_
[0].iov_base
);
290 delete [] totalBuffer
;
// Returns a pointer `offset` bytes past the start of the payload, i.e. past
// iov_[0].iov_base.  Because the constructor lays the entries out back to
// back in one chunk, the offset may reach into later entries' data.
// No bounds check is performed here; the caller must keep `offset` within
// the buffer.
295 IovecGuard::getBufferAtOffset(const ssize_t offset
)
297 char * totalBuffer
= static_cast<char *>(iov_
[0].iov_base
);
298 return totalBuffer
+ offset
;
301 Svc_Handler::Svc_Handler (ACE_Thread_Manager
*)
302 : DEFAULT_TIME_VALUE (ACE_DEFAULT_TIMEOUT
)
307 Svc_Handler::open (void *)
309 ACE_DEBUG ((LM_DEBUG
,
310 ACE_TEXT ("(%P|%t) opening Svc_Handler %@ with handle %d\n"),
312 this->peer ().get_handle ()));
313 // Enable non-blocking I/O.
314 if (this->peer ().enable (ACE_NONBLOCK
) == -1)
315 ACE_ERROR_RETURN ((LM_ERROR
,
316 ACE_TEXT ("(%P|%t) %p\n"),
317 ACE_TEXT ("enable")),
323 Svc_Handler::send_data (void)
325 bool successful
= true;
326 bool win32_test
= false;
327 const int testType
= processENOBUFS();
328 const ACE_TCHAR
*send_desc
= ACE_TEXT ("");
332 buffer_len tryThreshold
= 0x7fff;
333 ssize_t thresholdActualSend
= -1;
335 static_cast<ssize_t
>(
336 (static_cast<unsigned long long>(1) <<
337 (static_cast<unsigned long long>(sizeof(ssize_t
) * 8) -
338 static_cast<unsigned long long>(1))) - 1);
339 const unsigned int startShift
= 4;
340 unsigned int shift
= startShift
;
341 unsigned int trailingMask
= 0xffff;
342 while (static_cast<ssize_t
>(tryThreshold
) < MAX
)
344 IovecGuard
all(1, 0, tryThreshold
);
345 thresholdActualSend
=
346 this->send(all
, ACE_TEXT ("identifying threshold"), true, true);
347 if (thresholdActualSend
<= static_cast<ssize_t
>(tryThreshold
)/2 + 1)
352 // try and identify the threshold more closely
354 tryThreshold
= tryThreshold
>> shift
;
357 else if ((shift
< startShift
) && (shift
> 1))
358 // already narrowing in on value
361 tryThreshold
= (tryThreshold
<< shift
) | trailingMask
;
365 #if defined (ACE_WIN32)
367 // This test only applies to win32 platforms, on systems with
368 // sane sendv impls, this is not a problem.
369 if (thresholdActualSend
!= static_cast<ssize_t
>(tryThreshold
)/2 + 1)
371 if (static_cast<ssize_t
>(tryThreshold
) == MAX
)
372 ACE_ERROR ((LM_ERROR
,
373 ACE_TEXT ("(%P|%t) was not able to identify a point ")
374 ACE_TEXT ("where ACE_OS::sendv does not send a ")
375 ACE_TEXT ("complete buffer so the Bug #3943 ENOBUFS ")
376 ACE_TEXT ("condition does not occur on this ")
377 ACE_TEXT ("platform.\n")));
379 ACE_ERROR ((LM_ERROR
,
380 ACE_TEXT ("(%P|%t) was not able to identify a point ")
381 ACE_TEXT ("where ACE_OS::sendv sent a partial buffer ")
382 ACE_TEXT ("that was consistent with Bug #3943 ")
383 ACE_TEXT ("ENOBUFS condition logic, so this test ")
384 ACE_TEXT ("probably running into other socket ")
385 ACE_TEXT ("limitations and needs to be redesigned. ")
386 ACE_TEXT ("Stuck sending %d.\n"),
387 thresholdActualSend
));
391 #endif /* ACE_WIN32 */
393 buffer_len overThreshold
= tryThreshold
;
395 ACE_DEBUG ((LM_DEBUG
,
396 ACE_TEXT ("(%P|%t) identified a buffer with %d bytes ")
397 ACE_TEXT ("hits the ENOBUFS condition.\n"),
400 #if !defined (ACE_WIN32) || (defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0))
403 buffer_len underThreshold
= (overThreshold
+ 1) / 2;
404 // verify that if the total buffer is too large that partial is sent
405 IovecGuard
all(2, IovecGuard::ALL_SLOTS
, underThreshold
);
406 send_desc
= ACE_TEXT ("2 iovecs combined to be too large");
407 result
= this->send(all
, send_desc
, true);
408 if (win32_test
&& static_cast<u_long
>(result
) != underThreshold
)
411 ACE_ERROR ((LM_ERROR
,
412 ACE_TEXT ("(%P|%t) logic should have sent the ")
413 ACE_TEXT ("complete first iovec, ")
414 ACE_TEXT ("expected %d got %d out of %d\n"),
415 underThreshold
, result
,all
.totalBytes_
));
420 IovecGuard
all(2, IovecGuard::ALL_SLOTS
, overThreshold
);
421 send_desc
= ACE_TEXT ("2 iovecs each are too large");
422 result
= this->send(all
, send_desc
, true);
425 ACE_ERROR ((LM_ERROR
,
426 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
433 IovecGuard
small_iov(2, 0, overThreshold
);
434 send_desc
= ACE_TEXT ("large iovec followed by small iovec");
435 result
= this->send(small_iov
, send_desc
, true);
438 ACE_ERROR ((LM_ERROR
,
439 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
446 IovecGuard
large(4, 2, overThreshold
);
447 send_desc
= ACE_TEXT ("4 iovecs with third large");
448 result
= this->send(large
, send_desc
, true);
451 ACE_ERROR ((LM_ERROR
,
452 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
459 // verify that the buffer gets divided till it can send
460 IovecGuard
large(6, 5, 2 * overThreshold
);
461 send_desc
= ACE_TEXT ("6 iovecs with last very large");
462 result
= this->send(large
, send_desc
, true);
465 ACE_ERROR ((LM_ERROR
,
466 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
474 IovecGuard
array(1, 0, overThreshold
);
475 send_desc
= ACE_TEXT ("just one large iovec in array");
476 result
= this->send(array
, send_desc
, true);
479 ACE_ERROR ((LM_ERROR
,
480 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
487 IovecGuard
array(1, 0, 2 * overThreshold
);
488 send_desc
= ACE_TEXT ("just one very large iovec in array");
489 result
= this->send(array
, send_desc
, true);
492 ACE_ERROR ((LM_ERROR
,
493 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
499 #endif /* ACE_HAS_WINSOCK2 && (ACE_HAS_WINSOCK2 != 0) */
500 #if !defined (ACE_LACKS_SEND)
503 IovecGuard
one(1, 0, overThreshold
);
504 send_desc
= ACE_TEXT ("large");
505 result
= this->send(one
, send_desc
, false);
508 ACE_ERROR ((LM_ERROR
,
509 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
516 IovecGuard
one(1, 0, 2 * overThreshold
);
517 send_desc
= ACE_TEXT ("very large");
518 result
= send(one
, send_desc
, false);
521 ACE_ERROR ((LM_ERROR
,
522 ACE_TEXT ("(%P|%t) %s: expected > 0, got %d\n"),
528 #endif /* !ACE_LACKS_SEND */
530 else if (testType
> 0)
532 // since ENOBUFS condition is expected to not occur on this platform,
533 // send a very large message and verify that ACE_OS::sendv and send
534 // are able to send the whole buffer in one call
535 #if defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)
538 IovecGuard
small_iov(2, 0, 0x0fffffff);
539 send_desc
= ACE_TEXT ("large iovec followed by small iovec");
540 result
= this->send(small_iov
, send_desc
, true, true);
541 if (result
< 0 || static_cast<u_long
>(result
) != small_iov
.totalBytes_
)
543 ACE_ERROR ((LM_ERROR
,
544 ACE_TEXT ("(%P|%t) %s: expected %d, got %d\n"),
545 send_desc
, small_iov
.totalBytes_
, result
));
551 IovecGuard
array(1, 0, 0x0fffffff);
552 send_desc
= ACE_TEXT ("just one large iovec in array");
553 result
= this->send(array
, send_desc
, true, true);
554 if (result
< 0 || (static_cast<u_long
>(result
) != array
.totalBytes_
))
556 ACE_ERROR ((LM_ERROR
,
557 ACE_TEXT ("(%P|%t) %s: expected %d, got %d\n"),
558 send_desc
, array
.totalBytes_
, result
));
563 #endif /* ACE_HAS_WINSOCK2 && (ACE_HAS_WINSOCK2 != 0) */
564 #if !defined (ACE_LACKS_SEND)
567 IovecGuard
one(1, 0, 0x0fffffff);
568 send_desc
= ACE_TEXT ("large");
569 result
= this->send(one
, send_desc
, false, true);
570 if (result
< 0 || static_cast<u_long
>(result
) != one
.totalBytes_
)
572 ACE_ERROR ((LM_ERROR
,
573 ACE_TEXT ("(%P|%t) %s: expected %d, got %d\n"),
574 send_desc
, one
.totalBytes_
, result
));
579 #endif /* !ACE_LACKS_SEND */
582 // the determination of testType failed, ERROR is already reported
585 // need to indicate that there are no more messages
586 // this may fail if the server reads the char and closes before
587 // it is done, so let the server report the error if there was one
588 send_desc
= ACE_TEXT ("indicating no more messages");
589 this->send(FINISHED_CHAR
, send_desc
);
591 this->wait_for_completion(READX
);
594 ACE_ERROR ((LM_ERROR
,
595 ACE_TEXT ("(%P|%t) %p\n"),
596 ACE_TEXT ("close")));
599 client_complete
= successful
;
603 Svc_Handler::send (IovecGuard
& iovec_array
,
604 const ACE_TCHAR
* const send_desc
,
605 const bool use_sendv
,
606 const bool test_message
)
608 ++expected_num_messages
;
609 const ACE_TCHAR
* const send_func_name
=
610 (use_sendv
) ? ACE_TEXT ("ACE_OS::sendv") : ACE_TEXT ("ACE_OS::send");
612 ACE_DEBUG ((LM_DEBUG
,
613 ACE_TEXT ("(%P|%t) send, using %s for %s (%d bytes)\n"),
614 send_func_name
, send_desc
, iovec_array
.totalBytes_
));
616 if (!use_sendv
&& (iovec_array
.iovcnt_
!= 1))
618 ACE_ERROR ((LM_ERROR
,
619 ACE_TEXT ("(%P|%t) send, this function is not designed to ")
620 ACE_TEXT ("send an array of iovecs as individuals, %s\n"),
625 if (expected_num_messages
> 1)
626 // need to indicate that the message is restarting
627 if (this->send(RESTART_CHAR
, send_desc
) < 1)
630 ssize_t actual_send_status
;
633 while (((actual_send_status
=
634 this->peer ().sendv (iovec_array
.iov_
,
636 &DEFAULT_TIME_VALUE
)) == -1) &&
637 (errno
== EWOULDBLOCK
))
641 while (((actual_send_status
=
642 this->peer ().send (iovec_array
.iov_
->iov_base
,
643 iovec_array
.iov_
->iov_len
,
644 &DEFAULT_TIME_VALUE
)) == -1) &&
645 (errno
== EWOULDBLOCK
))
649 if (actual_send_status
== 0)
651 ACE_ERROR ((LM_ERROR
,
652 ACE_TEXT ("(%P|%t) %p, %s no data sent\n"),
653 send_func_name
, send_desc
));
656 if (actual_send_status
== -1)
658 if (errno
== ENOBUFS
)
659 ACE_ERROR ((LM_ERROR
,
660 ACE_TEXT ("(%P|%t) %p, failed regression test for %s\n"),
661 send_func_name
, send_desc
));
663 ACE_ERROR ((LM_ERROR
,
664 ACE_TEXT ("(%P|%t) %p, %s send returned errno=%d\n"),
665 send_func_name
, send_desc
, errno
));
668 buffer_len sent_bytes
= static_cast<buffer_len
>(actual_send_status
);
669 if (sent_bytes
>= iovec_array
.totalBytes_
)
671 #if defined (ACE_WIN32)
674 // the particular call to send was designed poorly and is not
675 // hitting the ENOBUFS condition
676 ACE_DEBUG ((LM_DEBUG
,
677 ACE_TEXT ("(%P|%t) expected %s to hit an ENOBUFS ")
678 ACE_TEXT ("condition and divide the buffer in half, ")
679 ACE_TEXT ("till a partial buffer is finally sent, ")
680 ACE_TEXT ("but the whole buffer was sent, so either ")
681 ACE_TEXT ("the call to Svc_Handler::send was designed ")
682 ACE_TEXT ("poorly, or the ENOBUFS condition doesn't ")
683 ACE_TEXT ("occur on this platform.")
684 ACE_TEXT (" See call to beforeVersion.\n"),
691 ACE_UNUSED_ARG (test_message
);
693 #endif /* ACE_WIN32 */
696 #if defined (ACE_win32)
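697 // the test here only matters for windows, on other platforms there is
698 // no issue, so we skip this test
// NOTE(review): the guard above is spelled ACE_win32, while the matching
// #endif comment and the other Windows guards in this file use ACE_WIN32.
// Macro names are case-sensitive, so confirm the lowercase spelling is
// intended.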
700 // the algorithm subtracts half of the whole, so we round up
701 u_long expectedBytes
=
702 (iovec_array
.totalBytes_
% 2) + (iovec_array
.totalBytes_
/ 2);
703 for ( ; sent_bytes
< expectedBytes
;
704 expectedBytes
= (expectedBytes
% 2) + (expectedBytes
/ 2))
707 if (sent_bytes
!= expectedBytes
)
709 ACE_ERROR ((LM_ERROR
,
710 ACE_TEXT ("(%P|%t) %p, bytes sent are not consistent ")
711 ACE_TEXT ("with the sendv logic, expected %d, got %d\n"),
712 send_func_name
, expectedBytes
, sent_bytes
));
715 #endif /* ACE_WIN32 */
717 buffer_len send_remainder
=
718 iovec_array
.totalBytes_
- static_cast<buffer_len
>(actual_send_status
);
719 char* offset
= iovec_array
.getBufferAtOffset(actual_send_status
);
721 while (send_remainder
> 0)
723 const ssize_t sendSize
=
724 (send_remainder
< 10000) ? send_remainder
: 10000;
726 send_status
= this->peer ().send (offset
, sendSize
, &DEFAULT_TIME_VALUE
);
727 if (send_status
== 0)
729 ACE_ERROR ((LM_ERROR
,
730 ACE_TEXT ("(%P|%t) %p, %s remainder send no data sent\n"),
734 if (send_status
== -1)
736 if (errno
== EWOULDBLOCK
)
739 ACE_ERROR ((LM_ERROR
,
740 ACE_TEXT ("(%P|%t) %p, %s remainder send returned ")
741 ACE_TEXT ("errno = %d\n"),
742 send_func_name
, send_desc
, errno
));
745 send_remainder
-= static_cast<buffer_len
> (send_status
);
746 offset
+= send_status
;
749 return actual_send_status
;
// Sends the single control character `send_char` on the peer stream, using
// DEFAULT_TIME_VALUE as the timeout, and keeps looping while the send
// returns less than 1.  `send_desc` is only used to label log messages.
// The value returned to the caller is not visible in this listing.
753 Svc_Handler::send (char send_char
, const ACE_TCHAR
* const send_desc
)
756 while ((send_status
=
757 this->peer ().send (&send_char
, 1, &DEFAULT_TIME_VALUE
)) < 1)
759 if (send_status
== -1)
// EWOULDBLOCK is singled out from other errors; the action taken for it is
// not visible here -- presumably the send is simply retried; TODO confirm.
761 if (errno
== EWOULDBLOCK
)
// Any other error is logged together with errno.
764 ACE_ERROR ((LM_ERROR
,
765 ACE_TEXT ("(%P|%t) %p, %s sending character ")
766 ACE_TEXT ("%c returned errno=%d\n"),
767 ACE_TEXT ("send"), send_desc
, send_char
, errno
));
// A zero result is reported as the peer having closed the socket.
770 if (send_status
== 0)
772 ACE_ERROR ((LM_ERROR
,
773 ACE_TEXT ("(%P|%t) %p, socket closed prematurely while ")
774 ACE_TEXT ("%s sending character %c\n"),
775 ACE_TEXT ("send"), send_desc
, send_char
));
783 Svc_Handler::recv_data (void)
785 ACE_SOCK_Stream
&new_stream
= this->peer ();
787 // Read data from client (terminate on error).
788 ACE_UINT64 total_bytes
= 0;
789 ACE_UINT64 total_bytes_since_last_message
= 0;
790 const int BUFFER_SIZE
= 10000;
791 char buffer
[BUFFER_SIZE
+1];
793 bool badData
= false;
794 const int EXPECTED_BUFFER_SIZE
= BUFFER_SIZE
+ 9;
795 char expectedBuffer
[EXPECTED_BUFFER_SIZE
];
796 char expChar
= START_CHAR
;
798 for ( ; i
< EXPECTED_BUFFER_SIZE
; ++i
)
800 expectedBuffer
[i
] = expChar
;
801 expChar
= ::nextChar(expChar
);
803 expChar
= START_CHAR
;
808 if (!this->wait_for_completion(READX
))
809 ACE_ERROR ((LM_ERROR
,
810 ACE_TEXT ("(%P|%t) %p\n"),
811 ACE_TEXT ("select")));
815 ((r_bytes
= new_stream
.recv(&buffer
[0], BUFFER_SIZE
)) > 0);
816 total_bytes
+= r_bytes
)
818 bool finished
= false;
819 const char* const actualBufferEnd
= buffer
+ r_bytes
;
820 *(buffer
+ r_bytes
) = 0;
821 const char* partOfBufferEnd
= actualBufferEnd
;
822 const char* partOfBufferStart
= buffer
;
823 if (*(actualBufferEnd
- 1) == FINISHED_CHAR
)
828 ACE_DEBUG ((LM_DEBUG
,
829 ACE_TEXT ("(%P|%t) identified %d messages ")
830 ACE_TEXT ("and it is finished.\n"),
833 // loop through in case there is more than one message represented
834 while (partOfBufferStart
< partOfBufferEnd
)
836 const char* restartLoc
=
837 ACE_OS::strchr(partOfBufferStart
, RESTART_CHAR
);
838 if (restartLoc
&& (restartLoc
< partOfBufferEnd
))
841 total_bytes_since_last_message
= 0;
842 partOfBufferEnd
= restartLoc
;
844 ACE_DEBUG ((LM_DEBUG
,
845 ACE_TEXT ("(%P|%t) identified %d ")
846 ACE_TEXT ("messages.\n"),
852 total_bytes_since_last_message
+=
853 partOfBufferEnd
- partOfBufferStart
;
855 if (ACE_OS::memcmp(partOfBufferStart
,
856 &(expectedBuffer
[expChar
- START_CHAR
]),
857 partOfBufferEnd
- partOfBufferStart
) != 0)
861 const char lastCharOfBuffer
=
862 *((partOfBufferEnd
< actualBufferEnd
) ?
863 partOfBufferEnd
: partOfBufferEnd
- 1);
864 expChar
= ::nextChar(lastCharOfBuffer
);
865 // see if there is more data in the buffer
866 partOfBufferStart
= partOfBufferEnd
+ 1;
867 partOfBufferEnd
= actualBufferEnd
;
869 // if FINISHED_CHAR was found
873 ACE_ERROR ((LM_ERROR
,
874 ACE_TEXT ("(%P|%t) %p\n"),
875 ACE_TEXT ("close")));
877 ACE_ERROR ((LM_ERROR
,
878 ACE_TEXT ("(%P|%t) received final char, ")
879 ACE_TEXT ("but did not receive all data\n")));
880 else if (messages
!= expected_num_messages
)
881 ACE_ERROR ((LM_ERROR
,
882 ACE_TEXT ("(%P|%t) received final char, ")
883 ACE_TEXT ("but expected %d messages ")
884 ACE_TEXT ("and got %d\n"),
885 expected_num_messages
,
888 server_complete
= true;
896 ACE_ERROR ((LM_ERROR
,
897 ACE_TEXT ("(%P|%t) %p, socket closed prematurely\n"),
901 else if (r_bytes
< 0)
903 if (errno
!= EWOULDBLOCK
)
905 ACE_ERROR ((LM_ERROR
,
906 ACE_TEXT ("(%P|%t) %p, received %d messages and ")
907 ACE_TEXT ("%Q bytes and %Q bytes since the ")
908 ACE_TEXT ("last message\n"),
909 ACE_TEXT ("recv"), messages
,
910 total_bytes
, total_bytes_since_last_message
));
// Blocks in ACE_OS::select on the peer socket's handle, with
// DEFAULT_TIME_VALUE as the timeout.  For direction == READX the handle set
// is passed as select's second argument (the read set); otherwise it is
// passed as the third argument (the write set).
// How the select result is turned into the bool return value is not visible
// in this listing.
919 Svc_Handler::wait_for_completion(Direction direction
)
921 ACE_SOCK_Stream
&new_stream
= this->peer ();
923 ACE_Handle_Set handle_set
;
924 handle_set
.set_bit (new_stream
.get_handle ());
926 // Since we're in non-blocking mode we need to use <select> to
927 // avoid busy waiting.
// Under ACE_WIN64 the width is fixed at 0 instead of handle + 1 --
// presumably because the handle does not fit in an int there and the width
// is not needed on that platform; TODO confirm.
928 #if defined (ACE_WIN64)
929 int select_width
= 0;
931 int select_width
= int (new_stream
.get_handle ()) + 1;
932 #endif /* ACE_WIN64 */
935 (direction
== READX
) ?
936 ACE_OS::select (select_width
, handle_set
, 0, 0, &DEFAULT_TIME_VALUE
) :
937 ACE_OS::select (select_width
, 0, handle_set
, 0, &DEFAULT_TIME_VALUE
);
// Shuts down this handler.
//   side - when equal to 1 the peer stream is closed first, and a failure of
//          that close is logged with the tag "close_writer"; for any other
//          value the stream is not touched here.
// Returns whatever handle_close () returns, regardless of `side`.
942 Svc_Handler::close (u_long side
)
944 // Only run this protocol if we're the write-side (i.e., "1").
945 if (side
== 1 && this->peer ().close () == -1)
946 ACE_ERROR ((LM_ERROR
,
947 ACE_TEXT ("(%P|%t) %p\n"),
948 ACE_TEXT ("close_writer")));
949 // Trigger the shutdown.
950 return this->handle_close ();
953 #if defined (ACE_HAS_THREADS)
958 ACE_INET_Addr
*remote_addr
= reinterpret_cast<ACE_INET_Addr
*> (arg
);
959 ACE_INET_Addr
server_addr (remote_addr
->get_port_number (),
960 ACE_DEFAULT_SERVER_HOST
);
963 Svc_Handler
*svc_handler
= 0;
964 // Run the blocking test.
965 ACE_NEW_RETURN (svc_handler
,
969 // Perform a blocking connect to the server.
970 if (connector
.connect (svc_handler
,
972 ACE_ERROR ((LM_ERROR
,
973 ACE_TEXT ("(%P|%t) %p\n"),
974 ACE_TEXT ("connection failed")));
977 // Send the data to the server.
978 svc_handler
->send_data ();
984 // Performs the iterative server activities.
989 ACCEPTOR
*acceptor
= (ACCEPTOR
*) arg
;
990 ACE_INET_Addr cli_addr
;
991 ACE_TCHAR peer_host
[MAXHOSTNAMELEN
];
992 const ACE_Time_Value
tv (ACE_DEFAULT_TIMEOUT
);
993 ACE_Synch_Options
options (ACE_Synch_Options::USE_TIMEOUT
, tv
);
995 Svc_Handler
*svc_handler
= 0;
996 ACE_NEW_RETURN (svc_handler
,
1000 // Keep looping until we timeout on <accept> or fail.
1004 // Create a new <Svc_Handler> to consume the data.
1006 int result
= acceptor
->accept (svc_handler
,
1010 // Timing out is the only way for threads to stop accepting
1011 // since we don't have signals.
1015 // svc_handler->close (); The ACE_Oneshot_Acceptor closed it.
1017 if (errno
== ETIMEDOUT
)
1019 ACE_DEBUG ((LM_DEBUG
,
1020 ACE_TEXT ("accept timed out\n")));
1024 ACE_ERROR_RETURN ((LM_ERROR
,
1025 ACE_TEXT ("(%P|%t) %p\n"),
1026 ACE_TEXT ("accept failed, shutting down")),
1029 // Use this rather than get_host_name() to properly adjust to the
1030 // charset width in use.
1031 cli_addr
.get_host_name (peer_host
, MAXHOSTNAMELEN
);
1032 ACE_DEBUG ((LM_DEBUG
,
1033 ACE_TEXT ("(%P|%t) client %s connected from %d\n"),
1035 cli_addr
.get_port_number ()));
1037 svc_handler
->recv_data ();
1044 // Spawn threads and run the client and server.
1047 spawn_threads (ACCEPTOR
*acceptor
,
1048 ACE_INET_Addr
*server_addr
)
1052 if (ACE_Thread_Manager::instance ()->spawn_n
1055 (ACE_THR_FUNC
) server
,
1058 , ACE_DEFAULT_THREAD_PRIORITY
1062 ACE_ERROR ((LM_ERROR
,
1063 ACE_TEXT ("(%P|%t) %p\n%a"),
1064 ACE_TEXT ("server thread create failed"),
1067 if (ACE_Thread_Manager::instance ()->spawn
1068 ((ACE_THR_FUNC
) client
,
1069 (void *) server_addr
,
1073 ACE_ERROR ((LM_ERROR
,
1074 ACE_TEXT ("(%P|%t) %p\n%a"),
1075 ACE_TEXT ("client thread create failed"),
1078 // Wait for the threads to exit.
1079 // But, wait for a limited time because sometimes the test hangs on Irix.
1080 ACE_Time_Value
const max_wait (400 /* seconds */);
1081 ACE_Time_Value
const wait_time (ACE_OS::gettimeofday () + max_wait
);
1082 if (ACE_Thread_Manager::instance ()->wait (&wait_time
) == -1)
1085 ACE_ERROR ((LM_ERROR
,
1086 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
1089 ACE_OS::perror (ACE_TEXT ("ACE_Thread_Manager::wait"));
1096 #endif /* ACE_HAS_THREADS */
1097 //#endif /* ACE_WIN32 */
1100 run_main (int , ACE_TCHAR
*[])
1102 ACE_START_TEST (ACE_TEXT ("Bug_3943_Regression_Test"));
1104 #if defined (ACE_HAS_THREADS)
1105 # if !defined (ACE_WIN32) || ((defined (ACE_HAS_WINSOCK2) && (ACE_HAS_WINSOCK2 != 0)) || !defined (ACE_LACKS_SEND))
1107 # ifndef ACE_LACKS_ACCEPT
1110 ACE_INET_Addr server_addr
;
1112 // Bind acceptor to any port and then find out what the port was.
1113 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&)) == -1
1114 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
1116 ACE_ERROR ((LM_ERROR
,
1117 ACE_TEXT ("(%P|%t) %p\n"),
1118 ACE_TEXT ("open")));
1123 ACE_DEBUG ((LM_DEBUG
,
1124 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
1125 server_addr
.get_port_number ()));
1127 if (spawn_threads (&acceptor
, &server_addr
) == -1)
1128 ACE_ERROR_RETURN ((LM_ERROR
,
1129 ACE_TEXT ("(%P|%t) %p\n"),
1130 ACE_TEXT ("spawn_threads")),
1134 if (!client_complete
|| !server_complete
)
1137 # ifdef CLEANUP_PROCESS_MUTEX
1138 ACE_Process_Mutex::unlink (acceptor
.acceptor ().lock ().name ());
1141 # endif // ACE_LACKS_ACCEPT
1142 # endif /* ACE_HAS_WINSOCK2 && (ACE_HAS_WINSOCK2 != 0)) || !ACE_LACKS_SEND */
1143 #else /* !ACE_HAS_THREADS */
1144 ACE_ERROR ((LM_INFO
,
1145 ACE_TEXT ("threads not supported on this platform\n")));
1146 #endif /* ACE_HAS_THREADS */