2 //=============================================================================
6 * Example for using <ACE_UPIPE_SAP> and <ACE_Thread> for
7 * intra-process communication.
9 * @author Gerhard Lenzer and Douglas C. Schmidt
11 //=============================================================================
14 #include "ace/OS_main.h"
15 #include "ace/UPIPE_Connector.h"
16 #include "ace/UPIPE_Acceptor.h"
17 #include "ace/OS_NS_time.h"
20 #if defined (ACE_HAS_THREADS)
22 // Data for testsuite.
24 static int iterations
= 0;
29 ACE_UPIPE_Stream s_stream
;
31 ACE_UPIPE_Addr
c_addr (ACE_TEXT("pattern"));
33 std::unique_ptr
<char[]> mybuf (new char[size
]);
35 for (int i
= 0; i
< size
; i
++)
39 "(%t) supplier starting connect thread\n"));
41 ACE_UPIPE_Connector con
;
43 if (con
.connect (s_stream
, c_addr
) == -1)
46 "ACE_UPIPE_Acceptor.connect failed"));
48 // Test asynchronicity (the "acausal principle" ;-)).
49 s_stream
.enable (ACE_SIGIO
);
51 ACE_Message_Block
*mb_p
;
53 for (int j
= 0; j
< iterations
; j
++)
56 ACE_Message_Block (size
,
57 ACE_Message_Block::MB_DATA
,
58 (ACE_Message_Block
*) 0,
61 if (s_stream
.send (mb_p
) == -1)
62 ACE_ERROR_RETURN ((LM_ERROR
,
69 ACE_Message_Block ((size_t) 0),
72 // Insert a 0-sized message block to signal the other side to shut down.
74 if (s_stream
.send (mb_p
) == -1)
75 ACE_ERROR_RETURN ((LM_ERROR
,
86 ACE_UPIPE_Stream c_stream
;
88 // Set the high water mark to size * iterations to achieve optimum performance.
90 int wm
= size
* iterations
;
92 if (c_stream
.control (ACE_IO_Cntl_Msg::SET_HWM
,
97 ACE_UPIPE_Addr
serv_addr (ACE_TEXT("pattern"));
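99 // accept will wait up to 4 seconds -- NOTE(review): the accept() call below passes no timeout argument, so this limit does not appear to be enforced here; confirm.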
100 ACE_UPIPE_Acceptor
acc (serv_addr
);
102 ACE_DEBUG ((LM_DEBUG
,
103 "(%t) consumer spawning the supplier thread\n"));
105 // Spawn the supplier thread.
106 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier
),
108 THR_NEW_LWP
| THR_DETACHED
) == -1)
109 ACE_ERROR_RETURN ((LM_ERROR
,
114 ACE_DEBUG ((LM_DEBUG
,
115 "(%t) consumer starting accept\n"));
117 if (acc
.accept (c_stream
) == -1)
118 ACE_ERROR ((LM_ERROR
,
120 "ACE_UPIPE_Acceptor.accept failed"));
124 ACE_OS::time (&currsec
);
125 time_t start
= (time_t) currsec
;
127 int received_messages
= 0;
129 for (ACE_Message_Block
*mb
= 0;
130 c_stream
.recv (mb
) != -1 && mb
->size () != 0;
134 ACE_OS::time (&currsec
);
135 time_t secs
= (time_t) currsec
- start
;
137 ACE_DEBUG ((LM_DEBUG
,
138 "(%t) Transferred %d blocks of size %d\n"
139 "The program ran %d seconds\n",
140 received_messages
, size
, secs
));
146 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
148 size
= argc
> 1 ? ACE_OS::atoi (argv
[1]) : 32;
149 iterations
= argc
> 2 ? ACE_OS::atoi (argv
[2]) : 16;
151 // Spawn the two threads.
152 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer
),
154 THR_NEW_LWP
| THR_DETACHED
) == -1)
155 ACE_ERROR_RETURN ((LM_ERROR
,
159 // Wait for producer and consumer threads to exit.
160 ACE_Thread_Manager::instance ()->wait ();
165 ACE_TMAIN (int, ACE_TCHAR
*[])
167 ACE_ERROR_RETURN ((LM_ERROR
,
168 "threads not supported on this platform\n"),
171 #endif /* ACE_HAS_THREADS */