2 //=============================================================================
6 * Example for using <ACE_UPIPE_SAP> and <ACE_Thread> for intra-process
7 * communication. This example uses char buffers as input/output
8 * interface to the <ACE_UPIPE_Stream>.
10 * @author Gerhard Lenzer and Prashant Jain.
12 //=============================================================================
14 #include "ace/OS_main.h"
15 #include "ace/UPIPE_Connector.h"
16 #include "ace/UPIPE_Acceptor.h"
17 #include "ace/OS_NS_time.h"
20 #if defined (ACE_HAS_THREADS)
22 // Data for testsuite.
24 static int iterations
= 0;
29 ACE_UPIPE_Stream s_stream
;
30 ACE_UPIPE_Addr
c_addr (ACE_TEXT("pattern"));
32 ACE_UPIPE_Connector con
;
35 "(%t) supplier starting connect thread\n"));
37 if (con
.connect (s_stream
, c_addr
) == -1)
40 "ACE_UPIPE_Acceptor.connect failed"));
42 std::unique_ptr
<char[]> mybuf (new char[size
]);
44 for (int i
= 0; i
< size
; i
++)
47 for (int j
= 0; j
< iterations
; j
++)
48 if (s_stream
.send (mybuf
.get (), size
) == -1)
49 ACE_ERROR_RETURN ((LM_ERROR
,
50 "(%t) %p\n", "send failed"),
53 // Insert a 0-sized message block to signal the other side to shut
55 if (s_stream
.send (new ACE_Message_Block ((size_t) 0)) == -1)
56 ACE_ERROR_RETURN ((LM_ERROR
,
57 "(%t) %p\n", "error put"),
66 ACE_UPIPE_Stream c_stream
;
67 ACE_UPIPE_Addr
serv_addr (ACE_TEXT("pattern"));
69 // Accept will wait up to 4 seconds
70 ACE_UPIPE_Acceptor
acc (serv_addr
);
73 "(%t) consumer spawning the supplier thread\n"));
75 // Spawn the supplier thread.
76 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier
),
78 THR_NEW_LWP
| THR_DETACHED
) == -1)
79 ACE_ERROR_RETURN ((LM_ERROR
,
84 "(%t) consumer starting accept\n"));
86 if (acc
.accept (c_stream
) == -1)
87 ACE_ERROR ((LM_ERROR
, "(%t) %p\n",
88 "ACE_UPIPE_Acceptor.accept failed"));
90 // Ensure deletion upon exit.
91 std::unique_ptr
<char[]> mybuf (new char[size
]);
94 ACE_OS::time (&currsec
);
96 time_t start
= (time_t) currsec
;
102 result
= c_stream
.recv (mybuf
.get (), size
);
108 ACE_ERROR ((LM_ERROR
,
109 "(%t) %p\n", "recv failed"));
111 ACE_OS::time (&currsec
);
112 time_t secs
= (time_t) currsec
- start
;
114 ACE_DEBUG ((LM_DEBUG
,
115 "(%t) Transferred %d blocks of size %d\n"
116 "The program ran %d seconds\n",
125 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
127 size
= argc
> 1 ? ACE_OS::atoi (argv
[1]) : 32;
128 iterations
= argc
> 2 ? ACE_OS::atoi (argv
[2]) : 16;
131 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer
),
133 THR_NEW_LWP
| THR_DETACHED
) == -1)
134 ACE_ERROR_RETURN ((LM_ERROR
,
138 // Wait for producer and consumer threads to exit.
139 ACE_Thread_Manager::instance ()->wait ();
144 ACE_TMAIN (int, ACE_TCHAR
*[])
146 ACE_ERROR_RETURN ((LM_ERROR
,
147 "threads not supported on this platform\n"),
150 #endif /* ACE_HAS_THREADS */