Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / ASX / UPIPE_Event_Server / event_server.cpp
blob4f3d42bbe87ad69124fbc5101ff78510a5b8fa73
1 // Test the event server.
3 #include "ace/OS_main.h"
4 #include "ace/Stream.h"
5 #include "ace/Service_Config.h"
6 #include "ace/UPIPE_Acceptor.h"
7 #include "ace/UPIPE_Connector.h"
8 #include "ace/Truncate.h"
10 // FUZZ: disable check_for_streams_include
11 #include "ace/streams.h"
13 #include "Options.h"
14 #include "Consumer_Router.h"
15 #include "Event_Analyzer.h"
16 #include "Supplier_Router.h"
17 #include "ace/Sig_Adapter.h"
18 #include "ace/OS_NS_unistd.h"
20 #if defined (ACE_HAS_THREADS)
22 typedef ACE_Stream<ACE_MT_SYNCH> MT_Stream;
23 typedef ACE_Module<ACE_MT_SYNCH> MT_Module;
25 // Handle SIGINT and terminate the entire application.
27 class Quit_Handler : public ACE_Sig_Adapter
29 public:
30 Quit_Handler ();
31 virtual int handle_input (ACE_HANDLE fd);
34 Quit_Handler::Quit_Handler ()
35 : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop))
37 // Register to trap input from the user.
38 if (ACE_Event_Handler::register_stdin_handler (this,
39 ACE_Reactor::instance (),
40 ACE_Thread_Manager::instance ()) == -1)
41 ACE_ERROR ((LM_ERROR, "%p\n", "register_stdin_handler"));
42 // Register to trap the SIGINT signal.
43 else if (ACE_Reactor::instance ()->register_handler
44 (SIGINT, this) == -1)
45 ACE_ERROR ((LM_ERROR, "%p\n", "register_handler"));
48 int
49 Quit_Handler::handle_input (ACE_HANDLE)
51 options.stop_timer ();
52 ACE_DEBUG ((LM_INFO, " (%t) closing down the test\n"));
53 options.print_results ();
55 ACE_Reactor::end_event_loop();
56 return 0;
59 static void *
60 consumer (void *)
62 ACE_UPIPE_Stream c_stream;
63 ACE_UPIPE_Addr c_addr (ACE_TEXT ("/tmp/conupipe"));
65 int verb = options.verbose ();
66 int msiz = ACE_Utils::truncate_cast<int> (options.message_size ());
67 time_t secs, par1, par2;
68 time_t currsec;
70 if (verb)
71 cout << "consumer starting connect" << endl;
73 ACE_UPIPE_Connector con;
75 if (con.connect (c_stream, c_addr) == -1)
76 ACE_DEBUG ((LM_INFO, " (%t) connect failed\n"));
77 else
78 cout << "consumer :we're connected" << endl;
80 ACE_Message_Block *mb_p;
82 int done = 0;
83 int cnt = 0;
84 ACE_OS::time (&currsec);
86 par1 = currsec;
88 while (done == 0 && (c_stream.recv (mb_p) != -1))
90 if (mb_p->length () > 1)
92 cnt++;
93 if (verb)
95 cout << " consumer received message !!!!!! "
96 << mb_p->rd_ptr () << endl;
99 else
101 if (verb)
103 cout << "consumer got last mb"
104 << (char) * (mb_p->rd_ptr ()) << endl;
106 c_stream.close ();
107 done = 1;
111 ACE_OS::time (&currsec);
112 par2 = currsec;
114 secs = par2 - par1;
116 if (secs <= 0)
117 secs=1;
119 ACE_DEBUG ((LM_INFO,
120 ACE_TEXT ("consumer got %d messages of size %d ")
121 ACE_TEXT ("within %: seconds\n"),
122 cnt, msiz, secs));
124 ACE_OS::sleep (2);
125 cout << "consumer terminating " << endl;
126 return 0;
129 static void *
130 supplier (void *dummy)
132 ACE_UPIPE_Stream s_stream;
133 ACE_UPIPE_Addr serv_addr (ACE_TEXT ("/tmp/supupipe"));
134 ACE_UPIPE_Connector con;
136 int iter = ACE_Utils::truncate_cast<int> (options.iterations ());
137 int verb = options.verbose ();
138 int msiz = ACE_Utils::truncate_cast<int> (options.message_size ());
139 cout << "supplier starting connect" << endl;
141 if (con.connect (s_stream, serv_addr) == -1)
142 ACE_DEBUG ((LM_INFO, " (%t) connect failed\n"));
144 cout << "supplier : we're connected" << endl;
145 int n;
146 n = 0;
147 ACE_Message_Block * mb_p;
149 while (n < iter)
151 mb_p = new ACE_Message_Block (msiz);
152 ACE_OS::strcpy (mb_p->rd_ptr (), (char *) dummy);
153 mb_p->length (msiz);
154 if (verb)
155 cout << "supplier sending 1 message_block" << endl;
156 if (s_stream.send (mb_p) == -1)
158 cout << "supplier send failed" << endl;
159 return (void *) -1;
161 n++;
164 mb_p = new ACE_Message_Block (10);
165 mb_p->length (1);
166 *mb_p->rd_ptr () = 'g';
168 cout << "supplier sending last message_block" << endl;
170 if (s_stream.send (mb_p) == -1)
172 cout << "supplier send last mb failed" << endl;
173 return (void *) -1;
175 mb_p = new ACE_Message_Block (10);
176 mb_p->length (0);
178 if (verb)
179 cout << "supplier sending very last message_block" << endl;
181 if (s_stream.send (mb_p) == -1)
183 cout << "supplier send very last mb failed" << endl;
184 return (void *) -1;
187 ACE_OS::sleep (2);
188 cout << "supplier terminating" << endl;
189 return 0;
193 ACE_TMAIN (int argc, ACE_TCHAR *argv[])
195 ACE_Service_Config daemon;
197 options.parse_args (argc, argv);
198 options.start_timer ();
200 // Primary ACE_Stream for EVENT_SERVER application.
201 MT_Stream event_server;
203 // Enable graceful shutdowns....
204 Quit_Handler quit_handler;
206 // Create the modules..
208 MT_Module *sr = new MT_Module (ACE_TEXT ("Supplier_Router"),
209 new Supplier_Router (ACE_Thread_Manager::instance ()));
210 MT_Module *ea = new MT_Module (ACE_TEXT ("Event_Analyzer"),
211 new Event_Analyzer,
212 new Event_Analyzer);
213 MT_Module *cr = new MT_Module (ACE_TEXT ("Consumer_Router"),
214 0, // 0 triggers the creation of a ACE_Thru_Task...
215 new Consumer_Router (ACE_Thread_Manager::instance ()));
217 // Push the modules onto the event_server stream.
219 if (event_server.push (sr) == -1)
220 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
221 ACE_TEXT ("push (Supplier_Router)")), -1);
223 if (event_server.push (ea) == -1)
224 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
225 ACE_TEXT ("push (Event_Analyzer)")), -1);
227 if (event_server.push (cr) == -1)
228 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
229 ACE_TEXT ("push (Consumer_Router)")), -1);
231 // Set the high and low water marks appropriately.
233 int wm = ACE_Utils::truncate_cast<int> (options.low_water_mark ());
235 if (event_server.control (ACE_IO_Cntl_Msg::SET_LWM, &wm) == -1)
236 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
237 ACE_TEXT ("push (setting low watermark)")), -1);
239 wm = ACE_Utils::truncate_cast<int> (options.high_water_mark ());
241 if (event_server.control (ACE_IO_Cntl_Msg::SET_HWM, &wm) == -1)
242 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
243 ACE_TEXT ("push (setting high watermark)")), -1);
245 // spawn the two threads.
247 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer), (void *) 0,
248 THR_NEW_LWP | THR_DETACHED) == -1)
249 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1);
251 else if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier), (void *) "hello",
252 THR_NEW_LWP | THR_DETACHED) == -1)
253 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1);
255 // Perform the main event loop waiting for the user to type ^C or to
256 // enter a line on the ACE_STDIN.
258 ACE_Reactor::instance ()->run_reactor_event_loop ();
260 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("main exiting\n")));
262 return 0;
264 #else
266 ACE_TMAIN (int, ACE_TCHAR *[])
268 ACE_ERROR_RETURN ((LM_ERROR,
269 ACE_TEXT ("test not defined for this platform\n")),
270 -1);
272 #endif /* ACE_HAS_THREADS */