Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / tests / Message_Queue_Notifications_Test.cpp
blob7ea1d2971eb8c3e105f472e697cf23dfaac208f8
2 //=============================================================================
3 /**
4 * @file Message_Queue_Notifications_Test.cpp
6 * There are two tests that test 2 different notification
7 * mechanisms in Message Queue.
9 * The first test illustrates the notification mechanisms in
10 * Message_Queue and its integration with Reactor.
12 * Note the following things about this part of the test:
14 * 1. Multiple threads are not required.
15 * 2. You do not have to explicitly notify the Reactor
16 * 3. This code will work the same with any Reactor Implementation
17 * 4. handle_input, handle_exception, handle_output are the only
18 * callbacks supported by this mechanism
19 * 5. The notification mechanism need not notify the Reactor. You can
20 * write your own strategy classes that can do whatever application
21 * specific behavior you want.
23 * The second test also makes sure the high/low water mark
24 * signaling mechanism works flawlessly.
26 * @author Irfan Pyarali <irfan@cs.wustl.edu> and Nanbor Wang <nanbor@cs.wustl.edu>
28 //=============================================================================
31 #include "test_config.h"
32 #include "ace/Reactor.h"
33 #include "ace/Task.h"
34 #include "ace/Reactor_Notification_Strategy.h"
35 #include "ace/Atomic_Op.h"
36 #include "ace/Barrier.h"
37 #include "ace/Synch_Traits.h"
38 #include "ace/Null_Condition.h"
39 #include "ace/Null_Mutex.h"
40 #include "ace/OS_NS_string.h"
41 #include "ace/OS_NS_unistd.h"
/// Countdown for the reactive notification test.  It is decremented by
/// Message_Handler::make_message () and polled by run_main ().
static int iterations = 10;

/// Number of threads activated for the watermark test; svc () assigns
/// the first one the producer role and the second the consumer role.
static constexpr size_t worker_threads = 2;

/// Payload carried by every message in the watermark test.
static const char *default_message = "ACE RULES";

/// High water mark, expressed in messages of default_message size.
static constexpr size_t default_high_water_mark = 20;

/// Low water mark, expressed in messages of default_message size.
static constexpr size_t default_low_water_mark = 10;

/// Iteration budget shared by the watermark producer and consumer.
static constexpr int watermark_iterations =
  static_cast<int> (2 * default_high_water_mark);
52 /**
53 * @class Message_Handler
55 * @brief This class implements a notification strategy for the Reactor.
57 class Message_Handler : public ACE_Task<ACE_NULL_SYNCH>
59 public:
60 /// Constructor.
61 Message_Handler (ACE_Reactor &reactor);
63 // = Demuxing hooks.
64 int handle_input (ACE_HANDLE) override;
65 int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE) override;
66 int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE) override;
68 private:
69 int process_message ();
70 void make_message ();
72 ACE_Reactor_Notification_Strategy notification_strategy_;
75 /**
76 * @class Watermark_Test
78 * @brief This class test the correct functioning of build-in flow
79 * control machanism in ACE_Task.
81 class Watermark_Test : public ACE_Task<ACE_SYNCH>
83 public:
84 Watermark_Test ();
86 int svc () override;
88 int consumer ();
89 int producer ();
90 int put_message (ACE_Time_Value* timeout = 0);
91 int get_message ();
92 void print_producer_debug_message ();
94 private:
95 const size_t len_;
96 const size_t hwm_;
97 const size_t lwm_;
98 ACE_Atomic_Op <ACE_SYNCH_MUTEX, int> role_;
99 #if defined (ACE_HAS_THREADS)
100 ACE_Barrier mq_full_;
101 ACE_Barrier mq_low_water_mark_hit_;
102 #endif /* ACE_HAS_THREADS */
105 Message_Handler::Message_Handler (ACE_Reactor &reactor)
106 // First time handle_input will be called
107 : notification_strategy_ (&reactor,
108 this,
109 ACE_Event_Handler::READ_MASK)
111 this->msg_queue ()->notification_strategy (&this->notification_strategy_);
112 this->make_message ();
116 Message_Handler::handle_input (ACE_HANDLE)
118 ACE_DEBUG ((LM_DEBUG,
119 ACE_TEXT ("Message_Handler::handle_input\n")));
121 // Next time handle_output will be called.
122 this->notification_strategy_.mask (ACE_Event_Handler::WRITE_MASK);
124 return process_message ();
128 Message_Handler::handle_output (ACE_HANDLE fd)
130 ACE_DEBUG ((LM_DEBUG,
131 ACE_TEXT ("Message_Handler::handle_output\n")));
132 ACE_UNUSED_ARG (fd);
134 // Next time handle_exception will be called.
135 this->notification_strategy_.mask (ACE_Event_Handler::EXCEPT_MASK);
137 return process_message ();
141 Message_Handler::handle_exception (ACE_HANDLE fd)
143 ACE_DEBUG ((LM_DEBUG,
144 ACE_TEXT ("Message_Handler::handle_exception\n")));
145 ACE_UNUSED_ARG (fd);
147 // Next time handle_input will be called.
148 this->notification_strategy_.mask (ACE_Event_Handler::READ_MASK);
150 return this->process_message ();
154 Message_Handler::process_message ()
156 ACE_Message_Block *mb = 0;
158 if (this->getq (mb,
159 (ACE_Time_Value *) &ACE_Time_Value::zero) == -1)
160 ACE_ERROR_RETURN ((LM_ERROR,
161 ACE_TEXT ("%p\n"),
162 ACE_TEXT ("dequeue_head")),
163 -1);
164 else
166 ACE_DEBUG ((LM_DEBUG,
167 ACE_TEXT ("message received = %s\n"),
168 mb->rd_ptr ()));
169 mb->release ();
172 this->make_message ();
173 return 0;
176 void
177 Message_Handler::make_message ()
179 if (--iterations > 0)
181 ACE_Message_Block *mb = 0;
182 ACE_NEW (mb,
183 ACE_Message_Block ((char *) ACE_TEXT ("hello")));
185 ACE_DEBUG ((LM_DEBUG,
186 ACE_TEXT ("sending message\n")));
187 this->putq (mb);
191 Watermark_Test::Watermark_Test ()
192 : len_ (ACE_OS::strlen (default_message) + 1),
193 hwm_ (this->len_ * default_high_water_mark),
194 lwm_ (this->len_ * default_low_water_mark),
195 role_ (0)
196 #if defined (ACE_HAS_THREADS)
197 , mq_full_ (worker_threads),
198 mq_low_water_mark_hit_ (worker_threads)
199 #endif /* ACE_HAS_THREADS */
201 this->water_marks (ACE_IO_Cntl_Msg::SET_LWM,
202 this->lwm_);
203 this->water_marks (ACE_IO_Cntl_Msg::SET_HWM,
204 this->hwm_);
208 Watermark_Test::producer ()
210 int i = watermark_iterations;
212 for (ssize_t hwm = this->hwm_;
213 hwm >= 0 ;
214 hwm -= this->len_)
216 this->put_message ();
217 this->print_producer_debug_message ();
218 i--;
219 if (this->msg_queue ()->is_full ())
220 break;
222 ACE_DEBUG ((LM_DEBUG,
223 ACE_TEXT ("(%P|%t) Producer: High water mark hit ----\n")));
225 ACE_MT (this->mq_full_.wait ());
227 // The following put_message should block until the message queue
228 // has dropped under the lwm.
229 this->put_message ();
231 ACE_TEST_ASSERT (this->msg_queue ()-> message_bytes () <= this->lwm_ + this->len_);
233 this->print_producer_debug_message ();
235 for (i--; i >= 0 ; i--)
237 this->put_message ();
238 this->print_producer_debug_message ();
241 return 0;
245 Watermark_Test::consumer ()
247 ACE_MT (this->mq_full_.wait ());
249 ACE_OS::sleep (1);
251 // Let producer proceed and block in putq.
253 for (int i = watermark_iterations; i >= 0; i--)
255 this->get_message ();
256 ACE_OS::sleep (0);
259 return 0;
263 Watermark_Test::get_message ()
265 ACE_Message_Block *mb = 0;
267 if (this->getq (mb) == -1)
268 ACE_ERROR_RETURN ((LM_ERROR,
269 ACE_TEXT ("%p\n"),
270 ACE_TEXT ("dequeue_head")),
271 -1);
272 else
274 ACE_DEBUG ((LM_DEBUG,
275 ACE_TEXT ("(%P|%t) Consumer: message size = %3d, ")
276 ACE_TEXT ("message count = %3d\n"),
277 this->msg_queue ()-> message_bytes (),
278 this->msg_queue ()-> message_count ()));
279 mb->release ();
282 return 0;
286 Watermark_Test::put_message (ACE_Time_Value *timeout)
288 ACE_Message_Block *mb = 0;
290 ACE_NEW_RETURN (mb,
291 ACE_Message_Block (default_message,
292 this->len_),
293 -1);
295 return this->putq (mb, timeout);
298 void
299 Watermark_Test::print_producer_debug_message ()
301 ACE_DEBUG ((LM_DEBUG,
302 ACE_TEXT ("(%P|%t) Producer: message size = %3d, ")
303 ACE_TEXT ("message count = %3d\n"),
304 this->msg_queue ()-> message_bytes (),
305 this->msg_queue ()-> message_count ()));
309 Watermark_Test::svc ()
311 // this->role_ is an Atomic_Op object.
312 int role = this->role_++;
314 switch (role)
316 case 0:
317 this->producer ();
318 break;
319 case 1:
320 this->consumer ();
321 break;
322 default:
323 break;
325 return 0;
329 run_main (int, ACE_TCHAR *[])
331 ACE_START_TEST (ACE_TEXT ("Message_Queue_Notifications_Test"));
333 ACE_DEBUG ((LM_DEBUG,
334 ACE_TEXT ("Starting message queue reactive notification test...\n")));
336 ACE_Reactor reactor;
337 Message_Handler mh (reactor);
339 while (iterations > 0)
340 reactor.handle_events ();
342 #if defined (ACE_HAS_THREADS)
343 ACE_DEBUG ((LM_DEBUG,
344 ACE_TEXT ("Starting message queue watermark test...\n")));
345 Watermark_Test watermark_test;
346 ACE_DEBUG ((LM_DEBUG,
347 ACE_TEXT ("High water mark is %d\n")
348 ACE_TEXT ("Low water mark is %d\n"),
349 default_high_water_mark,
350 default_low_water_mark));
352 watermark_test.activate (THR_NEW_LWP,
353 worker_threads);
355 ACE_Thread_Manager::instance ()->wait ();
356 #else
357 ACE_DEBUG ((LM_INFO,
358 ACE_TEXT ("Message queue watermark test not performed because threads are not supported\n")));
359 #endif /* ACE_HAS_THREADS */
361 ACE_END_TEST;
362 return 0;