2 //=============================================================================
4 * @file Message_Queue_Notifications_Test.cpp
6 * There are two tests that test 2 different notification
7 * mechanisms in Message Queue.
9 * The first test illustrates the notification mechanisms in
10 * Message_Queue and its integration with Reactor.
12 * Note the following things about this part of the test:
14 * 1. Multiple threads are not required.
15 * 2. You do not have to explicitly notify the Reactor
16 * 3. This code will work the same with any Reactor Implementation
17 * 4. handle_input, handle_exception, handle_output are the only
18 * callbacks supported by this mechanism
19 * 5. The notification mechanism need not notify the Reactor. You can
20 * write your own strategy classes that can do whatever application
21 * specific behavior you want.
23 * The second test also makes sure the high/low water mark
24 * signaling mechanism works flawlessly.
26 * @author Irfan Pyarali <irfan@cs.wustl.edu> and Nanbor Wang <nanbor@cs.wustl.edu>
28 //=============================================================================
31 #include "test_config.h"
32 #include "ace/Reactor.h"
34 #include "ace/Reactor_Notification_Strategy.h"
35 #include "ace/Atomic_Op.h"
36 #include "ace/Barrier.h"
37 #include "ace/Synch_Traits.h"
38 #include "ace/Null_Condition.h"
39 #include "ace/Null_Mutex.h"
40 #include "ace/OS_NS_string.h"
41 #include "ace/OS_NS_unistd.h"
// Countdown for the reactive notification test: it is pre-decremented in
// Message_Handler::make_message and run_main keeps calling
// handle_events() while it is still positive.
45 static int iterations
= 10;
// Count handed to both ACE_Barrier members of Watermark_Test.
47 static const size_t worker_threads
= 2;
// Text used to build each watermark-test message; Watermark_Test::len_
// is computed as strlen (default_message) + 1.
48 static const char * default_message
= "ACE RULES";
// Water marks expressed as multiples of len_; Watermark_Test multiplies
// them by len_ to obtain hwm_ and lwm_.
49 static const size_t default_high_water_mark
= 20;
50 static const size_t default_low_water_mark
= 10;
// Starting value of the loop counters in Watermark_Test::producer and
// Watermark_Test::consumer.
51 static const int watermark_iterations
= 2 * default_high_water_mark
;
54 * @class Message_Handler
56 * @brief This class implements a notification strategy for the Reactor.
// ACE_Task specialised on ACE_NULL_SYNCH that owns an
// ACE_Reactor_Notification_Strategy; the constructor definition attaches
// that strategy to this task's message queue.
// NOTE(review): braces, access specifiers and some member declarations
// (e.g. make_message) are not visible in this listing.
58 class Message_Handler
: public ACE_Task
<ACE_NULL_SYNCH
>
// The reactor's address is forwarded to notification_strategy_.
62 Message_Handler (ACE_Reactor
&reactor
);
// Event-handler overrides. Each definition logs its own name, sets the
// strategy mask that selects the next callback, and returns the result
// of process_message().
65 int handle_input (ACE_HANDLE
) override
;
66 int handle_output (ACE_HANDLE fd
= ACE_INVALID_HANDLE
) override
;
67 int handle_exception (ACE_HANDLE fd
= ACE_INVALID_HANDLE
) override
;
// Logs the received message and then calls make_message().
70 int process_message ();
// Strategy object installed on this task's message queue.
73 ACE_Reactor_Notification_Strategy notification_strategy_
;
77 * @class Watermark_Test
79 * @brief This class tests the correct functioning of the built-in flow
80 * control mechanism in ACE_Task.
// ACE_Task specialised on ACE_SYNCH that drives the high/low water mark
// test through its producer and consumer member functions.
// NOTE(review): braces, access specifiers and several member
// declarations (svc, producer, consumer, get_message, len_, hwm_, lwm_)
// are not visible in this listing.
82 class Watermark_Test
: public ACE_Task
<ACE_SYNCH
>
// Builds a message from default_message and forwards the optional
// timeout unchanged to putq().
91 int put_message (ACE_Time_Value
* timeout
= 0);
// Logs the queue's current message_bytes() and message_count() with a
// "Producer:" prefix.
93 void print_producer_debug_message ();
// Post-incremented at the top of svc(); the previous value becomes that
// call's local role.
99 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, int> role_
;
100 #if defined (ACE_HAS_THREADS)
// Barriers constructed with worker_threads; they exist only when
// ACE_HAS_THREADS is defined.
101 ACE_Barrier mq_full_
;
102 ACE_Barrier mq_low_water_mark_hit_
;
103 #endif /* ACE_HAS_THREADS */
// Constructs the notification strategy from the reactor's address with
// READ_MASK, installs it on this task's message queue, and then calls
// make_message().
// NOTE(review): the argument between &reactor and the mask is not
// visible in this listing.
106 Message_Handler::Message_Handler (ACE_Reactor
&reactor
)
107 // First time handle_input will be called
108 : notification_strategy_ (&reactor
,
110 ACE_Event_Handler::READ_MASK
)
// The strategy must be installed before make_message() is called below.
112 this->msg_queue ()->notification_strategy (&this->notification_strategy_
);
113 this->make_message ();
// Logs the callback name, switches the strategy mask to WRITE_MASK and
// returns whatever process_message() returns. The handle argument is
// unnamed and unused.
117 Message_Handler::handle_input (ACE_HANDLE
)
119 ACE_DEBUG ((LM_DEBUG
,
120 ACE_TEXT ("Message_Handler::handle_input\n")));
122 // Next time handle_output will be called.
123 this->notification_strategy_
.mask (ACE_Event_Handler::WRITE_MASK
);
125 return process_message ();
// Logs the callback name, switches the strategy mask to EXCEPT_MASK and
// returns whatever process_message() returns.
// NOTE(review): fd is not referenced in any visible line of this body.
129 Message_Handler::handle_output (ACE_HANDLE fd
)
131 ACE_DEBUG ((LM_DEBUG
,
132 ACE_TEXT ("Message_Handler::handle_output\n")));
135 // Next time handle_exception will be called.
136 this->notification_strategy_
.mask (ACE_Event_Handler::EXCEPT_MASK
);
138 return process_message ();
// Logs the callback name, switches the strategy mask back to READ_MASK
// (completing the input -> output -> exception cycle) and returns
// whatever process_message() returns.
// NOTE(review): fd is not referenced in any visible line of this body.
142 Message_Handler::handle_exception (ACE_HANDLE fd
)
144 ACE_DEBUG ((LM_DEBUG
,
145 ACE_TEXT ("Message_Handler::handle_exception\n")));
148 // Next time handle_input will be called.
149 this->notification_strategy_
.mask (ACE_Event_Handler::READ_MASK
);
151 return this->process_message ();
// Obtains one message block, logs "message received", and calls
// make_message().
// NOTE(review): the dequeue call itself, the remaining log/error
// arguments and the return statements are not visible in this listing;
// confirm against the full file before changing anything here.
155 Message_Handler::process_message ()
157 ACE_Message_Block
*mb
= 0;
// ACE_Time_Value::zero is passed by address through a C-style cast
// (presumably to drop constness -- TODO confirm); a result of -1 takes
// the error-return path below.
160 (ACE_Time_Value
*) &ACE_Time_Value::zero
) == -1)
161 ACE_ERROR_RETURN ((LM_ERROR
,
163 ACE_TEXT ("dequeue_head")),
167 ACE_DEBUG ((LM_DEBUG
,
168 ACE_TEXT ("message received = %s\n"),
// Calls make_message() again after handling this message.
173 this->make_message ();
// Pre-decrements the file-level iterations counter; while the result is
// still positive, a message block wrapping the text "hello" is created
// and "sending message" is logged.
// NOTE(review): the allocation macro around the ACE_Message_Block
// expression and the enqueue call are not visible in this listing.
178 Message_Handler::make_message ()
180 if (--iterations
> 0)
182 ACE_Message_Block
*mb
= 0;
184 ACE_Message_Block ((char *) ACE_TEXT ("hello")));
186 ACE_DEBUG ((LM_DEBUG
,
187 ACE_TEXT ("sending message\n")));
// Sets len_ to strlen (default_message) + 1, scales both default water
// marks by len_, constructs both barriers with worker_threads (only when
// ACE_HAS_THREADS is defined), then issues SET_LWM followed by SET_HWM
// through water_marks().
// NOTE(review): the hwm_/lwm_ initialisers read this->len_, so len_ must
// be declared before them in the class; the member declarations are not
// visible in this listing -- confirm.
192 Watermark_Test::Watermark_Test ()
193 : len_ (ACE_OS::strlen (default_message
) + 1),
194 hwm_ (this->len_
* default_high_water_mark
),
195 lwm_ (this->len_
* default_low_water_mark
),
197 #if defined (ACE_HAS_THREADS)
198 , mq_full_ (worker_threads
),
199 mq_low_water_mark_hit_ (worker_threads
)
200 #endif /* ACE_HAS_THREADS */
// The second argument of each water_marks() call is not visible here.
202 this->water_marks (ACE_IO_Cntl_Msg::SET_LWM
,
204 this->water_marks (ACE_IO_Cntl_Msg::SET_HWM
,
// Producer side of the watermark test. It enqueues messages in a loop
// driven by hwm, reports when the queue is full, waits on mq_full_,
// issues one more put_message() that is expected to block, asserts on the
// queue size afterwards, and finally enqueues further messages while i
// counts down to 0.
// NOTE(review): the first loop's condition/step and the return statement
// are not visible in this listing.
209 Watermark_Test::producer ()
211 int i
= watermark_iterations
;
213 for (ssize_t hwm
= this->hwm_
;
217 this->put_message ();
218 this->print_producer_debug_message ();
220 if (this->msg_queue ()->is_full ())
224 ACE_DEBUG ((LM_DEBUG
,
225 ACE_TEXT ("(%P|%t) Producer: High water mark hit ----\n")));
// The consumer waits on this same barrier at the top of consumer().
227 ACE_MT (this->mq_full_
.wait ());
229 // The following put_message should block until the message queue
230 // has dropped under the lwm.
231 this->put_message ();
// After that put the queue may hold at most lwm_ bytes plus the one
// message (len_ bytes) just added.
233 ACE_TEST_ASSERT (this->msg_queue ()-> message_bytes () <= this->lwm_
+ this->len_
);
235 this->print_producer_debug_message ();
// Remaining messages: i is decremented once before the first test and
// the loop runs while i >= 0.
237 for (i
--; i
>= 0 ; i
--)
239 this->put_message ();
240 this->print_producer_debug_message ();
// Consumer side of the watermark test: waits on mq_full_ (the barrier
// the producer waits on after reporting a full queue), then calls
// get_message() once for every i from watermark_iterations down to 0
// inclusive.
// NOTE(review): statements between the barrier wait and the loop, and
// the return statement, are not visible in this listing.
247 Watermark_Test::consumer ()
249 ACE_MT (this->mq_full_
.wait ());
253 // Let producer proceed and block in putq.
255 for (int i
= watermark_iterations
; i
>= 0; i
--)
257 this->get_message ();
// Removes one message with getq(). A result of -1 leaves through
// ACE_ERROR_RETURN; otherwise the queue's message_bytes() and
// message_count() are logged with a "Consumer:" prefix.
// NOTE(review): the remaining ACE_ERROR_RETURN arguments, the handling
// of mb after the log and the final return are not visible here.
265 Watermark_Test::get_message ()
267 ACE_Message_Block
*mb
= 0;
269 if (this->getq (mb
) == -1)
270 ACE_ERROR_RETURN ((LM_ERROR
,
272 ACE_TEXT ("dequeue_head")),
// NOTE(review): %3d assumes int-sized arguments; confirm the return
// types of message_bytes() and message_count().
276 ACE_DEBUG ((LM_DEBUG
,
277 ACE_TEXT ("(%P|%t) Consumer: message size = %3d, ")
278 ACE_TEXT ("message count = %3d\n"),
279 this->msg_queue ()-> message_bytes (),
280 this->msg_queue ()-> message_count ()));
// Creates a message block from default_message and enqueues it with
// putq(), forwarding timeout unchanged; the result of putq() is returned.
// NOTE(review): the allocation macro and the remaining
// ACE_Message_Block constructor arguments are not visible in this
// listing.
288 Watermark_Test::put_message (ACE_Time_Value
*timeout
)
290 ACE_Message_Block
*mb
= 0;
293 ACE_Message_Block (default_message
,
297 return this->putq (mb
, timeout
);
// Logs the queue's current message_bytes() and message_count() with a
// "Producer:" prefix; it is called by producer() after each
// put_message().
// NOTE(review): %3d assumes int-sized arguments; confirm the return
// types of message_bytes() and message_count().
301 Watermark_Test::print_producer_debug_message ()
303 ACE_DEBUG ((LM_DEBUG
,
304 ACE_TEXT ("(%P|%t) Producer: message size = %3d, ")
305 ACE_TEXT ("message count = %3d\n"),
306 this->msg_queue ()-> message_bytes (),
307 this->msg_queue ()-> message_count ()));
// Task entry point. The only visible step takes the current value of
// role_ and post-increments it.
// NOTE(review): how role selects between producer() and consumer() is
// not visible in this listing.
311 Watermark_Test::svc ()
313 // this->role_ is an Atomic_Op object.
314 int role
= this->role_
++;
// Test entry point. Part one builds a Message_Handler on a reactor and
// calls handle_events() until the file-level iterations counter is no
// longer positive. Part two (only when ACE_HAS_THREADS is defined) logs
// the configured water marks, activates a Watermark_Test and waits on
// the ACE_Thread_Manager singleton.
// NOTE(review): the reactor declaration, the remaining activate()
// arguments, the #else branch opener and the end of the function are not
// visible in this listing.
331 run_main (int, ACE_TCHAR
*[])
333 ACE_START_TEST (ACE_TEXT ("Message_Queue_Notifications_Test"));
335 ACE_DEBUG ((LM_DEBUG
,
336 ACE_TEXT ("Starting message queue reactive notification test...\n")));
339 Message_Handler
mh (reactor
);
// iterations is decremented inside Message_Handler::make_message, which
// is what eventually ends this loop.
341 while (iterations
> 0)
342 reactor
.handle_events ();
344 #if defined (ACE_HAS_THREADS)
345 ACE_DEBUG ((LM_DEBUG
,
346 ACE_TEXT ("Starting message queue watermark test...\n")));
347 Watermark_Test watermark_test
;
// NOTE(review): both water mark constants are size_t but are formatted
// with %d; confirm the intended ACE_Log_Msg conversion specifier.
348 ACE_DEBUG ((LM_DEBUG
,
349 ACE_TEXT ("High water mark is %d\n")
350 ACE_TEXT ("Low water mark is %d\n"),
351 default_high_water_mark
,
352 default_low_water_mark
));
354 watermark_test
.activate (THR_NEW_LWP
,
// Wait on the thread manager singleton before leaving the threaded
// section.
357 ACE_Thread_Manager::instance ()->wait ();
360 ACE_TEXT ("Message queue watermark test not performed because threads are not supported\n")));
361 #endif /* ACE_HAS_THREADS */