Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / IPC_SAP / UPIPE_SAP / ex2.cpp
blob9ab4125b16bfc44c3ff723678fb8b90cc201e5f0
2 //=============================================================================
3 /**
4 * @file ex2.cpp
6 * Example for using <ACE_UPIPE_SAP> and <ACE_Thread> for
7 * intra-process communication.
9 * @author Gerhard Lenzer and Douglas C. Schmidt
11 //=============================================================================
14 #include "ace/OS_main.h"
15 #include "ace/UPIPE_Connector.h"
16 #include "ace/UPIPE_Acceptor.h"
17 #include "ace/OS_NS_time.h"
18 #include <memory>
20 #if defined (ACE_HAS_THREADS)
22 // Data for testsuite.
23 static int size = 0;
24 static int iterations = 0;
26 static void *
27 supplier (void *)
29 ACE_UPIPE_Stream s_stream;
31 ACE_UPIPE_Addr c_addr (ACE_TEXT("pattern"));
33 std::unique_ptr<char[]> mybuf (new char[size]);
35 for (int i = 0; i < size; i++)
36 mybuf[i] = 'a';
38 ACE_DEBUG ((LM_DEBUG,
39 "(%t) supplier starting connect thread\n"));
41 ACE_UPIPE_Connector con;
43 if (con.connect (s_stream, c_addr) == -1)
44 ACE_ERROR ((LM_ERROR,
45 "(%t) %p\n",
46 "ACE_UPIPE_Acceptor.connect failed"));
48 // Test asynchronicity (the "acausal principle" ;-)).
49 s_stream.enable (ACE_SIGIO);
51 ACE_Message_Block *mb_p;
53 for (int j = 0; j < iterations; j++)
55 ACE_NEW_RETURN (mb_p,
56 ACE_Message_Block (size,
57 ACE_Message_Block::MB_DATA,
58 (ACE_Message_Block *) 0,
59 mybuf.get ()),
60 0);
61 if (s_stream.send (mb_p) == -1)
62 ACE_ERROR_RETURN ((LM_ERROR,
63 "(%t) %p\n",
64 "send failed"),
65 0);
68 ACE_NEW_RETURN (mb_p,
69 ACE_Message_Block ((size_t) 0),
70 0);
72 // Insert a 0-sized message block to signal the other side to shut
73 // down.
74 if (s_stream.send (mb_p) == -1)
75 ACE_ERROR_RETURN ((LM_ERROR,
76 "(%t) %p\n",
77 "send failed"),
78 0);
79 s_stream.close ();
80 return 0;
83 static void *
84 consumer (void *)
86 ACE_UPIPE_Stream c_stream;
88 // Set the high water mark to size to achieve optimum performance.
90 int wm = size * iterations;
92 if (c_stream.control (ACE_IO_Cntl_Msg::SET_HWM,
93 &wm) == -1)
94 ACE_DEBUG ((LM_DEBUG,
95 "set HWM failed\n"));
97 ACE_UPIPE_Addr serv_addr (ACE_TEXT("pattern"));
99 // accept will wait up to 4 seconds
100 ACE_UPIPE_Acceptor acc (serv_addr);
102 ACE_DEBUG ((LM_DEBUG,
103 "(%t) consumer spawning the supplier thread\n"));
105 // Spawn the supplier thread.
106 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier),
107 (void *) 0,
108 THR_NEW_LWP | THR_DETACHED) == -1)
109 ACE_ERROR_RETURN ((LM_ERROR,
110 "%p\n",
111 "spawn"),
114 ACE_DEBUG ((LM_DEBUG,
115 "(%t) consumer starting accept\n"));
117 if (acc.accept (c_stream) == -1)
118 ACE_ERROR ((LM_ERROR,
119 "(%t) %p\n",
120 "ACE_UPIPE_Acceptor.accept failed"));
122 // Time measurement.
123 time_t currsec;
124 ACE_OS::time (&currsec);
125 time_t start = (time_t) currsec;
127 int received_messages = 0;
129 for (ACE_Message_Block *mb = 0;
130 c_stream.recv (mb) != -1 && mb->size () != 0;
131 mb->release ())
132 received_messages++;
134 ACE_OS::time (&currsec);
135 time_t secs = (time_t) currsec - start;
137 ACE_DEBUG ((LM_DEBUG,
138 "(%t) Transferred %d blocks of size %d\n"
139 "The program ran %d seconds\n",
140 received_messages, size, secs));
141 c_stream.close ();
142 return 0;
146 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
148 size = argc > 1 ? ACE_OS::atoi (argv[1]) : 32;
149 iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : 16;
151 // Spawn the two threads.
152 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer),
153 (void *) 0,
154 THR_NEW_LWP | THR_DETACHED) == -1)
155 ACE_ERROR_RETURN ((LM_ERROR,
156 "%p\n",
157 "spawn"),
159 // Wait for producer and consumer threads to exit.
160 ACE_Thread_Manager::instance ()->wait ();
161 return 0;
163 #else
165 ACE_TMAIN (int, ACE_TCHAR *[])
167 ACE_ERROR_RETURN ((LM_ERROR,
168 "threads not supported on this platform\n"),
171 #endif /* ACE_HAS_THREADS */