Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / IPC_SAP / UPIPE_SAP / ex3.cpp
blob83d1ba39725beacd9d1f96422ffcb86f4ee2b5ac
//=============================================================================
/**
 *  @file    ex3.cpp
 *
 *  Example for using <ACE_UPIPE_SAP> and <ACE_Thread> for intra-process
 *  communication.  This example uses char buffers as the input/output
 *  interface to the <ACE_UPIPE_Stream>.
 *
 *  @author Gerhard Lenzer and Prashant Jain.
 */
//=============================================================================
14 #include "ace/OS_main.h"
15 #include "ace/UPIPE_Connector.h"
16 #include "ace/UPIPE_Acceptor.h"
17 #include "ace/OS_NS_time.h"
18 #include <memory>
20 #if defined (ACE_HAS_THREADS)
22 // Data for testsuite.
// Both values are assigned in ACE_TMAIN from the command line (falling
// back to 32 and 16) and are then read by the supplier and consumer
// thread functions.
// size: number of chars in each buffer that is sent/received.
23 static int size = 0;
// iterations: number of buffers the supplier sends before the shutdown
// message.
24 static int iterations = 0;
26 static void *
27 supplier (void *)
29 ACE_UPIPE_Stream s_stream;
30 ACE_UPIPE_Addr c_addr (ACE_TEXT("pattern"));
32 ACE_UPIPE_Connector con;
34 ACE_DEBUG ((LM_DEBUG,
35 "(%t) supplier starting connect thread\n"));
37 if (con.connect (s_stream, c_addr) == -1)
38 ACE_ERROR ((LM_ERROR,
39 "(%t) %p\n",
40 "ACE_UPIPE_Acceptor.connect failed"));
42 std::unique_ptr<char[]> mybuf (new char[size]);
44 for (int i = 0; i < size; i++)
45 mybuf[i] = 'a';
47 for (int j = 0; j < iterations; j++)
48 if (s_stream.send (mybuf.get (), size) == -1)
49 ACE_ERROR_RETURN ((LM_ERROR,
50 "(%t) %p\n", "send failed"),
51 0);
53 // Insert a 0-sized message block to signal the other side to shut
54 // down.
55 if (s_stream.send (new ACE_Message_Block ((size_t) 0)) == -1)
56 ACE_ERROR_RETURN ((LM_ERROR,
57 "(%t) %p\n", "error put"),
58 0);
59 s_stream.close ();
60 return 0;
63 static void *
64 consumer (void *)
66 ACE_UPIPE_Stream c_stream;
67 ACE_UPIPE_Addr serv_addr (ACE_TEXT("pattern"));
69 // Accept will wait up to 4 seconds
70 ACE_UPIPE_Acceptor acc (serv_addr);
72 ACE_DEBUG ((LM_DEBUG,
73 "(%t) consumer spawning the supplier thread\n"));
75 // Spawn the supplier thread.
76 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier),
77 (void *) 0,
78 THR_NEW_LWP | THR_DETACHED) == -1)
79 ACE_ERROR_RETURN ((LM_ERROR,
80 "%p\n",
81 "spawn"),
82 0);
83 ACE_DEBUG ((LM_DEBUG,
84 "(%t) consumer starting accept\n"));
86 if (acc.accept (c_stream) == -1)
87 ACE_ERROR ((LM_ERROR, "(%t) %p\n",
88 "ACE_UPIPE_Acceptor.accept failed"));
90 // Ensure deletion upon exit.
91 std::unique_ptr<char[]> mybuf (new char[size]);
92 time_t currsec;
94 ACE_OS::time (&currsec);
96 time_t start = (time_t) currsec;
97 int result = 0;
98 int blocks = 0;
100 for (;; blocks++)
102 result = c_stream.recv (mybuf.get (), size);
103 if (result <= 0)
104 break;
107 if (result == -1)
108 ACE_ERROR ((LM_ERROR,
109 "(%t) %p\n", "recv failed"));
111 ACE_OS::time (&currsec);
112 time_t secs = (time_t) currsec - start;
114 ACE_DEBUG ((LM_DEBUG,
115 "(%t) Transferred %d blocks of size %d\n"
116 "The program ran %d seconds\n",
117 blocks,
118 size,
119 secs));
120 c_stream.close ();
121 return 0;
125 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
127 size = argc > 1 ? ACE_OS::atoi (argv[1]) : 32;
128 iterations = argc > 2 ? ACE_OS::atoi (argv[2]) : 16;
130 // Spawn the thread.
131 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer),
132 (void *) 0,
133 THR_NEW_LWP | THR_DETACHED) == -1)
134 ACE_ERROR_RETURN ((LM_ERROR,
135 "%p\n",
136 "spawn"),
138 // Wait for producer and consumer threads to exit.
139 ACE_Thread_Manager::instance ()->wait ();
140 return 0;
142 #else
// Fallback entry point, compiled only when ACE_HAS_THREADS is not
// defined: it ignores its arguments and just logs that threads are
// not supported on this platform.
144 ACE_TMAIN (int, ACE_TCHAR *[])
146 ACE_ERROR_RETURN ((LM_ERROR,
147 "threads not supported on this platform\n"),
150 #endif /* ACE_HAS_THREADS */