1 // Test the event server.
3 #include "ace/OS_main.h"
4 #include "ace/Stream.h"
5 #include "ace/Service_Config.h"
6 #include "ace/UPIPE_Acceptor.h"
7 #include "ace/UPIPE_Connector.h"
8 #include "ace/Truncate.h"
10 // FUZZ: disable check_for_streams_include
11 #include "ace/streams.h"
14 #include "Consumer_Router.h"
15 #include "Event_Analyzer.h"
16 #include "Supplier_Router.h"
17 #include "ace/Sig_Adapter.h"
18 #include "ace/OS_NS_unistd.h"
20 #if defined (ACE_HAS_THREADS)
22 typedef ACE_Stream
<ACE_MT_SYNCH
> MT_Stream
;
23 typedef ACE_Module
<ACE_MT_SYNCH
> MT_Module
;
25 // Handle SIGINT and terminate the entire application.
27 class Quit_Handler
: public ACE_Sig_Adapter
31 virtual int handle_input (ACE_HANDLE fd
);
34 Quit_Handler::Quit_Handler ()
35 : ACE_Sig_Adapter (ACE_Sig_Handler_Ex (ACE_Reactor::end_event_loop
))
37 // Register to trap input from the user.
38 if (ACE_Event_Handler::register_stdin_handler (this,
39 ACE_Reactor::instance (),
40 ACE_Thread_Manager::instance ()) == -1)
41 ACE_ERROR ((LM_ERROR
, "%p\n", "register_stdin_handler"));
42 // Register to trap the SIGINT signal.
43 else if (ACE_Reactor::instance ()->register_handler
45 ACE_ERROR ((LM_ERROR
, "%p\n", "register_handler"));
49 Quit_Handler::handle_input (ACE_HANDLE
)
51 options
.stop_timer ();
52 ACE_DEBUG ((LM_INFO
, " (%t) closing down the test\n"));
53 options
.print_results ();
55 ACE_Reactor::end_event_loop();
62 ACE_UPIPE_Stream c_stream
;
63 ACE_UPIPE_Addr
c_addr (ACE_TEXT ("/tmp/conupipe"));
65 int verb
= options
.verbose ();
66 int msiz
= ACE_Utils::truncate_cast
<int> (options
.message_size ());
67 time_t secs
, par1
, par2
;
71 cout
<< "consumer starting connect" << endl
;
73 ACE_UPIPE_Connector con
;
75 if (con
.connect (c_stream
, c_addr
) == -1)
76 ACE_DEBUG ((LM_INFO
, " (%t) connect failed\n"));
78 cout
<< "consumer :we're connected" << endl
;
80 ACE_Message_Block
*mb_p
;
84 ACE_OS::time (&currsec
);
88 while (done
== 0 && (c_stream
.recv (mb_p
) != -1))
90 if (mb_p
->length () > 1)
95 cout
<< " consumer received message !!!!!! "
96 << mb_p
->rd_ptr () << endl
;
103 cout
<< "consumer got last mb"
104 << (char) * (mb_p
->rd_ptr ()) << endl
;
111 ACE_OS::time (&currsec
);
120 ACE_TEXT ("consumer got %d messages of size %d ")
121 ACE_TEXT ("within %: seconds\n"),
125 cout
<< "consumer terminating " << endl
;
130 supplier (void *dummy
)
132 ACE_UPIPE_Stream s_stream
;
133 ACE_UPIPE_Addr
serv_addr (ACE_TEXT ("/tmp/supupipe"));
134 ACE_UPIPE_Connector con
;
136 int iter
= ACE_Utils::truncate_cast
<int> (options
.iterations ());
137 int verb
= options
.verbose ();
138 int msiz
= ACE_Utils::truncate_cast
<int> (options
.message_size ());
139 cout
<< "supplier starting connect" << endl
;
141 if (con
.connect (s_stream
, serv_addr
) == -1)
142 ACE_DEBUG ((LM_INFO
, " (%t) connect failed\n"));
144 cout
<< "supplier : we're connected" << endl
;
147 ACE_Message_Block
* mb_p
;
151 mb_p
= new ACE_Message_Block (msiz
);
152 ACE_OS::strcpy (mb_p
->rd_ptr (), (char *) dummy
);
155 cout
<< "supplier sending 1 message_block" << endl
;
156 if (s_stream
.send (mb_p
) == -1)
158 cout
<< "supplier send failed" << endl
;
164 mb_p
= new ACE_Message_Block (10);
166 *mb_p
->rd_ptr () = 'g';
168 cout
<< "supplier sending last message_block" << endl
;
170 if (s_stream
.send (mb_p
) == -1)
172 cout
<< "supplier send last mb failed" << endl
;
175 mb_p
= new ACE_Message_Block (10);
179 cout
<< "supplier sending very last message_block" << endl
;
181 if (s_stream
.send (mb_p
) == -1)
183 cout
<< "supplier send very last mb failed" << endl
;
188 cout
<< "supplier terminating" << endl
;
193 ACE_TMAIN (int argc
, ACE_TCHAR
*argv
[])
195 ACE_Service_Config daemon
;
197 options
.parse_args (argc
, argv
);
198 options
.start_timer ();
200 // Primary ACE_Stream for EVENT_SERVER application.
201 MT_Stream event_server
;
203 // Enable graceful shutdowns....
204 Quit_Handler quit_handler
;
206 // Create the modules..
208 MT_Module
*sr
= new MT_Module (ACE_TEXT ("Supplier_Router"),
209 new Supplier_Router (ACE_Thread_Manager::instance ()));
210 MT_Module
*ea
= new MT_Module (ACE_TEXT ("Event_Analyzer"),
213 MT_Module
*cr
= new MT_Module (ACE_TEXT ("Consumer_Router"),
214 0, // 0 triggers the creation of a ACE_Thru_Task...
215 new Consumer_Router (ACE_Thread_Manager::instance ()));
217 // Push the modules onto the event_server stream.
219 if (event_server
.push (sr
) == -1)
220 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
221 ACE_TEXT ("push (Supplier_Router)")), -1);
223 if (event_server
.push (ea
) == -1)
224 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
225 ACE_TEXT ("push (Event_Analyzer)")), -1);
227 if (event_server
.push (cr
) == -1)
228 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
229 ACE_TEXT ("push (Consumer_Router)")), -1);
231 // Set the high and low water marks appropriately.
233 int wm
= ACE_Utils::truncate_cast
<int> (options
.low_water_mark ());
235 if (event_server
.control (ACE_IO_Cntl_Msg::SET_LWM
, &wm
) == -1)
236 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
237 ACE_TEXT ("push (setting low watermark)")), -1);
239 wm
= ACE_Utils::truncate_cast
<int> (options
.high_water_mark ());
241 if (event_server
.control (ACE_IO_Cntl_Msg::SET_HWM
, &wm
) == -1)
242 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
243 ACE_TEXT ("push (setting high watermark)")), -1);
245 // spawn the two threads.
247 if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (consumer
), (void *) 0,
248 THR_NEW_LWP
| THR_DETACHED
) == -1)
249 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1);
251 else if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (supplier
), (void *) "hello",
252 THR_NEW_LWP
| THR_DETACHED
) == -1)
253 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("spawn")), 1);
255 // Perform the main event loop waiting for the user to type ^C or to
256 // enter a line on the ACE_STDIN.
258 ACE_Reactor::instance ()->run_reactor_event_loop ();
260 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("main exiting\n")));
266 ACE_TMAIN (int, ACE_TCHAR
*[])
268 ACE_ERROR_RETURN ((LM_ERROR
,
269 ACE_TEXT ("test not defined for this platform\n")),
272 #endif /* ACE_HAS_THREADS */